Re: Time delayed LR (WAS Re: logical replication restrictions)
From | Peter Smith |
---|---|
Subject | Re: Time delayed LR (WAS Re: logical replication restrictions) |
Date | |
Msg-id | CAHut+PsydEoqQaDv861qS_366hBovYAE8RRoDu3=+sBM=yo3Dg@mail.gmail.com |
In response to | RE: Time delayed LR (WAS Re: logical replication restrictions) ("Takamichi Osumi (Fujitsu)" <osumi.takamichi@fujitsu.com>) |
Responses |
Re: Time delayed LR (WAS Re: logical replication restrictions)
(Peter Smith <smithpb2250@gmail.com>)
Re: Time delayed LR (WAS Re: logical replication restrictions) (Peter Smith <smithpb2250@gmail.com>)
RE: Time delayed LR (WAS Re: logical replication restrictions) ("Takamichi Osumi (Fujitsu)" <osumi.takamichi@fujitsu.com>) |
List | pgsql-hackers |
Here are my review comments for the latest patch v16-0001. (excluding the test code)

======
General

1.
Since the value of min_apply_delay cannot be < 0, I was thinking probably it should have been declared everywhere in this patch as a uint64 instead of an int64, right?

======
Commit message

2.
If the subscription sets min_apply_delay parameter, the logical replication worker will delay the transaction commit for min_apply_delay milliseconds.

~

IMO there should be another sentence before this just to say that a new parameter is being added:

e.g.
This patch implements a new subscription parameter called 'min_apply_delay'.

======
doc/src/sgml/config.sgml

3.
+ <para>
+ For time-delayed logical replication, the apply worker sends a Standby
+ Status Update message to the corresponding publisher per the indicated
+ time of this parameter. Therefore, if this parameter is longer than
+ <literal>wal_sender_timeout</literal> on the publisher, then the
+ walsender doesn't get any update message during the delay and repeatedly
+ terminates due to the timeout errors. Hence, make sure this parameter is
+ shorter than the <literal>wal_sender_timeout</literal> of the publisher.
+ If this parameter is set to zero with time-delayed replication, the
+ apply worker doesn't send any feedback messages during the
+ <literal>min_apply_delay</literal>.
+ </para>

This paragraph seemed confusing. I think it needs to be reworded to change all of the "this parameter" references because there are at least 3 different parameters mentioned in this paragraph. e.g. maybe just change them to explicitly name the parameter you are talking about.

I also think it needs to mention the ‘min_apply_delay’ subscription parameter up-front and then refer to it appropriately.

The end result might be something like I wrote below (this is just my guess – probably you can word it better).

SUGGESTION
For time-delayed logical replication (i.e.
when the subscription is created with parameter min_apply_delay > 0), the apply worker sends a Standby Status Update message to the publisher with a period of wal_receiver_status_interval. Make sure to set wal_receiver_status_interval less than the wal_sender_timeout on the publisher, otherwise, the walsender will repeatedly terminate due to the timeout errors. If wal_receiver_status_interval is set to zero, the apply worker doesn't send any feedback messages during the subscriber’s min_apply_delay period.

======
doc/src/sgml/ref/create_subscription.sgml

4.
+ <para>
+ By default, the subscriber applies changes as soon as possible. As
+ with the physical replication feature
+ (<xref linkend="guc-recovery-min-apply-delay"/>), it can be useful to
+ have a time-delayed logical replica. This parameter lets the user to
+ delay the application of changes by a specified amount of time. If this
+ value is specified without units, it is taken as milliseconds. The
+ default is zero(no delay).
+ </para>

4a.
As with the physical replication feature (recovery_min_apply_delay), it can be useful to have a time-delayed logical replica.

IMO not sure that the above sentence is necessary. It seems only to be saying that this parameter can be useful. Why do we need to say that?

~

4b.
"This parameter lets the user to delay" -> "This parameter lets the user delay"
OR
"This parameter lets the user to delay" -> "This parameter allows the user to delay"

~

4c.
"If this value is specified without units" -> "If the value is specified without units"

~

4d.
"zero(no delay)." -> "zero (no delay)."

----

5.
+ <para>
+ The delay occurs only on WAL records for transaction begins and after
+ the initial table synchronization. It is possible that the
+ replication delay between publisher and subscriber exceeds the value
+ of this parameter, in which case no delay is added. Note that the
+ delay is calculated between the WAL time stamp as written on
+ publisher and the current time on the subscriber.
Time spent in logical
+ decoding and in transferring the transaction may reduce the actual wait
+ time. If the system clocks on publisher and subscriber are not
+ synchronized, this may lead to apply changes earlier than expected,
+ but this is not a major issue because this parameter is typically much
+ larger than the time deviations between servers. Note that if this
+ parameter is set to a long delay, the replication will stop if the
+ replication slot falls behind the current LSN by more than
+ <link linkend="guc-max-slot-wal-keep-size"><literal>max_slot_wal_keep_size</literal></link>.
+ </para>

I think the first part can be reworded slightly. See what you think about the suggestion below.

SUGGESTION
Any delay occurs only on WAL records for transaction begins after all initial table synchronization has finished. The delay is calculated between the WAL timestamp as written on the publisher and the current time on the subscriber. Any overhead of time spent in logical decoding and in transferring the transaction may reduce the actual wait time. It is also possible that the overhead already exceeds the requested 'min_apply_delay' value, in which case no additional wait is necessary. If the system clocks...

----

6.
+ <para>
+ Setting streaming to <literal>parallel</literal> mode and <literal>min_apply_delay</literal>
+ simultaneously is not supported.
+ </para>

SUGGESTION
A non-zero min_apply_delay parameter is not allowed when streaming in parallel mode.

======
src/backend/commands/subscriptioncmds.c

7.
parse_subscription_options

@@ -404,6 +445,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
                  "slot_name = NONE", "create_slot = false")));
     }
   }
+
+  /* Test the combination of streaming mode and min_apply_delay */
+  if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+      opts->min_apply_delay > 0)
+  {
+    if (opts->streaming == LOGICALREP_STREAM_PARALLEL)
+      ereport(ERROR,
+          errcode(ERRCODE_SYNTAX_ERROR),
+          errmsg("%s and %s are mutually exclusive options",
+               "min_apply_delay > 0", "streaming = parallel"));
+  }

SUGGESTION (comment)
The combination of parallel streaming mode and min_apply_delay is not allowed.

~~~

8. AlterSubscription (general)

I observed during testing there are 3 different errors….

At subscription CREATE time you can get this error:
ERROR: min_apply_delay > 0 and streaming = parallel are mutually exclusive options

If you try to ALTER the min_apply_delay when already streaming = parallel you can get this error:
ERROR: cannot enable min_apply_delay for subscription in streaming = parallel mode

If you try to ALTER the streaming to be parallel if there is already a min_apply_delay > 0 then you can get this error:
ERROR: cannot enable streaming = parallel mode for subscription with min_apply_delay

~

IMO there is no need to have 3 different error message texts. I think all these cases are explained by just the first text (ERROR: min_apply_delay > 0 and streaming = parallel are mutually exclusive options)

~~~

9.
AlterSubscription

@@ -1098,6 +1152,18 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,

   if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
   {
+    /*
+     * Test the combination of streaming mode and
+     * min_apply_delay
+     */
+    if (opts.streaming == LOGICALREP_STREAM_PARALLEL)
+      if ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && opts.min_apply_delay > 0) ||
+        (!IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && sub->minapplydelay > 0))
+        ereport(ERROR,
+            errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+            errmsg("cannot enable %s mode for subscription with %s",
+                 "streaming = parallel", "min_apply_delay"));
+

9a.
SUGGESTION (comment)
The combination of parallel streaming mode and min_apply_delay is not allowed.

~

9b.
(see AlterSubscription general review comment #8 above)

Here you can use the same comment error message that says min_apply_delay > 0 and streaming = parallel are mutually exclusive options.

~~~

10. AlterSubscription

@@ -1111,6 +1177,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
   = true;
   }

+  if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
+  {
+    /*
+     * Test the combination of streaming mode and
+     * min_apply_delay
+     */
+    if (opts.min_apply_delay > 0)
+      if ((IsSet(opts.specified_opts, SUBOPT_STREAMING) && opts.streaming == LOGICALREP_STREAM_PARALLEL) ||
+        (!IsSet(opts.specified_opts, SUBOPT_STREAMING) && sub->stream == LOGICALREP_STREAM_PARALLEL))
+        ereport(ERROR,
+            errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+            errmsg("cannot enable %s for subscription in %s mode",
+                 "min_apply_delay", "streaming = parallel"));
+
+    values[Anum_pg_subscription_subminapplydelay - 1] =
+      Int64GetDatum(opts.min_apply_delay);
+    replaces[Anum_pg_subscription_subminapplydelay - 1] = true;
+  }

10a.
SUGGESTION (comment)
The combination of parallel streaming mode and min_apply_delay is not allowed.

~

10b.
(see AlterSubscription general review comment #8 above)

Here you can use the same comment error message that says min_apply_delay > 0 and streaming = parallel are mutually exclusive options.

======
.../replication/logical/applyparallelworker.c

11.
@@ -704,7 +704,8 @@ pa_process_spooled_messages_if_required(void)
 {
   apply_spooled_messages(&MyParallelShared->fileset,
                MyParallelShared->xid,
-               InvalidXLogRecPtr);
+               InvalidXLogRecPtr,
+               0);

IMO this passing of 0 is a bit strange because it is currently acting like a dummy value since the apply_spooled_messages will never make use of the 'finish_ts' anyway (since this call is from a parallel apply worker).

I think a better way to code this might be to pass the 0 (same as you are doing here) but inside the apply_spooled_messages change the code:

FROM
if (!am_parallel_apply_worker())
  maybe_delay_apply(finish_ts);

TO
if (finish_ts)
  maybe_delay_apply(finish_ts);

That does 2 things.
- It makes the passed-in 0 have some meaning
- It simplifies the apply_spooled_messages code

======
src/backend/replication/logical/worker.c

12.
@@ -318,6 +318,17 @@ static List *on_commit_wakeup_workers_subids = NIL;
 bool in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

+/*
+ * In order to avoid walsender's timeout during time-delayed replication,
+ * it's necessary to keep sending feedback messages during the delay from the
+ * worker process. Meanwhile, the feature delays the apply before starting the
+ * transaction and thus we don't write WALs for the suspended changes during
+ * the wait. Hence, in the case the worker process sends a feedback message
+ * during the delay, we should not make positions of the flushed and apply LSN
+ * overwritten by the last received latest LSN. See send_feedback() for details.
+ */
+static XLogRecPtr last_received = InvalidXLogRecPtr;

12a.
Suggest a small change to the first sentence of the comment.
BEFORE
In order to avoid walsender's timeout during time-delayed replication, it's necessary to keep sending feedback messages during the delay from the worker process.

AFTER
In order to avoid walsender timeout for time-delayed replication the worker process keeps sending feedback messages during the delay period.

~

12b.
"Hence, in the case" -> "When"

~~~

13. forward declare

-static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply,
+              bool in_delaying_apply);

Change the param name:
"in_delaying_apply" -> "in_delayed_apply" (??)

~~~

14. maybe_delay_apply

+  /* Nothing to do if no delay set */
+  if (MySubscription->minapplydelay <= 0)
+    return;

IIUC min_apply_delay cannot be < 0 so this condition could simply be:

if (!MySubscription->minapplydelay)
  return;

~~~

15. maybe_delay_apply

+  /*
+   * The min_apply_delay parameter is ignored until all tablesync workers
+   * have reached READY state. If we allow the delay during the catchup
+   * phase, once we reach the limit of tablesync workers, it will impose a
+   * delay for each subsequent worker. It means it will take a long time to
+   * finish the initial table synchronization.
+   */
+  if (!AllTablesyncsReady())
+    return;

SUGGESTION (slight rewording)
The min_apply_delay parameter is ignored until all tablesync workers have reached READY state. This is because if we allowed the delay during the catchup phase, then once we reached the limit of tablesync workers it would impose a delay for each subsequent worker. That would cause initial table synchronization completion to take a long time.

~~~

16. maybe_delay_apply

+  while (true)
+  {
+    long diffms;
+
+    ResetLatch(MyLatch);
+
+    CHECK_FOR_INTERRUPTS();

IMO there should be some small explanatory comment here at the top of the while loop.

~~~

17.
apply_spooled_messages

@@ -2024,6 +2141,21 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
   int fileno;
   off_t offset;

+  /*
+   * Should we delay the current transaction?
+   *
+   * Unlike the regular (non-streamed) cases, the delay is applied in a
+   * STREAM COMMIT/STREAM PREPARE message for streamed transactions. The
+   * STREAM START message does not contain a commit/prepare time (it will be
+   * available when the in-progress transaction finishes). Hence, it's not
+   * appropriate to apply a delay at that time.
+   *
+   * It's not allowed to execute time-delayed replication with parallel
+   * apply feature.
+   */
+  if (!am_parallel_apply_worker())
+    maybe_delay_apply(finish_ts);

That whole comment part "Unlike the regular (non-streamed) cases" seems misplaced here. Perhaps this part of the comment is better put into the function header where the meaning of 'finish_ts' is explained?

~~~

18. apply_spooled_messages

+   * It's not allowed to execute time-delayed replication with parallel
+   * apply feature.
+   */
+  if (!am_parallel_apply_worker())
+    maybe_delay_apply(finish_ts);

As was mentioned in comment #11 above this code could be changed like

if (finish_ts)
  maybe_delay_apply(finish_ts);

then you don't even need to make mention of "parallel apply" at all here.

OTOH if you want to still have the parallel apply comment then maybe reword it like this:
"It is not allowed to combine time-delayed replication with the parallel apply feature."

~~~

19. apply_spooled_messages

If you chose not to do my suggestion from comment #11, then there are 2 identical conditions (!am_parallel_apply_worker()); In this case, I was wondering if it would be better to refactor to use a single condition instead.

~~~

20. send_feedback

(same as comment #13)

Maybe change the new param name to “in_delayed_apply”?

~~~

21.
@@ -3737,8 +3869,15 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
   /*
    * No outstanding transactions to flush, we can report the latest received
    * position. This is important for synchronous replication.
+   *
+   * During the delay of time-delayed replication, do not tell the publisher
+   * that the received latest LSN is already applied and flushed at this
+   * stage, since we don't apply the transaction yet. If we do so, it leads
+   * to a wrong assumption of logical replication progress on the publisher
+   * side. Here, we just send a feedback message to avoid publisher's
+   * timeout during the delay.
    */

Minor rewording of the comment

SUGGESTION
If the subscriber side apply is delayed (because of time-delayed replication) then do not tell the publisher that the received latest LSN is already applied and flushed, otherwise, it leads to the publisher side making a wrong assumption of logical replication progress. Instead, we just send a feedback message to avoid a publisher timeout during the delay.

======
src/bin/pg_dump/pg_dump.c

22.
@@ -4546,9 +4547,14 @@ getSubscriptions(Archive *fout)
   LOGICALREP_TWOPHASE_STATE_DISABLED);

   if (fout->remoteVersion >= 160000)
-    appendPQExpBufferStr(query, " s.suborigin\n");
+    appendPQExpBufferStr(query,
+               " s.suborigin,\n"
+               " s.subminapplydelay\n");
   else
-    appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+  {
+    appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY);
+    appendPQExpBufferStr(query, " 0 AS subminapplydelay\n");
+  }

Can't those appends in the else part be combined into a single appendPQExpBuffer?

appendPQExpBuffer(query,
          " '%s' AS suborigin,\n"
          " 0 AS subminapplydelay\n",
          LOGICALREP_ORIGIN_ANY);

======
src/include/catalog/pg_subscription.h

23.
@@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
   XLogRecPtr subskiplsn; /* All changes finished at this LSN are
               * skipped */

+  int64 subminapplydelay; /* Replication apply delay */
+
   NameData subname; /* Name of the subscription */

   Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */

SUGGESTION (for comment)
Replication apply delay (ms)

~~

24.
@@ -120,6 +122,7 @@ typedef struct Subscription
               * in */
   XLogRecPtr skiplsn; /* All changes finished at this LSN are
               * skipped */
+  int64 minapplydelay; /* Replication apply delay */

SUGGESTION (for comment)
Replication apply delay (ms)

------
Kind Regards,
Peter Smith.
Fujitsu Australia
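
Note on comments #5, #11 and #18: the wait-time rule discussed there can be sketched as a small standalone C function. This is only an illustration under stated assumptions, not code from the patch. The helper name remaining_apply_delay_ms and its plain millisecond integer arguments are hypothetical; the patch itself works with PostgreSQL's own timestamp values inside the latch-based loop quoted in comment #16. The sketch treats a finish_ts of 0 as "no commit time supplied, so never delay", which is the meaning comment #11 proposes giving to the passed-in 0.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper (not part of the patch): returns how many more
 * milliseconds the apply should wait. All arguments are milliseconds on
 * one shared clock, which is a simplifying assumption of this sketch.
 */
static int64_t
remaining_apply_delay_ms(int64_t finish_ts_ms, int64_t now_ms,
                         int64_t min_apply_delay_ms)
{
    int64_t wake_at_ms;

    /* min_apply_delay can never be negative (comments #1 and #14). */
    assert(min_apply_delay_ms >= 0);

    /* finish_ts == 0 means there is no commit time, so never delay. */
    if (finish_ts_ms == 0 || min_apply_delay_ms == 0)
        return 0;

    /* The delay is counted from the publisher's commit timestamp. */
    wake_at_ms = finish_ts_ms + min_apply_delay_ms;

    /*
     * If decoding and transfer overhead already used up the whole delay,
     * no additional wait is needed.
     */
    return (wake_at_ms > now_ms) ? wake_at_ms - now_ms : 0;
}
```

For example, with a commit timestamp of 1000 ms, a current time of 1200 ms and a 500 ms delay, 200 ms have already passed in transit, so only the remaining 300 ms would be waited.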