RE: Perform streaming logical transactions by background workers and parallel apply
From | wangw.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS3PR01MB6275F145878B4A44586C46CE9E499@OS3PR01MB6275.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>) |
Responses |
RE: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Thur, Sep 8, 2022 at 14:52 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Mon, Sep 5, 2022 at 6:34 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > Attach the correct patch set this time.
> >
>
> Few comments on v28-0001*:

Thanks for your comments.

> 1.
> + /* Whether the worker is processing a transaction. */
> + bool in_use;
>
> I think this same comment applies to in_parallel_apply_xact flag as
> well. How about: "Indicates whether the worker is available to be used
> for parallel apply transaction?"?
>
> 2.
> + /*
> + * Set this flag in the leader instead of the parallel apply worker to
> + * avoid the race condition where the leader has already started waiting
> + * for the parallel apply worker to finish processing the transaction(set
> + * the in_parallel_apply_xact to false) while the child process has not yet
> + * processed the first STREAM_START and has not set the
> + * in_parallel_apply_xact to true.
>
> I think part of this comment "(set the in_parallel_apply_xact to
> false)" is not necessary. It will be clear without that.
>
> 3.
> + /* Create entry for requested transaction. */
> + entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_ENTER, &found);
> + if (found)
> + elog(ERROR, "hash table corrupted");
> ...
> ...
> + hash_search(ParallelApplyWorkersHash, &xid, HASH_REMOVE, NULL);
>
> It is better to have a similar elog for HASH_REMOVE case as well. We
> normally seem to have such elog for HASH_REMOVE.
>
> 4.
> * Parallel apply is not supported when subscribing to a publisher which
> + * cannot provide the abort_time, abort_lsn and the column information used
> + * to verify the parallel apply safety.
>
>
> In this comment, which column information are you referring to?
>
> 5.
> + /*
> + * Set in_parallel_apply_xact to true again as we only aborted the
> + * subtransaction and the top transaction is still in progress. No
> + * need to lock here because currently only the apply leader are
> + * accessing this flag.
> + */
> + winfo->shared->in_parallel_apply_xact = true;
>
> This theory sounds good to me but I think it is better to update/read
> this flag under spinlock as the patch is doing at a few other places.
> I think that will make the code easier to follow without worrying too
> much about such special cases. There are a few asserts as well which
> read this without lock, it would be better to change those as well.
>
> 6.
> + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
> + * with support for streaming large transactions using parallel apply
> + * workers. Introduced in PG16.
>
> How about changing it to something like:
> "LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
> version where we support applying large streaming transactions in
> parallel. Introduced in PG16."
>
> 7.
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + bool write_abort_lsn = (data->protocol_version >=
> + LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM);
>
> /*
> * The abort should happen outside streaming block, even for streamed
> @@ -1856,7 +1859,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
> Assert(rbtxn_is_streamed(toptxn));
>
> OutputPluginPrepareWrite(ctx, true);
> - logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
> + logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn, abort_lsn,
> + write_abort_lsn);
>
> I think we need to send additional information if the client has used
> the parallel streaming option. Also, let's keep sending subxid as we
> were doing previously and add additional parameters required. It may
> be better to name write_abort_lsn as abort_info.
>
> 8.
> + /*
> + * Check whether the publisher sends abort_lsn and abort_time.
> + *
> + * Note that the paralle apply worker is only started when the publisher
> + * sends abort_lsn and abort_time.
> + */
> + if (am_parallel_apply_worker() ||
> + walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
> + read_abort_lsn = true;
> +
> + logicalrep_read_stream_abort(s, &abort_data, read_abort_lsn);
>
> This check should match with the check for the write operation where
> we are checking the protocol version as well. There is a typo as well
> in the comments (/paralle/parallel).

Improved as suggested.

Attach the new patch set.

Regards,
Wang wei
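
Comments 7 and 8 both come down to the writer and the reader agreeing on when the extra abort fields are present in the message. The following is a minimal, standalone sketch of that idea and is not code from the patch: every name prefixed with `sketch_` or `SKETCH_` is hypothetical, the protocol version value 4 is an assumption, and the fields are copied in host byte order for brevity. The point is only that a single predicate on the negotiated protocol version drives both sides, so they cannot disagree about the message layout.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical stand-in for LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM.
 * The value 4 is an assumption made for this sketch.
 */
#define SKETCH_PROTO_STREAM_PARALLEL 4

/* Hypothetical, simplified STREAM ABORT payload. */
typedef struct SketchStreamAbort
{
	uint32_t	xid;
	uint32_t	subxid;
	uint64_t	abort_lsn;	/* sent only when abort info is enabled */
	int64_t		abort_time;	/* sent only when abort info is enabled */
} SketchStreamAbort;

/* The one predicate that both the writer and the reader consult. */
bool
sketch_has_abort_info(uint32_t protocol_version)
{
	return protocol_version >= SKETCH_PROTO_STREAM_PARALLEL;
}

/* Serialize msg into buf; returns the number of bytes written. */
size_t
sketch_write_stream_abort(uint8_t *buf, const SketchStreamAbort *msg,
						  uint32_t protocol_version)
{
	size_t		off = 0;

	assert(buf != NULL && msg != NULL);

	/* xid and subxid are always sent, as before. */
	memcpy(buf + off, &msg->xid, sizeof(msg->xid));
	off += sizeof(msg->xid);
	memcpy(buf + off, &msg->subxid, sizeof(msg->subxid));
	off += sizeof(msg->subxid);

	if (sketch_has_abort_info(protocol_version))
	{
		memcpy(buf + off, &msg->abort_lsn, sizeof(msg->abort_lsn));
		off += sizeof(msg->abort_lsn);
		memcpy(buf + off, &msg->abort_time, sizeof(msg->abort_time));
		off += sizeof(msg->abort_time);
	}
	return off;
}

/* Deserialize buf into msg; returns the number of bytes consumed. */
size_t
sketch_read_stream_abort(const uint8_t *buf, SketchStreamAbort *msg,
						 uint32_t protocol_version)
{
	size_t		off = 0;

	assert(buf != NULL && msg != NULL);
	memset(msg, 0, sizeof(*msg));

	memcpy(&msg->xid, buf + off, sizeof(msg->xid));
	off += sizeof(msg->xid);
	memcpy(&msg->subxid, buf + off, sizeof(msg->subxid));
	off += sizeof(msg->subxid);

	/* Same check as the writer, so the layouts always match. */
	if (sketch_has_abort_info(protocol_version))
	{
		memcpy(&msg->abort_lsn, buf + off, sizeof(msg->abort_lsn));
		off += sizeof(msg->abort_lsn);
		memcpy(&msg->abort_time, buf + off, sizeof(msg->abort_time));
		off += sizeof(msg->abort_time);
	}
	return off;
}
```

With an older protocol version the extra fields are neither written nor read, and the reader leaves them zeroed. Keying the reader on the publisher's server version instead would be a different condition from the one the writer uses, which is the mismatch comment 8 points out.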
Attachments
- v29-0001-Perform-streaming-logical-transactions-by-parall.patch
- v29-0002-Test-streaming-parallel-option-in-tap-test.patch
- v29-0003-Add-some-checks-before-using-parallel-apply-work.patch
- v29-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
- v29-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch