RE: Perform streaming logical transactions by background workers and parallel apply
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB57160B0C0FDDCED638639696942B9@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In reply to | Re: Perform streaming logical transactions by background workers and parallel apply (Peter Smith <smithpb2250@gmail.com>) |
Responses |
RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Tuesday, October 18, 2022 10:36 AM Peter Smith <smithpb2250@gmail.com> wrote:
> Hi, here are my review comments for patch v38-0001.

Thanks for your comments.

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 1. parallel_apply_start_worker
>
> + /* Try to get a free parallel apply worker. */
> + foreach(lc, ParallelApplyWorkersList)
> + {
> +     ParallelApplyWorkerInfo *tmp_winfo;
> +
> +     tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> +     if (!tmp_winfo->in_use)
> +     {
> +         /* Found a worker that has not been assigned a transaction. */
> +         winfo = tmp_winfo;
> +         break;
> +     }
> + }
>
> The "Found a worker..." comment seems redundant because it's already
> clear from the prior comment and the 'in_use' member what this code is
> doing.

Removed.

> ~~~
>
> 2. LogicalParallelApplyLoop
>
> + void *data;
> + Size len;
> + int c;
> + int rc;
> + StringInfoData s;
> + MemoryContext oldctx;
>
> Several of these vars (like 'c', 'rc', 's') can be declared deeper -
> e.g. only in the scope where they are actually used.

Changed.

> ~~~
>
> 3.
>
> + /* Ensure we are reading the data into our memory context. */
> + oldctx = MemoryContextSwitchTo(ApplyMessageContext);
>
> Doesn't something need to switch back to this 'oldctx' prior to
> breaking out of the for(;;) loop?
>
> ~~~
>
> 4.
>
> + apply_dispatch(&s);
> +
> + MemoryContextReset(ApplyMessageContext);
>
> Isn't this broken now? Since you've removed the
> MemoryContextSwitchTo(oldctx), so next iteration will switch to
> ApplyMessageContext again which will overwrite and lose knowledge of
> the original 'oldctx' (??)

Sorry for missing that; fixed.

> ~~
>
> 5.
>
> Maybe this is a silly idea, I'm not sure. Because this is an infinite
> loop, then instead of the multiple calls to
> MemoryContextReset(ApplyMessageContext) maybe there can be just a
> single call to it immediately before you switch to that context in the
> first place. The effect will be the same, won't it?
>
> e.g.
> + /* Ensure we are reading the data into our memory context. */
> + MemoryContextReset(ApplyMessageContext); <=== THIS
> + oldctx = MemoryContextSwitchTo(ApplyMessageContext);

In the SHM_MQ_WOULD_BLOCK branch we invoke WaitLatch, so I feel it is
better to reset the memory context before waiting, to avoid holding
memory that is no longer useful for longer than necessary (although it
doesn't matter much in practice). So I haven't changed this for now.

> ~~~
>
> 6.
>
> The code logic keeps flip-flopping for several versions. I think if
> you are going to check all the return types of shm_mq_receive then
> using a switch(shmq_res) might be a better way than having multiple
> if/else with some Asserts.

Changed.

> ======
>
> src/backend/replication/logical/launcher.c
>
> 7. logicalrep_worker_launch
>
> Previously I'd suggested ([1] #12) that the process name should change
> for consistency, and AFAIK Amit also said [2] that would be OK, but
> this change is still not done in the current patch.

Changed.

> ======
>
> src/backend/replication/logical/worker.c
>
> 8. should_apply_changes_for_rel
>
> * Should this worker apply changes for given relation.
> *
> * This is mainly needed for initial relation data sync as that runs in
> * separate worker process running in parallel and we need some way to skip
> * changes coming to the main apply worker during the sync of a table.
>
> This existing comment refers to the "main apply worker". IMO it should
> say "leader apply worker" to keep all the terminology consistent.

Changed.

> ~~~
>
> 9. apply_handle_stream_start
>
> + *
> + * XXX We can avoid sending pairs of the START/STOP messages to the parallel
> + * worker because unlike apply worker it will process only one transaction at a
> + * time. However, it is not clear whether that is worth the effort because it
> + * is sent after logical_decoding_work_mem changes.
>   */
>  static void
>  apply_handle_stream_start(StringInfo s)
>
> As previously mentioned ([1] #13b) it's not obvious to me what that
> last sentence means. e.g. "because it is sent" - what is "it"?

Changed as per Amit's suggestion in [1].

> ~~~
>
> 11.
>
> + /*
> + * Assign the appropriate streaming flag according to the 'streaming' mode
> + * and the publisher's ability to support that mode.
> + */
>
> Maybe "streaming flag" -> "streaming string/flag". (sorry, it was my
> bad suggestion last time)

Improved.

Attached is the new version of the patch set.

[1] - https://www.postgresql.org/message-id/CAA4eK1%2BqwbD419%3DKgRTLRVj5zQhbM%3Dbfi-cvWG3HkORktb4-YA%40mail.gmail.com

Best Regards
Hou Zhijie
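
For readers following points 3 to 6, the loop shape under discussion can be sketched roughly as below. This is not the patch's code: `MqResult`, `Ctx`, `ctx_switch_to`, `ctx_reset` and `apply_loop_sketch` are hypothetical stand-ins for `shm_mq_result`, `MemoryContext`, `MemoryContextSwitchTo`, `MemoryContextReset` and `LogicalParallelApplyLoop`, and the receive results come from a scripted array rather than a real queue. It only illustrates a `switch` over every receive result, a reset before the point where a real worker would wait, and restoring the previous context on every path, including the one that leaves the loop.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the shm_mq_receive() result codes. */
typedef enum { MQ_SUCCESS, MQ_WOULD_BLOCK, MQ_DETACHED } MqResult;

/* Hypothetical stand-in for a memory context; it only counts resets. */
typedef struct { int resets; } Ctx;

/* Plays the role of CurrentMemoryContext in this sketch. */
static Ctx *current_ctx = NULL;

/* Switch to ctx and return the previously active context. */
static Ctx *ctx_switch_to(Ctx *ctx)
{
    Ctx *old = current_ctx;

    current_ctx = ctx;
    return old;
}

/* Record that the context's contents were discarded. */
static void ctx_reset(Ctx *ctx)
{
    ctx->resets++;
}

/*
 * Walk a scripted sequence of receive results and return how many
 * messages were "applied". Stops at the first MQ_DETACHED.
 */
static int apply_loop_sketch(const MqResult *script, size_t n, Ctx *msg_ctx)
{
    int applied = 0;

    assert(script != NULL && msg_ctx != NULL);

    for (size_t i = 0; i < n; i++)
    {
        /* Read the message into the per-message context. */
        Ctx *oldctx = ctx_switch_to(msg_ctx);
        int  done = 0;

        switch (script[i])
        {
            case MQ_SUCCESS:
                applied++;              /* stands in for apply_dispatch() */
                ctx_reset(msg_ctx);
                break;

            case MQ_WOULD_BLOCK:
                /* Release message memory before waiting for more work. */
                ctx_reset(msg_ctx);
                /* A real worker would wait on its latch here. */
                break;

            case MQ_DETACHED:
                done = 1;               /* sender is gone; leave the loop */
                break;
        }

        /* Restore the caller's context on every path, including exit. */
        ctx_switch_to(oldctx);

        if (done)
            break;
    }

    return applied;
}
```

Calling `apply_loop_sketch` with a script such as success, would-block, success, detached applies two messages, resets the message context three times, and leaves `current_ctx` as it was before the call.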
Attachments
- v39-0001-Perform-streaming-logical-transactions-by-parall.patch
- v39-0002-Test-streaming-parallel-option-in-tap-test.patch
- v39-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v39-0004-Retrict-parallel-for-partitioned-table.patch
- v39-0005-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v39-0006-Add-a-main_worker_pid-to-pg_stat_subscription.patch