RE: Perform streaming logical transactions by background workers and parallel apply
From: houzj.fnst@fujitsu.com
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date:
Msg-id: OS0PR01MB57161DD88F755587E8FE2C3B947F9@OS0PR01MB5716.jpnprd01.prod.outlook.com
In response to: Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>)
Responses: RE: Perform streaming logical transactions by background workers and parallel apply
List: pgsql-hackers
On Friday, September 2, 2022 2:10 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Thu, Sep 1, 2022 at 4:53 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
>
> Review of v27-0001*:

Thanks for the comments.

> ================
> 1. I feel the usage of in_remote_transaction and in_use flags is slightly complex.
> IIUC, the patch uses in_use flag to ensure commit ordering by waiting for it to
> become false before proceeding in transaction finish commands in leader
> apply worker. If so, I think it is better to name it in_parallel_apply_xact and set it
> to true only when we start applying xact in parallel apply worker and set it to
> false when we finish the xact in parallel apply worker. It can be initialized to false
> while setting up DSM. Also, accordingly change the function
> parallel_apply_wait_for_free() to parallel_apply_wait_for_xact_finish and
> parallel_apply_set_idle to parallel_apply_set_xact_finish. We can change the
> name of the in_remote_transaction flag to in_use.

Agreed. One thing I found while addressing this is that there could be a race condition if we set the flag in the parallel apply worker: the leader could already have started waiting for the parallel apply worker to finish processing the transaction (i.e. for in_parallel_apply_xact to become false) while the child process has not yet processed the first STREAM_START and therefore has not yet set in_parallel_apply_xact to true.

> Please explain about these flags in the struct where they are declared.
>
> 2. The worker_id in ParallelApplyWorkerShared struct could have wrong
> information after the worker is reused from the pool. Because we could have
> removed some other worker from the ParallelApplyWorkersList which will
> make the value of worker_id wrong. For error/debug messages, we can
> probably use LSN if available or can oid of subscription if required. I thought of
> using xid as well but I think it is better to avoid that in messages as it can
> wraparound. See, if the patch uses xid in other messages, it is better to either
> use it along with LSN or try to use only LSN.
>
> 3.
> elog(ERROR, "[Parallel Apply Worker #%u] unexpected message \"%c\"",
> +      shared->worker_id, c);
>
> Also, I am not sure whether the above style (use of []) of messages is good. Did
> you follow the usage from some other place?
>
> 4.
> apply_handle_stream_stop(StringInfo s)
> {
> ...
> + if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
> + {
> + elog(DEBUG1,
> + "[Parallel Apply Worker #%u] ended processing streaming chunk, "
> + "waiting on shm_mq_receive", MyParallelShared->worker_id);
> ...
>
> I don't understand the relevance of "waiting on shm_mq_receive" in the
> above message because AFAICS, here we are not waiting on any receive
> call.
>
> 5. I suggest you please go through all the ERROR/LOG/DEBUG messages in
> the patch and try to improve them based on the above comments.

I removed the worker_id, and also removed or improved some DEBUG/ERROR messages that were unclear or have no counterpart in existing code.

> 6.
> + * The dynamic shared memory segment will contain (1) a shm_mq that can be used
> + * to send errors (and other messages reported via elog/ereport) from the
> + * parallel apply worker to leader apply worker (2) another shm_mq that can be
> + * used to send changes in the transaction from leader apply worker to parallel
> + * apply worker
>
> Here, it would be better to switch (1) and (2). I feel it is better to
> explain first about how the main apply information is exchanged among
> workers.

Exchanged.

> 7.
> + /* Try to get a free parallel apply worker. */
> + foreach(lc, ParallelApplyWorkersList)
> + {
> + ParallelApplyWorkerInfo *tmp_winfo;
> +
> + tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> + if (tmp_winfo->error_mq_handle == NULL)
> + {
> + /*
> +  * Release the worker information and try next one if the parallel
> +  * apply worker exited cleanly.
> +  */
> + ParallelApplyWorkersList = foreach_delete_current(ParallelApplyWorkersList, lc);
> + shm_mq_detach(tmp_winfo->mq_handle);
> + dsm_detach(tmp_winfo->dsm_seg);
> + pfree(tmp_winfo);
> +
> + continue;
> + }
> +
> + if (!tmp_winfo->in_remote_transaction)
> + {
> + winfo = tmp_winfo;
> + break;
> + }
> + }
>
> Can we write it as if ... else if? If so, then we don't need to
> continue in the first loop. And, can we add some more comments to
> explain these cases?

Changed.

Attached is the new version of the patch set, which addresses the above comments and also fixes another problem when subscribing to a lower-version publisher.

Best regards,
Hou zj
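P.S. To illustrate the race discussed in point 1: one way to avoid it is for the leader to set the flag before dispatching the transaction, so the wait can never observe a stale "false" from before the worker started. This is only a standalone sketch under assumed names (SharedState, leader_dispatch_and_wait are hypothetical stand-ins, not the patch's actual code), using a pthread and a C11 atomic in place of the real shared memory and latch machinery:

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ParallelApplyWorkerShared. */
typedef struct
{
    atomic_bool in_parallel_apply_xact;
} SharedState;

/* Worker: applies the streamed transaction, then clears the flag. */
static void *
worker_main(void *arg)
{
    SharedState *shared = arg;

    /* ... apply the streamed changes here ... */

    atomic_store(&shared->in_parallel_apply_xact, false);
    return NULL;
}

/*
 * Leader: set in_parallel_apply_xact BEFORE launching the worker.  If the
 * worker were responsible for setting it to true on its first STREAM_START,
 * the leader could start waiting while the flag is still false and return
 * prematurely.  (Real code would wait on a latch, not pthread_join.)
 */
void
leader_dispatch_and_wait(SharedState *shared)
{
    pthread_t tid;

    atomic_store(&shared->in_parallel_apply_xact, true);
    pthread_create(&tid, NULL, worker_main, shared);
    pthread_join(tid, NULL);
    /* After the wait completes, the flag is reliably false again. */
}
```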
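And to illustrate the if/else-if restructuring asked for in point 7: the scan has three mutually exclusive cases per list element (worker exited cleanly, worker idle, worker busy), so no explicit continue is needed. The sketch below is standalone C over a plain singly linked list, not PostgreSQL's List/foreach_delete_current/shm_mq APIs; WorkerInfo, exited and in_use are simplified stand-ins for the real fields:

```c
#include <stdlib.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-in for ParallelApplyWorkerInfo. */
typedef struct WorkerInfo
{
    bool exited;                /* stands in for error_mq_handle == NULL */
    bool in_use;                /* stands in for in_parallel_apply_xact */
    struct WorkerInfo *next;
} WorkerInfo;

WorkerInfo *pool = NULL;        /* stands in for ParallelApplyWorkersList */

/*
 * Scan the pool.  Three exclusive cases per entry:
 *  - the worker exited cleanly: unlink and release its info;
 *  - the worker is idle: reuse it;
 *  - the worker is busy: skip to the next entry.
 */
WorkerInfo *
get_free_worker(void)
{
    WorkerInfo **link = &pool;

    while (*link != NULL)
    {
        WorkerInfo *w = *link;

        if (w->exited)
        {
            /* Exited cleanly: remove from the pool and free. */
            *link = w->next;
            free(w);
        }
        else if (!w->in_use)
        {
            /* Idle worker found: hand it out for the next transaction. */
            return w;
        }
        else
        {
            /* Busy with another transaction: keep scanning. */
            link = &w->next;
        }
    }
    return NULL;                /* no free worker; caller must launch one */
}
```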
Attachments
- v28-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch
- v28-0001-Perform-streaming-logical-transactions-by-parall.patch
- v28-0002-Test-streaming-parallel-option-in-tap-test.patch
- v28-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v28-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch