Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS
От | Tom Lane |
---|---|
Тема | Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS |
Дата | |
Msg-id | 162691.1621630908@sss.pgh.pa.us обсуждение исходный текст |
Ответ на | Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS (Amit Langote <amitlangote09@gmail.com>) |
Ответы | Re: Subscription tests fail under CLOBBER_CACHE_ALWAYS (Amit Langote <amitlangote09@gmail.com>) |
Список | pgsql-hackers |
Amit Langote <amitlangote09@gmail.com> writes: > IMHO, it would be better to keep the lowest-level > apply_handle_XXX_internal() out of this, because presumably we're only > cleaning up the mess in higher-level callers. Somewhat related, one > of the intentions behind a04daa97a43, which removed > es_result_relation_info in favor of passing the ResultRelInfo > explicitly to the executor's lower-level functions, was to avoid bugs > caused by failing to set/reset that global field correctly in > higher-level callers. Yeah, that's a fair point, and after some reflection I think that repeatedly changing the "active" field of the struct is exactly what was bothering me about the v2 patch. So in the attached v3, I went back to passing that as an explicit argument. The state struct now has no fields that need to change after first being set. I did notice that we could remove some other random arguments by adding the LogicalRepRelMapEntry* to the state struct, so this also does that. regards, tom lane diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 452e5f3df7..6a30662f37 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -178,6 +178,18 @@ static HTAB *xidhash = NULL; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +typedef struct ApplyExecutionData +{ + EState *estate; /* executor state, used to track resources */ + + LogicalRepRelMapEntry *targetRel; /* replication target rel */ + ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */ + + /* These fields are used when the target relation is partitioned: */ + ModifyTableState *mtstate; /* dummy ModifyTable state */ + PartitionTupleRouting *proute; /* partition routing info */ +} ApplyExecutionData; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -226,24 +238,23 @@ static void apply_dispatch(StringInfo s); static void 
apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data); -static void apply_handle_insert_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot); -static void apply_handle_update_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry); -static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, +static void apply_handle_insert_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot); +static void apply_handle_update_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, TupleTableSlot *remoteslot, - LogicalRepRelation *remoterel); + LogicalRepTupleData *newtup); +static void apply_handle_delete_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot); static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot); -static void apply_handle_tuple_routing(ResultRelInfo *relinfo, - EState *estate, +static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry, CmdType operation); /* @@ -336,18 +347,17 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) /* * Executor state preparation for evaluation of constraint expressions, - * indexes and triggers. + * indexes and triggers for the specified relation. * - * resultRelInfo is a ResultRelInfo for the relation to be passed to the - * executor routines. The caller must open and close any indexes to be - * updated independently of the relation registered here. + * Note that the caller must open and close any indexes to be updated. 
*/ -static EState * -create_estate_for_relation(LogicalRepRelMapEntry *rel, - ResultRelInfo **resultRelInfo) +static ApplyExecutionData * +create_edata_for_relation(LogicalRepRelMapEntry *rel) { + ApplyExecutionData *edata; EState *estate; RangeTblEntry *rte; + ResultRelInfo *resultRelInfo; /* * Input functions may need an active snapshot, as may AFTER triggers @@ -356,7 +366,10 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel, */ PushActiveSnapshot(GetTransactionSnapshot()); - estate = CreateExecutorState(); + edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData)); + edata->targetRel = rel; + + edata->estate = estate = CreateExecutorState(); rte = makeNode(RangeTblEntry); rte->rtekind = RTE_RELATION; @@ -365,13 +378,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel, rte->rellockmode = AccessShareLock; ExecInitRangeTable(estate, list_make1(rte)); - *resultRelInfo = makeNode(ResultRelInfo); + edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo); /* * Use Relation opened by logicalrep_rel_open() instead of opening it * again. */ - InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0); + InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0); /* * We put the ResultRelInfo in the es_opened_result_relations list, even @@ -384,29 +397,39 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel, * an apply operation being responsible for that. */ estate->es_opened_result_relations = - lappend(estate->es_opened_result_relations, *resultRelInfo); + lappend(estate->es_opened_result_relations, resultRelInfo); estate->es_output_cid = GetCurrentCommandId(true); /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); - return estate; + /* other fields of edata remain NULL for now */ + + return edata; } /* * Finish any operations related to the executor state created by - * create_estate_for_relation(). + * create_edata_for_relation(). 
*/ static void -finish_estate(EState *estate) +finish_edata(ApplyExecutionData *edata) { + EState *estate = edata->estate; + /* Handle any queued AFTER triggers. */ AfterTriggerEndQuery(estate); + /* Shut down tuple routing, if any was done. */ + if (edata->proute) + ExecCleanupTupleRouting(edata->mtstate, edata->proute); + /* Cleanup. */ ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); + pfree(edata); + PopActiveSnapshot(); } @@ -1189,10 +1212,10 @@ GetRelationIdentityOrPK(Relation rel) static void apply_handle_insert(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepTupleData newtup; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; @@ -1215,7 +1238,8 @@ apply_handle_insert(StringInfo s) } /* Initialize the executor state. */ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1228,24 +1252,32 @@ apply_handle_insert(StringInfo s) /* For a partitioned table, insert the tuple into a partition. 
*/ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, - remoteslot, NULL, rel, CMD_INSERT); + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_INSERT); else - apply_handle_insert_internal(resultRelInfo, estate, + apply_handle_insert_internal(edata, edata->targetRelInfo, remoteslot); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); } -/* Workhorse for apply_handle_insert() */ +/* + * Workhorse for apply_handle_insert() + * relinfo is for the relation we're actually inserting into + * (could be a child partition of edata->targetRelInfo) + */ static void -apply_handle_insert_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot) +apply_handle_insert_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot) { + EState *estate = edata->estate; + + /* We must open indexes here. */ ExecOpenIndices(relinfo, false); /* Do the insert. */ @@ -1296,9 +1328,9 @@ check_relation_updatable(LogicalRepRelMapEntry *rel) static void apply_handle_update(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; LogicalRepTupleData oldtup; LogicalRepTupleData newtup; @@ -1329,7 +1361,8 @@ apply_handle_update(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. */ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1369,26 +1402,32 @@ apply_handle_update(StringInfo s) /* For a partitioned table, apply update to correct partition. 
*/ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, - remoteslot, &newtup, rel, CMD_UPDATE); + apply_handle_tuple_routing(edata, + remoteslot, &newtup, CMD_UPDATE); else - apply_handle_update_internal(resultRelInfo, estate, - remoteslot, &newtup, rel); + apply_handle_update_internal(edata, edata->targetRelInfo, + remoteslot, &newtup); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); } -/* Workhorse for apply_handle_update() */ +/* + * Workhorse for apply_handle_update() + * relinfo is for the relation we're actually updating in + * (could be a child partition of edata->targetRelInfo) + */ static void -apply_handle_update_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry) +apply_handle_update_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup) { + EState *estate = edata->estate; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; TupleTableSlot *localslot; @@ -1447,10 +1486,10 @@ apply_handle_update_internal(ResultRelInfo *relinfo, static void apply_handle_delete(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepTupleData oldtup; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; @@ -1476,7 +1515,8 @@ apply_handle_delete(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. 
*/ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1488,26 +1528,32 @@ apply_handle_delete(StringInfo s) /* For a partitioned table, apply delete to correct partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, - remoteslot, NULL, rel, CMD_DELETE); + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_DELETE); else - apply_handle_delete_internal(resultRelInfo, estate, - remoteslot, &rel->remoterel); + apply_handle_delete_internal(edata, edata->targetRelInfo, + remoteslot); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); CommandCounterIncrement(); } -/* Workhorse for apply_handle_delete() */ +/* + * Workhorse for apply_handle_delete() + * relinfo is for the relation we're actually deleting from + * (could be a child partition of edata->targetRelInfo) + */ static void -apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, - TupleTableSlot *remoteslot, - LogicalRepRelation *remoterel) +apply_handle_delete_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot) { + EState *estate = edata->estate; Relation localrel = relinfo->ri_RelationDesc; + LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; bool found; @@ -1577,16 +1623,17 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel, * This handles insert, update, delete on a partitioned table. 
*/ static void -apply_handle_tuple_routing(ResultRelInfo *relinfo, - EState *estate, +apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, - LogicalRepRelMapEntry *relmapentry, CmdType operation) { + EState *estate = edata->estate; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; + ResultRelInfo *relinfo = edata->targetRelInfo; Relation parentrel = relinfo->ri_RelationDesc; - ModifyTableState *mtstate = NULL; - PartitionTupleRouting *proute = NULL; + ModifyTableState *mtstate; + PartitionTupleRouting *proute; ResultRelInfo *partrelinfo; Relation partrel; TupleTableSlot *remoteslot_part; @@ -1594,12 +1641,14 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, MemoryContext oldctx; /* ModifyTableState is needed for ExecFindPartition(). */ - mtstate = makeNode(ModifyTableState); + edata->mtstate = mtstate = makeNode(ModifyTableState); mtstate->ps.plan = NULL; mtstate->ps.state = estate; mtstate->operation = operation; mtstate->resultRelInfo = relinfo; - proute = ExecSetupPartitionTupleRouting(estate, parentrel); + + /* ... as is PartitionTupleRouting. */ + edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel); /* * Find the partition to which the "search tuple" belongs. @@ -1633,14 +1682,13 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, switch (operation) { case CMD_INSERT: - apply_handle_insert_internal(partrelinfo, estate, + apply_handle_insert_internal(edata, partrelinfo, remoteslot_part); break; case CMD_DELETE: - apply_handle_delete_internal(partrelinfo, estate, - remoteslot_part, - &relmapentry->remoterel); + apply_handle_delete_internal(edata, partrelinfo, + remoteslot_part); break; case CMD_UPDATE: @@ -1752,9 +1800,8 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, Assert(partrelinfo_new != partrelinfo); /* DELETE old tuple found in the old partition. 
*/ - apply_handle_delete_internal(partrelinfo, estate, - localslot, - &relmapentry->remoterel); + apply_handle_delete_internal(edata, partrelinfo, + localslot); /* INSERT new tuple into the new partition. */ @@ -1782,7 +1829,7 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, slot_getallattrs(remoteslot); } MemoryContextSwitchTo(oldctx); - apply_handle_insert_internal(partrelinfo_new, estate, + apply_handle_insert_internal(edata, partrelinfo_new, remoteslot_part); } } @@ -1792,8 +1839,6 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, elog(ERROR, "unrecognized CmdType: %d", (int) operation); break; } - - ExecCleanupTupleRouting(mtstate, proute); } /*
В списке pgsql-hackers по дате отправления:
Предыдущее
От: Andres Freund | Дата:
Сообщение: Re: Move pg_attribute.attcompression to earlier in struct for reduced size?
Следующее
От: Andres Freund | Дата:
Сообщение: Re: Move pg_attribute.attcompression to earlier in struct for reduced size?