Re: Logical Replication WIP

From Andres Freund
Subject Re: Logical Replication WIP
Msg-id 20161104130048.a52zpwme4jywnw62@alap3.anarazel.de
In response to Re: Logical Replication WIP  (Petr Jelinek <petr@2ndquadrant.com>)
Responses Re: Logical Replication WIP  (Peter Eisentraut <peter.eisentraut@2ndquadrant.com>)
Re: Logical Replication WIP  (Petr Jelinek <petr@2ndquadrant.com>)
List pgsql-hackers
Hi,

+ <sect1 id="catalog-pg-publication-rel">
+  <title><structname>pg_publication_rel</structname></title>
+
+  <indexterm zone="catalog-pg-publication-rel">
+   <primary>pg_publication_rel</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_publication_rel</structname> catalog contains
+   mapping between tables and publications in the database. This is many to
+   many mapping.
+  </para>

I wonder if we shouldn't abstract this a bit away from relations to
allow other objects to be exported too. We could structure it a bit more
like pg_depend.


+ALTER PUBLICATION <replaceable class="PARAMETER">name</replaceable> [ [ WITH ] <replaceable class="PARAMETER">option</replaceable>[ ... ] ]
 
+
+<phrase>where <replaceable class="PARAMETER">option</replaceable> can be:</phrase>
+
+      PuBLISH_INSERT | NOPuBLISH_INSERT
+    | PuBLISH_UPDATE | NOPuBLISH_UPDATE
+    | PuBLISH_DELETE | NOPuBLISH_DELETE

That's odd casing.


+   <varlistentry>
+    <term><literal>PuBLISH_INSERT</literal></term>
+    <term><literal>NOPuBLISH_INSERT</literal></term>
+    <term><literal>PuBLISH_UPDATE</literal></term>
+    <term><literal>NOPuBLISH_UPDATE</literal></term>
+    <term><literal>PuBLISH_DELETE</literal></term>
+    <term><literal>NOPuBLISH_DELETE</literal></term>

More odd casing.

+   <varlistentry>
+    <term><literal>FOR TABLE</literal></term>
+    <listitem>
+     <para>
+      Specifies optional list of tables to add to the publication.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>FOR TABLE ALL IN SCHEMA</literal></term>
+    <listitem>
+     <para>
+      Specifies optional schema for which all logged tables will be added to
+      publication.
+     </para>
+    </listitem>
+   </varlistentry>

"FOR TABLE ALL IN SCHEMA" sounds weird.


+  <para>
+   This operation does not reserve any resources on the server. It only
+   defines grouping and filtering logic for future subscribers.
+  </para>

That's strictly speaking not true, maybe rephrase a bit?

+/*
+ * Check if relation can be in given publication and throws appropriate
+ * error if not.
+ */
+static void
+check_publication_add_relation(Relation targetrel)
+{
+    /* Must be table */
+    if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only tables can be added to publication"),
+                 errdetail("%s is not a table",
+                           RelationGetRelationName(targetrel))));
+
+    /* Can't be system table */
+    if (IsCatalogRelation(targetrel))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only user tables can be added to publication"),
+                 errdetail("%s is a system table",
+                           RelationGetRelationName(targetrel))));
+
+    /* UNLOGGED and TEMP relations cannot be part of publication. */
+    if (!RelationNeedsWAL(targetrel))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("UNLOGGED and TEMP relations cannot be replicated")));
+}

This probably means we need a check in the ALTER TABLE ... SET UNLOGGED
path.


+/*
+ * Returns if relation represented by oid and Form_pg_class entry
+ * is publishable.
+ *
+ * Does same checks as the above, but does not need relation to be opened
+ * and also does not throw errors.
+ */
+static bool
+is_publishable_class(Oid relid, Form_pg_class reltuple)
+{
+    return reltuple->relkind == RELKIND_RELATION &&
+        !IsCatalogClass(relid, reltuple) &&
+        reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
+        /* XXX needed to exclude information_schema tables */
+        relid >= FirstNormalObjectId;
+}

Shouldn't that be IsCatalogRelation() instead?


+CREATE VIEW pg_publication_tables AS
+    SELECT
+        P.pubname AS pubname,
+        N.nspname AS schemaname,
+        C.relname AS tablename
+    FROM pg_publication P, pg_class C
+         JOIN pg_namespace N ON (N.oid = C.relnamespace)
+    WHERE C.relkind = 'r'
+      AND C.oid IN (SELECT relid FROM pg_get_publication_tables(P.pubname));

That's going to be quite inefficient if you filter by table... Might be
better to do that via the underlying table.


+/*
+ * Create new publication.
+ * TODO ACL check
+ */

Hm?

+ObjectAddress
+CreatePublication(CreatePublicationStmt *stmt)
+{
+    check_replication_permissions();

+
+/*
+ * Drop publication by OID
+ */
+void
+DropPublicationById(Oid pubid)
+
+/*
+ * Remove relation from publication by mapping OID.
+ */
+void
+RemovePublicationRelById(Oid proid)
+{

Permission checks?

+}

Hm. Neither of these does dependency checking; I wonder whether that
could be argued to be problematic.


+/*
+ * Gather Relations based o provided by RangeVar list.
+ * The gathered tables are locked in ShareUpdateExclusiveLock mode.
+ */

s/o/on/.  Not sure if gather is the best name.

+static List *
+GatherTableList(List *tables)


+/*
+ * Close all relations in the list.
+ */
+static void
+CloseTables(List *rels)

Shouldn't that be CloseTableList() based on the preceding function's naming?

+
+/*
+ * Add listed tables to the publication.
+ */
+static void
+PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
+                     AlterPublicationStmt *stmt)
+{
+    ListCell       *lc;
+
+    Assert(!stmt || !stmt->for_all_tables);
+
+    foreach(lc, rels)
+    {
+        Relation    rel = (Relation) lfirst(lc);
+        ObjectAddress    obj;
+
+        obj = publication_add_relation(pubid, rel, if_not_exists);
+        if (stmt)
+            EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+                                             (Node *) stmt);
+    }
+}
+
+/*
+ * Remove listed tables to the publication.
+ */

s/to/from/

+static void
+PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
+{
+    ObjectAddress    obj;
+    ListCell       *lc;
+    Oid                prid;
+
+    foreach(lc, rels)
+    {
+        Relation    rel = (Relation) lfirst(lc);
+        Oid            relid = RelationGetRelid(rel);
+
+        prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
+                               ObjectIdGetDatum(pubid));
+        if (!OidIsValid(prid))
+        {
+            if (missing_ok)
+                continue;
+
+            ereport(ERROR,
+                    (errcode(ERRCODE_UNDEFINED_OBJECT),
+                     errmsg("relation \"%s\" is not part of the publication",
+                            RelationGetRelationName(rel))));
+        }
+
+        ObjectAddressSet(obj, PublicationRelRelationId, prid);
+        performDeletion(&obj, DROP_CASCADE, 0);
+    }
+}


/*
+ * Check if command can be executed with current replica identity.
+ */
+static void
+CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
+{
+    PublicationActions *pubactions;
+
+    /* We only need to do checks for UPDATE and DELETE. */
+    if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
+        return;
+
+    /* If relation has replica identity we are always good. */
+    if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+        OidIsValid(RelationGetReplicaIndex(rel)))
+        return;
+
+    /*
+     * This is either UPDATE OR DELETE and there is no replica identity.
+     *
+     * Check if the table publishes UPDATES or DELETES.
+     */
+    pubactions = GetRelationPublicationActions(rel);
+    if (pubactions->pubupdate || pubactions->pubdelete)

I think that leads to spurious errors. Consider a DELETE with a
publication that replicates updates but not deletes.
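
To illustrate the point above, here is a minimal, self-contained sketch
of a per-command check: a DELETE is rejected only when deletes are
published, and an UPDATE only when updates are. The Sketch* types and
sketch_cmd_needs_replica_identity() are hypothetical stand-ins, not the
patch's CmdType, PublicationActions or CheckCmdReplicaIdentity(), and
has_replica_identity abstracts the relreplident /
RelationGetReplicaIndex() test.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the backend's CmdType. */
typedef enum
{
    SKETCH_CMD_INSERT,
    SKETCH_CMD_UPDATE,
    SKETCH_CMD_DELETE
} SketchCmdType;

/* Hypothetical stand-in for the patch's PublicationActions. */
typedef struct SketchPubActions
{
    bool        pubinsert;
    bool        pubupdate;
    bool        pubdelete;
} SketchPubActions;

/*
 * Return true if the command has to be rejected: the relation has no
 * replica identity AND the publication publishes this particular kind
 * of command.  Each command is tested against its own flag only.
 */
bool
sketch_cmd_needs_replica_identity(SketchCmdType cmd,
                                  bool has_replica_identity,
                                  const SketchPubActions *actions)
{
    assert(actions != NULL);

    /* With a replica identity every command is fine. */
    if (has_replica_identity)
        return false;

    switch (cmd)
    {
        case SKETCH_CMD_UPDATE:
            return actions->pubupdate;
        case SKETCH_CMD_DELETE:
            return actions->pubdelete;
        default:
            /* INSERT never needs a replica identity. */
            return false;
    }
}
```

With a publication that publishes updates but not deletes, this returns
false for SKETCH_CMD_DELETE and true for SKETCH_CMD_UPDATE when the
relation has no replica identity.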

+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot update table \"%s\" because it does not have replica identity and publishes updates",
+                        RelationGetRelationName(rel)),
+                 errhint("To enable updating the table, provide set REPLICA IDENTITY using ALTER TABLE.")));
+}

"provide set"


+publication_opt_item:
+            IDENT
+                {
+                    /*
+                     * We handle identifiers that aren't parser keywords with
+                     * the following special-case codes, to avoid bloating the
+                     * size of the main parser.
+                     */
+                    if (strcmp($1, "publish_insert") == 0)
+                        $$ = makeDefElem("publish_insert",
+                                         (Node *)makeInteger(TRUE), @1);
+                    else if (strcmp($1, "nopublish_insert") == 0)
+                        $$ = makeDefElem("publish_insert",
+                                         (Node *)makeInteger(FALSE), @1);
+                    else if (strcmp($1, "publish_update") == 0)
+                        $$ = makeDefElem("publish_update",
+                                         (Node *)makeInteger(TRUE), @1);
+                    else if (strcmp($1, "nopublish_update") == 0)
+                        $$ = makeDefElem("publish_update",
+                                         (Node *)makeInteger(FALSE), @1);
+                    else if (strcmp($1, "publish_delete") == 0)
+                        $$ = makeDefElem("publish_delete",
+                                         (Node *)makeInteger(TRUE), @1);
+                    else if (strcmp($1, "nopublish_delete") == 0)
+                        $$ = makeDefElem("publish_delete",
+                                         (Node *)makeInteger(FALSE), @1);
+                    else
+                        ereport(ERROR,
+                                (errcode(ERRCODE_SYNTAX_ERROR),
+                                 errmsg("unrecognized publication option \"%s\"", $1),
+                                     parser_errposition(@1)));
+                }
+        ;

I still would very much like to move this outside of gram.y and just use
IDENTs here, like how COPY options are handled.
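
A standalone sketch of that idea, not actual backend code: the grammar
would pass the identifier through unchanged and a plain C function
would interpret it, similar in spirit to how COPY options are
processed. Everything named Sketch*/sketch_* is hypothetical, and
stripping a leading "no" is only one assumed way to handle the negated
forms.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical holder for the three publish flags. */
typedef struct SketchPublishOpts
{
    bool        pubinsert;
    bool        pubupdate;
    bool        pubdelete;
} SketchPublishOpts;

/*
 * Apply one option given by name.  Returns false for an unrecognized
 * name; a real caller would raise a syntax error at that point.
 */
bool
sketch_apply_publication_option(const char *name, SketchPublishOpts *opts)
{
    bool        value = true;

    assert(name != NULL && opts != NULL);

    /* "nopublish_x" is treated as "publish_x" set to false. */
    if (strncmp(name, "no", 2) == 0)
    {
        value = false;
        name += 2;
    }

    if (strcmp(name, "publish_insert") == 0)
        opts->pubinsert = value;
    else if (strcmp(name, "publish_update") == 0)
        opts->pubupdate = value;
    else if (strcmp(name, "publish_delete") == 0)
        opts->pubdelete = value;
    else
        return false;

    return true;
}
```

Adding a new option then only touches this function, not the grammar.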


+/*
+ * Get publication actions for list of publication oids.
+ */
+struct PublicationActions *
+GetRelationPublicationActions(Relation relation)

API description and function name/parameters don't quite match.



+CATALOG(pg_publication,6104)
+{
+    NameData    pubname;            /* name of the publication */
+
+    /*
+     * indicates that this is special publication which should encompass
+     * all tables in the database (except for the unlogged and temp ones)
+     */
+    bool        puballtables;
+
+    /* true if inserts are published */
+    bool        pubinsert;
+
+    /* true if updates are published */
+    bool        pubupdate;
+
+    /* true if deletes are published */
+    bool        pubdelete;
+
+} FormData_pg_publication;

Shouldn't this have an owner? I also wonder if we want an
easier-to-extend form of pubinsert/update/delete (say, to add pubddl,
pubtruncate, pub ... without changing the schema).
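
One possible easier-to-extend form, purely as an illustration: a single
bitmask instead of one boolean column per action. The flag names, the
truncate bit and both helper functions are hypothetical; the mail does
not say which representation is meant.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical action bits; a new action only needs a new bit. */
#define SKETCH_PUB_INSERT   (UINT32_C(1) << 0)
#define SKETCH_PUB_UPDATE   (UINT32_C(1) << 1)
#define SKETCH_PUB_DELETE   (UINT32_C(1) << 2)
#define SKETCH_PUB_TRUNCATE (UINT32_C(1) << 3)  /* example later addition */
#define SKETCH_PUB_KNOWN \
    (SKETCH_PUB_INSERT | SKETCH_PUB_UPDATE | \
     SKETCH_PUB_DELETE | SKETCH_PUB_TRUNCATE)

/* Return the mask with the given action bits switched on or off. */
uint32_t
sketch_pubactions_set(uint32_t pubactions, uint32_t action, bool enabled)
{
    /* Only bits defined above may be passed in. */
    assert((action & ~SKETCH_PUB_KNOWN) == 0);

    return enabled ? (pubactions | action) : (pubactions & ~action);
}

/* Return true if any of the given action bits is set in the mask. */
bool
sketch_pubactions_has(uint32_t pubactions, uint32_t action)
{
    return (pubactions & action) != 0;
}
```

The trade-off is that a bitmask is less readable in plain catalog
queries than separate boolean columns.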


+/* ----------------
+ *        pg_publication_rel definition.  cpp turns this into
+ *        typedef struct FormData_pg_publication_rel
+ *
+ * ----------------
+ */
+#define PublicationRelRelationId                6106
+
+CATALOG(pg_publication_rel,6106)
+{
+    Oid        prpubid;                /* Oid of the publication */
+    Oid        prrelid;                /* Oid of the relation */
+} FormData_pg_publication_rel;

To me it seems like a good idea to have objclassid/objsubid here.


Regards,

Andres


