Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 90 additions & 1 deletion src/bin/pgcopydb/cli_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ static void cli_list_extension_requirements(int argc, char **argv);
static void cli_list_collations(int argc, char **argv);
static void cli_list_tables(int argc, char **argv);
static void cli_list_table_parts(int argc, char **argv);
static void cli_list_matviews(int argc, char **argv);
static void cli_list_sequences(int argc, char **argv);
static void cli_list_indexes(int argc, char **argv);
static void cli_list_depends(int argc, char **argv);
Expand Down Expand Up @@ -98,6 +99,17 @@ static CommandLine list_table_parts_command =
cli_list_db_getopts,
cli_list_table_parts);

static CommandLine list_matviews_command =
make_command(
"matviews",
"List all the materialized views to copy data from",
" --source ... ",
" --source Postgres URI to the source database\n"
" --filter <filename> Use the filters defined in <filename>\n"
" --list-skipped List only tables that are setup to be skipped\n",
cli_list_db_getopts,
cli_list_matviews);

static CommandLine list_sequences_command =
make_command(
"sequences",
Expand Down Expand Up @@ -164,6 +176,7 @@ static CommandLine *list_subcommands[] = {
&list_collations_command,
&list_tables_command,
&list_table_parts_command,
&list_matviews_command,
&list_sequences_command,
&list_indexes_command,
&list_depends_command,
Expand Down Expand Up @@ -1259,7 +1272,7 @@ cli_list_table_parts(int argc, char **argv)


/*
* cli_list_tables implements the command: pgcopydb list tables
* cli_list_sequences implements the command: pgcopydb list sequences
*/
static void
cli_list_sequences(int argc, char **argv)
Expand Down Expand Up @@ -1339,6 +1352,82 @@ cli_list_sequences(int argc, char **argv)
}


/*
* cli_list_matviews implements the command: pgcopydb list matviews
*/
static void
cli_list_matviews(int argc, char **argv)
{
PGSQL pgsql = { 0 };
SourceFilters filters = { 0 };
SourceMatViewArray matviewArray = { 0, NULL };

log_info("Listing matviews in source database");

if (!IS_EMPTY_STRING_BUFFER(listDBoptions.filterFileName))
{
if (!parse_filters(listDBoptions.filterFileName, &filters))
{
log_error("Failed to parse filters in file \"%s\"",
listDBoptions.filterFileName);
exit(EXIT_CODE_BAD_ARGS);
}

if (listDBoptions.listSkipped)
{
if (filters.type != SOURCE_FILTER_TYPE_NONE)
{
filters.type = filterTypeComplement(filters.type);

if (filters.type == SOURCE_FILTER_TYPE_NONE)
{
log_error("BUG: can't list skipped matviews "
" from filtering type %d",
filters.type);
exit(EXIT_CODE_INTERNAL_ERROR);
}
}
}
}

ConnStrings *dsn = &(listDBoptions.connStrings);

if (!pgsql_init(&pgsql, dsn->source_pguri, PGSQL_CONN_SOURCE))
{
/* errors have already been logged */
exit(EXIT_CODE_SOURCE);
}

if (!schema_list_matviews(&pgsql, &filters, &matviewArray))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
}

log_info("Fetched information for %d matviews", matviewArray.count);

fformat(stdout, "%8s | %20s | %30s | %11s \n",
"OID", "Schema Name", "Matview Name", "isPopulated");

fformat(stdout, "%8s-+-%20s-+-%30s-+-%11s\n",
"--------",
"--------------------",
"------------------------------",
"-----------");

for (int i = 0; i < matviewArray.count; i++)
{
fformat(stdout, "%8d | %20s | %30s | %11s\n",
matviewArray.array[i].oid,
matviewArray.array[i].nspname,
matviewArray.array[i].relname,
matviewArray.array[i].isPopulated ? "yes" : "no");
}

fformat(stdout, "\n");
}


/*
* cli_list_indexes implements the command: pgcopydb list indexes
*/
Expand Down
212 changes: 212 additions & 0 deletions src/bin/pgcopydb/schema.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ typedef struct SourceSequenceArrayContext
bool parsedOk;
} SourceSequenceArrayContext;

/* Context used when fetching database definitions */
typedef struct SourceMatViewArrayContext
{
char sqlstate[SQLSTATE_LENGTH];
SourceMatViewArray *matvArray;
bool parsedOk;
} SourceMatViewArrayContext;

/* Context used when fetching all the indexes definitions */
typedef struct SourceIndexArrayContext
{
Expand Down Expand Up @@ -176,6 +184,12 @@ static bool parseCurrentSourceTable(PGresult *result,

static bool parseAttributesArray(SourceTable *table, JSON_Value *json);

static void getMatViewList(void *ctx, PGresult *result);

static bool parseCurrentSourceMatview(PGresult *result,
int rowNumber,
SourceMatView *matv);

static void getSequenceArray(void *ctx, PGresult *result);

static bool parseCurrentSourceSequence(PGresult *result,
Expand Down Expand Up @@ -2362,6 +2376,55 @@ schema_set_sequence_value(PGSQL *pgsql, SourceSequence *seq)
}


/*
* matviews_list runs a Postgres catalog query that lists Materialized Views
* definitions.
*/
bool
schema_list_matviews(PGSQL *pgsql,
SourceFilters *filters,
SourceMatViewArray *matvArray)
{
SourceMatViewArrayContext parseContext = { { 0 }, matvArray, false };

char *sql =
" select c.oid, "
" format('%I', n.nspname) as nspname, "
" format('%I', c.relname) as relname, "
" relispopulated, "
" definition, "
" json_agg(i.indexrelid::bigint) "

" from pg_catalog.pg_class c"
" join pg_catalog.pg_namespace n on c.relnamespace = n.oid"
" join pg_catalog.pg_matviews m "
" on m.schemaname = n.nspname "
" and m.matviewname = c.relname "
" join pg_catalog.pg_index i on i.indrelid = c.oid"

" where c.relkind = 'm' "

"group by c.oid, nspname, relname, relispopulated, definition "
"order by nspname, relname";

if (!pgsql_execute_with_params(pgsql, sql,
0, NULL, NULL,
&parseContext, &getMatViewList))
{
log_error("Failed to list matviews");
return false;
}

if (!parseContext.parsedOk)
{
log_error("Failed to list matviews");
return false;
}

return true;
}


/*
* For code simplicity the index array is also the SourceFilterType enum value.
*/
Expand Down Expand Up @@ -5032,6 +5095,155 @@ parseCurrentSourceSequence(PGresult *result, int rowNumber, SourceSequence *seq)
}


/*
* getIndexArray loops over the SQL result for the matviews query and allocates
* an array of matview then populates it with the query result.
*/
static void
getMatViewList(void *ctx, PGresult *result)
{
SourceMatViewArrayContext *context = (SourceMatViewArrayContext *) ctx;
int nTuples = PQntuples(result);

log_debug("getMatViewList: %d", nTuples);

if (PQnfields(result) != 6)
{
log_error("Query returned %d columns, expected 6", PQnfields(result));
context->parsedOk = false;
return;
}

/* we're not supposed to re-cycle arrays here */
if (context->matvArray->array != NULL)
{
/* issue a warning but let's try anyway */
log_warn("BUG? context's array is not null in getMatViewList");

free(context->matvArray->array);
context->matvArray->array = NULL;
}

context->matvArray->count = nTuples;
context->matvArray->array =
(SourceMatView *) calloc(nTuples, sizeof(SourceMatView));

if (context->matvArray->array == NULL)
{
log_fatal(ALLOCATION_FAILED_ERROR);
return;
}

bool parsedOk = true;

for (int rowNumber = 0; rowNumber < nTuples; rowNumber++)
{
SourceMatView *matv = &(context->matvArray->array[rowNumber]);

parsedOk = parsedOk &&
parseCurrentSourceMatview(result, rowNumber, matv);
}

if (!parsedOk)
{
free(context->matvArray->array);
context->matvArray->array = NULL;
}

context->parsedOk = parsedOk;
}


/*
* parseCurrentSourceMatview parses a single row of the matview listing query
* result.
*/
static bool
parseCurrentSourceMatview(PGresult *result, int rowNumber, SourceMatView *matv)
{
int errors = 0;

/* 1. c.oid */
char *value = PQgetvalue(result, rowNumber, 0);

if (!stringToUInt32(value, &(matv->oid)) || matv->oid == 0)
{
log_error("Invalid OID \"%s\"", value);
++errors;
}

/* 2. n.nspname */
value = PQgetvalue(result, rowNumber, 1);
int length = strlcpy(matv->nspname, value, PG_NAMEDATALEN);

if (length >= PG_NAMEDATALEN)
{
log_error("Schema name \"%s\" is %d bytes long, "
"the maximum expected is %d (PG_NAMEDATALEN - 1)",
value, length, PG_NAMEDATALEN - 1);
++errors;
}

/* 3. c.relname */
value = PQgetvalue(result, rowNumber, 2);
length = strlcpy(matv->relname, value, PG_NAMEDATALEN);

if (length >= PG_NAMEDATALEN)
{
log_error("Materialized View name \"%s\" is %d bytes long, "
"the maximum expected is %d (PG_NAMEDATALEN - 1)",
value, length, PG_NAMEDATALEN - 1);
++errors;
}

/* compute the qualified name from the nspname and relname */
length = sformat(matv->qname, sizeof(matv->qname), "%s.%s",
matv->nspname,
matv->relname);

if (length >= sizeof(matv->qname))
{
log_error("Qualified seq name \"%s\".\"%s\" is %d bytes long, "
"the maximum expected is %lld",
matv->nspname,
matv->relname,
length,
(long long) sizeof(matv->qname) - 1);
++errors;
}

/* 4. relispopulated */
value = PQgetvalue(result, rowNumber, 3);

if (value == NULL || ((*value != 't') && (*value != 'f')))
{
log_error("Invalid relispopulated value \"%s\"", value);
++errors;
}
else
{
matv->isPopulated = (*value) == 't';
}

/* 5. sql definition */
value = PQgetvalue(result, rowNumber, 4);
length = strlen(value) + 1;
matv->def = (char *) calloc(length, sizeof(char));

if (matv->def == NULL)
{
log_fatal(ALLOCATION_FAILED_ERROR);
return false;
}

strlcpy(matv->def, value, length);

/* 6. index oid array */

return errors == 0;
}


/*
* getIndexArray loops over the SQL result for the index array query and
* allocates an array of tables then populates it with the query result.
Expand Down
Loading