Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS
Дата
Msg-id 162691.1621630908@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS  (Amit Langote <amitlangote09@gmail.com>)
Ответы Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS  (Amit Langote <amitlangote09@gmail.com>)
Список pgsql-hackers
Amit Langote <amitlangote09@gmail.com> writes:
> IMHO, it would be better to keep the lowest-level
> apply_handle_XXX_internal() out of this, because presumably we're only
> cleaning up the mess in higher-level callers.  Somewhat related, one
> of the intentions behind a04daa97a43, which removed
> es_result_relation_info in favor of passing the ResultRelInfo
> explicitly to the executor's lower-level functions, was to avoid bugs
> caused by failing to set/reset that global field correctly in
> higher-level callers.

Yeah, that's a fair point, and after some reflection I think that
repeatedly changing the "active" field of the struct is exactly
what was bothering me about the v2 patch.  So in the attached v3,
I went back to passing that as an explicit argument.  The state
struct now has no fields that need to change after first being set.

I did notice that we could remove some other random arguments
by adding the LogicalRepRelMapEntry* to the state struct,
so this also does that.

            regards, tom lane

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 452e5f3df7..6a30662f37 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -178,6 +178,18 @@ static HTAB *xidhash = NULL;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;

+typedef struct ApplyExecutionData
+{
+    EState       *estate;            /* executor state, used to track resources */
+
+    LogicalRepRelMapEntry *targetRel;    /* replication target rel */
+    ResultRelInfo *targetRelInfo;    /* ResultRelInfo for same */
+
+    /* These fields are used when the target relation is partitioned: */
+    ModifyTableState *mtstate;    /* dummy ModifyTable state */
+    PartitionTupleRouting *proute;    /* partition routing info */
+} ApplyExecutionData;
+
 typedef struct SubXactInfo
 {
     TransactionId xid;            /* XID of the subxact */
@@ -226,24 +238,23 @@ static void apply_dispatch(StringInfo s);

 static void apply_handle_commit_internal(StringInfo s,
                                          LogicalRepCommitData *commit_data);
-static void apply_handle_insert_internal(ResultRelInfo *relinfo,
-                                         EState *estate, TupleTableSlot *remoteslot);
-static void apply_handle_update_internal(ResultRelInfo *relinfo,
-                                         EState *estate, TupleTableSlot *remoteslot,
-                                         LogicalRepTupleData *newtup,
-                                         LogicalRepRelMapEntry *relmapentry);
-static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
+static void apply_handle_insert_internal(ApplyExecutionData *edata,
+                                         ResultRelInfo *relinfo,
+                                         TupleTableSlot *remoteslot);
+static void apply_handle_update_internal(ApplyExecutionData *edata,
+                                         ResultRelInfo *relinfo,
                                          TupleTableSlot *remoteslot,
-                                         LogicalRepRelation *remoterel);
+                                         LogicalRepTupleData *newtup);
+static void apply_handle_delete_internal(ApplyExecutionData *edata,
+                                         ResultRelInfo *relinfo,
+                                         TupleTableSlot *remoteslot);
 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
                                     LogicalRepRelation *remoterel,
                                     TupleTableSlot *remoteslot,
                                     TupleTableSlot **localslot);
-static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
-                                       EState *estate,
+static void apply_handle_tuple_routing(ApplyExecutionData *edata,
                                        TupleTableSlot *remoteslot,
                                        LogicalRepTupleData *newtup,
-                                       LogicalRepRelMapEntry *relmapentry,
                                        CmdType operation);

 /*
@@ -336,18 +347,17 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)

 /*
  * Executor state preparation for evaluation of constraint expressions,
- * indexes and triggers.
+ * indexes and triggers for the specified relation.
  *
- * resultRelInfo is a ResultRelInfo for the relation to be passed to the
- * executor routines.  The caller must open and close any indexes to be
- * updated independently of the relation registered here.
+ * Note that the caller must open and close any indexes to be updated.
  */
-static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel,
-                           ResultRelInfo **resultRelInfo)
+static ApplyExecutionData *
+create_edata_for_relation(LogicalRepRelMapEntry *rel)
 {
+    ApplyExecutionData *edata;
     EState       *estate;
     RangeTblEntry *rte;
+    ResultRelInfo *resultRelInfo;

     /*
      * Input functions may need an active snapshot, as may AFTER triggers
@@ -356,7 +366,10 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel,
      */
     PushActiveSnapshot(GetTransactionSnapshot());

-    estate = CreateExecutorState();
+    edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
+    edata->targetRel = rel;
+
+    edata->estate = estate = CreateExecutorState();

     rte = makeNode(RangeTblEntry);
     rte->rtekind = RTE_RELATION;
@@ -365,13 +378,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel,
     rte->rellockmode = AccessShareLock;
     ExecInitRangeTable(estate, list_make1(rte));

-    *resultRelInfo = makeNode(ResultRelInfo);
+    edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);

     /*
      * Use Relation opened by logicalrep_rel_open() instead of opening it
      * again.
      */
-    InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0);
+    InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

     /*
      * We put the ResultRelInfo in the es_opened_result_relations list, even
@@ -384,29 +397,39 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel,
      * an apply operation being responsible for that.
      */
     estate->es_opened_result_relations =
-        lappend(estate->es_opened_result_relations, *resultRelInfo);
+        lappend(estate->es_opened_result_relations, resultRelInfo);

     estate->es_output_cid = GetCurrentCommandId(true);

     /* Prepare to catch AFTER triggers. */
     AfterTriggerBeginQuery();

-    return estate;
+    /* other fields of edata remain NULL for now */
+
+    return edata;
 }

 /*
  * Finish any operations related to the executor state created by
- * create_estate_for_relation().
+ * create_edata_for_relation().
  */
 static void
-finish_estate(EState *estate)
+finish_edata(ApplyExecutionData *edata)
 {
+    EState       *estate = edata->estate;
+
     /* Handle any queued AFTER triggers. */
     AfterTriggerEndQuery(estate);

+    /* Shut down tuple routing, if any was done. */
+    if (edata->proute)
+        ExecCleanupTupleRouting(edata->mtstate, edata->proute);
+
     /* Cleanup. */
     ExecResetTupleTable(estate->es_tupleTable, false);
     FreeExecutorState(estate);
+    pfree(edata);
+
     PopActiveSnapshot();
 }

@@ -1189,10 +1212,10 @@ GetRelationIdentityOrPK(Relation rel)
 static void
 apply_handle_insert(StringInfo s)
 {
-    ResultRelInfo *resultRelInfo;
     LogicalRepRelMapEntry *rel;
     LogicalRepTupleData newtup;
     LogicalRepRelId relid;
+    ApplyExecutionData *edata;
     EState       *estate;
     TupleTableSlot *remoteslot;
     MemoryContext oldctx;
@@ -1215,7 +1238,8 @@ apply_handle_insert(StringInfo s)
     }

     /* Initialize the executor state. */
-    estate = create_estate_for_relation(rel, &resultRelInfo);
+    edata = create_edata_for_relation(rel);
+    estate = edata->estate;
     remoteslot = ExecInitExtraTupleSlot(estate,
                                         RelationGetDescr(rel->localrel),
                                         &TTSOpsVirtual);
@@ -1228,24 +1252,32 @@ apply_handle_insert(StringInfo s)

     /* For a partitioned table, insert the tuple into a partition. */
     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-        apply_handle_tuple_routing(resultRelInfo, estate,
-                                   remoteslot, NULL, rel, CMD_INSERT);
+        apply_handle_tuple_routing(edata,
+                                   remoteslot, NULL, CMD_INSERT);
     else
-        apply_handle_insert_internal(resultRelInfo, estate,
+        apply_handle_insert_internal(edata, edata->targetRelInfo,
                                      remoteslot);

-    finish_estate(estate);
+    finish_edata(edata);

     logicalrep_rel_close(rel, NoLock);

     CommandCounterIncrement();
 }

-/* Workhorse for apply_handle_insert() */
+/*
+ * Workhorse for apply_handle_insert()
+ * relinfo is for the relation we're actually inserting into
+ * (could be a child partition of edata->targetRelInfo)
+ */
 static void
-apply_handle_insert_internal(ResultRelInfo *relinfo,
-                             EState *estate, TupleTableSlot *remoteslot)
+apply_handle_insert_internal(ApplyExecutionData *edata,
+                             ResultRelInfo *relinfo,
+                             TupleTableSlot *remoteslot)
 {
+    EState       *estate = edata->estate;
+
+    /* We must open indexes here. */
     ExecOpenIndices(relinfo, false);

     /* Do the insert. */
@@ -1296,9 +1328,9 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
 static void
 apply_handle_update(StringInfo s)
 {
-    ResultRelInfo *resultRelInfo;
     LogicalRepRelMapEntry *rel;
     LogicalRepRelId relid;
+    ApplyExecutionData *edata;
     EState       *estate;
     LogicalRepTupleData oldtup;
     LogicalRepTupleData newtup;
@@ -1329,7 +1361,8 @@ apply_handle_update(StringInfo s)
     check_relation_updatable(rel);

     /* Initialize the executor state. */
-    estate = create_estate_for_relation(rel, &resultRelInfo);
+    edata = create_edata_for_relation(rel);
+    estate = edata->estate;
     remoteslot = ExecInitExtraTupleSlot(estate,
                                         RelationGetDescr(rel->localrel),
                                         &TTSOpsVirtual);
@@ -1369,26 +1402,32 @@ apply_handle_update(StringInfo s)

     /* For a partitioned table, apply update to correct partition. */
     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-        apply_handle_tuple_routing(resultRelInfo, estate,
-                                   remoteslot, &newtup, rel, CMD_UPDATE);
+        apply_handle_tuple_routing(edata,
+                                   remoteslot, &newtup, CMD_UPDATE);
     else
-        apply_handle_update_internal(resultRelInfo, estate,
-                                     remoteslot, &newtup, rel);
+        apply_handle_update_internal(edata, edata->targetRelInfo,
+                                     remoteslot, &newtup);

-    finish_estate(estate);
+    finish_edata(edata);

     logicalrep_rel_close(rel, NoLock);

     CommandCounterIncrement();
 }

-/* Workhorse for apply_handle_update() */
+/*
+ * Workhorse for apply_handle_update()
+ * relinfo is for the relation we're actually updating in
+ * (could be a child partition of edata->targetRelInfo)
+ */
 static void
-apply_handle_update_internal(ResultRelInfo *relinfo,
-                             EState *estate, TupleTableSlot *remoteslot,
-                             LogicalRepTupleData *newtup,
-                             LogicalRepRelMapEntry *relmapentry)
+apply_handle_update_internal(ApplyExecutionData *edata,
+                             ResultRelInfo *relinfo,
+                             TupleTableSlot *remoteslot,
+                             LogicalRepTupleData *newtup)
 {
+    EState       *estate = edata->estate;
+    LogicalRepRelMapEntry *relmapentry = edata->targetRel;
     Relation    localrel = relinfo->ri_RelationDesc;
     EPQState    epqstate;
     TupleTableSlot *localslot;
@@ -1447,10 +1486,10 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
 static void
 apply_handle_delete(StringInfo s)
 {
-    ResultRelInfo *resultRelInfo;
     LogicalRepRelMapEntry *rel;
     LogicalRepTupleData oldtup;
     LogicalRepRelId relid;
+    ApplyExecutionData *edata;
     EState       *estate;
     TupleTableSlot *remoteslot;
     MemoryContext oldctx;
@@ -1476,7 +1515,8 @@ apply_handle_delete(StringInfo s)
     check_relation_updatable(rel);

     /* Initialize the executor state. */
-    estate = create_estate_for_relation(rel, &resultRelInfo);
+    edata = create_edata_for_relation(rel);
+    estate = edata->estate;
     remoteslot = ExecInitExtraTupleSlot(estate,
                                         RelationGetDescr(rel->localrel),
                                         &TTSOpsVirtual);
@@ -1488,26 +1528,32 @@ apply_handle_delete(StringInfo s)

     /* For a partitioned table, apply delete to correct partition. */
     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-        apply_handle_tuple_routing(resultRelInfo, estate,
-                                   remoteslot, NULL, rel, CMD_DELETE);
+        apply_handle_tuple_routing(edata,
+                                   remoteslot, NULL, CMD_DELETE);
     else
-        apply_handle_delete_internal(resultRelInfo, estate,
-                                     remoteslot, &rel->remoterel);
+        apply_handle_delete_internal(edata, edata->targetRelInfo,
+                                     remoteslot);

-    finish_estate(estate);
+    finish_edata(edata);

     logicalrep_rel_close(rel, NoLock);

     CommandCounterIncrement();
 }

-/* Workhorse for apply_handle_delete() */
+/*
+ * Workhorse for apply_handle_delete()
+ * relinfo is for the relation we're actually deleting from
+ * (could be a child partition of edata->targetRelInfo)
+ */
 static void
-apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
-                             TupleTableSlot *remoteslot,
-                             LogicalRepRelation *remoterel)
+apply_handle_delete_internal(ApplyExecutionData *edata,
+                             ResultRelInfo *relinfo,
+                             TupleTableSlot *remoteslot)
 {
+    EState       *estate = edata->estate;
     Relation    localrel = relinfo->ri_RelationDesc;
+    LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
     EPQState    epqstate;
     TupleTableSlot *localslot;
     bool        found;
@@ -1577,16 +1623,17 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
  * This handles insert, update, delete on a partitioned table.
  */
 static void
-apply_handle_tuple_routing(ResultRelInfo *relinfo,
-                           EState *estate,
+apply_handle_tuple_routing(ApplyExecutionData *edata,
                            TupleTableSlot *remoteslot,
                            LogicalRepTupleData *newtup,
-                           LogicalRepRelMapEntry *relmapentry,
                            CmdType operation)
 {
+    EState       *estate = edata->estate;
+    LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+    ResultRelInfo *relinfo = edata->targetRelInfo;
     Relation    parentrel = relinfo->ri_RelationDesc;
-    ModifyTableState *mtstate = NULL;
-    PartitionTupleRouting *proute = NULL;
+    ModifyTableState *mtstate;
+    PartitionTupleRouting *proute;
     ResultRelInfo *partrelinfo;
     Relation    partrel;
     TupleTableSlot *remoteslot_part;
@@ -1594,12 +1641,14 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
     MemoryContext oldctx;

     /* ModifyTableState is needed for ExecFindPartition(). */
-    mtstate = makeNode(ModifyTableState);
+    edata->mtstate = mtstate = makeNode(ModifyTableState);
     mtstate->ps.plan = NULL;
     mtstate->ps.state = estate;
     mtstate->operation = operation;
     mtstate->resultRelInfo = relinfo;
-    proute = ExecSetupPartitionTupleRouting(estate, parentrel);
+
+    /* ... as is PartitionTupleRouting. */
+    edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);

     /*
      * Find the partition to which the "search tuple" belongs.
@@ -1633,14 +1682,13 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
     switch (operation)
     {
         case CMD_INSERT:
-            apply_handle_insert_internal(partrelinfo, estate,
+            apply_handle_insert_internal(edata, partrelinfo,
                                          remoteslot_part);
             break;

         case CMD_DELETE:
-            apply_handle_delete_internal(partrelinfo, estate,
-                                         remoteslot_part,
-                                         &relmapentry->remoterel);
+            apply_handle_delete_internal(edata, partrelinfo,
+                                         remoteslot_part);
             break;

         case CMD_UPDATE:
@@ -1752,9 +1800,8 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
                     Assert(partrelinfo_new != partrelinfo);

                     /* DELETE old tuple found in the old partition. */
-                    apply_handle_delete_internal(partrelinfo, estate,
-                                                 localslot,
-                                                 &relmapentry->remoterel);
+                    apply_handle_delete_internal(edata, partrelinfo,
+                                                 localslot);

                     /* INSERT new tuple into the new partition. */

@@ -1782,7 +1829,7 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
                         slot_getallattrs(remoteslot);
                     }
                     MemoryContextSwitchTo(oldctx);
-                    apply_handle_insert_internal(partrelinfo_new, estate,
+                    apply_handle_insert_internal(edata, partrelinfo_new,
                                                  remoteslot_part);
                 }
             }
@@ -1792,8 +1839,6 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
             elog(ERROR, "unrecognized CmdType: %d", (int) operation);
             break;
     }
-
-    ExecCleanupTupleRouting(mtstate, proute);
 }

 /*

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andres Freund
Дата:
Сообщение: Re: Move pg_attribute.attcompression to earlier in struct for reduced size?
Следующее
От: Andres Freund
Дата:
Сообщение: Re: Move pg_attribute.attcompression to earlier in struct for reduced size?