Discussion: RE: Conflict detection for update_deleted in logical replication

RE: Conflict detection for update_deleted in logical replication

From
"Zhijie Hou (Fujitsu)"
Date:
On Fri, May 16, 2025 at 7:31 PM Amit Kapila wrote:
> 
> A few more comments:
> =================
> 3.
> maybe_advance_nonremovable_xid(RetainConflictInfoData *data,
>    bool status_received)
> {
> /* Exit early if retaining conflict information is not required */
> if (!MySubscription->retainconflictinfo)
> return;
> 
> /*
> * It is sufficient to manage non-removable transaction ID for a
> * subscription by the main apply worker to detect update_deleted conflict
> * even for table sync or parallel apply workers.
> */
> if (!am_leader_apply_worker())
> return;
> 
> /* Exit early if we have already stopped retaining */
> if (MyLogicalRepWorker->stop_conflict_info_retention)
> return;
> ...
> 
> get_candidate_xid()
> {
> ...
> if (!TimestampDifferenceExceeds(data->candidate_xid_time, now,
> data->xid_advance_interval))
> return;
> 
> Would it be better to encapsulate all these preliminary checks that
> decide whether we can move to computing oldest_nonremovable_xid in a
> separate function? The check in get_candidate_xid would require some
> additional conditions because it is not required in every phase.
> Additionally, we can move the core phase processing logic to compute
> in a separate function. We can try this to see if the code looks
> better with such a refactoring.

I moved the switch case into a separate function, process_rci_phase_transition(),
and call it from each phase-handling function (get_candidate_xid, etc.).

I also added a new function, can_advance_nonremovable_xid(), to hold the
preliminary checks. On reflection, though, the timer check cannot be moved into
this function, because it is still needed when get_candidate_xid() is reached
through the new process_rci_phase_transition(), to ensure we do not fetch the
next xid too frequently.
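To make the refactoring concrete, here is a minimal, self-contained C sketch of how the pieces could fit together. It is not the patch code: RciState, its fields, request_publisher_status(), and the phase names other than RCI_WAIT_FOR_LOCAL_FLUSH are hypothetical simplifications, and timestamps are plain milliseconds. It illustrates the point above: the per-call preliminary checks live in can_advance_nonremovable_xid(), while the interval check stays in get_candidate_xid() because that function is also reached through process_rci_phase_transition().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical phases; only RCI_WAIT_FOR_LOCAL_FLUSH is named in the thread. */
typedef enum
{
    RCI_GET_CANDIDATE_XID,
    RCI_REQUEST_PUBLISHER_STATUS,
    RCI_WAIT_FOR_PUBLISHER_STATUS,
    RCI_WAIT_FOR_LOCAL_FLUSH
} RciPhase;

/* Simplified stand-in for the worker state used by the patch. */
typedef struct
{
    RciPhase    phase;
    int64_t     candidate_xid_time;   /* ms when the last candidate was taken */
    int64_t     xid_advance_interval; /* ms between candidate fetches */
    bool        retain_conflict_info; /* ~ MySubscription->retainconflictinfo */
    bool        is_leader;            /* ~ am_leader_apply_worker() */
    bool        stopped_retention;    /* ~ stop_conflict_info_retention */
    int         candidates_taken;     /* counts calls that passed the timer */
} RciState;

/* Preliminary checks that apply regardless of the current phase. */
static bool
can_advance_nonremovable_xid(const RciState *s)
{
    return s->retain_conflict_info && s->is_leader && !s->stopped_retention;
}

static void process_rci_phase_transition(RciState *s, int64_t now);

/*
 * The interval check stays here: this function can also be entered via
 * process_rci_phase_transition(), and we still must not fetch too often.
 */
static void
get_candidate_xid(RciState *s, int64_t now)
{
    if (now - s->candidate_xid_time < s->xid_advance_interval)
        return;

    s->candidate_xid_time = now;
    s->candidates_taken++;
    s->phase = RCI_REQUEST_PUBLISHER_STATUS;
    process_rci_phase_transition(s, now);
}

static void
request_publisher_status(RciState *s)
{
    /* A real worker would send a status request to the publisher here. */
    s->phase = RCI_WAIT_FOR_PUBLISHER_STATUS;
}

/* Core phase dispatch, callable from every phase-handling function. */
static void
process_rci_phase_transition(RciState *s, int64_t now)
{
    switch (s->phase)
    {
        case RCI_GET_CANDIDATE_XID:
            get_candidate_xid(s, now);
            break;
        case RCI_REQUEST_PUBLISHER_STATUS:
            request_publisher_status(s);
            break;
        case RCI_WAIT_FOR_PUBLISHER_STATUS:
        case RCI_WAIT_FOR_LOCAL_FLUSH:
            /* Driven by incoming messages; nothing to do in this sketch. */
            break;
    }
}

static void
maybe_advance_nonremovable_xid(RciState *s, int64_t now)
{
    if (!can_advance_nonremovable_xid(s))
        return;
    process_rci_phase_transition(s, now);
}
```

In this shape, a non-leader worker returns before any phase logic runs, and a leader that calls in too early stays in RCI_GET_CANDIDATE_XID until the interval has elapsed.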

> 
> 4.
> + /*
> + * Check if all remote concurrent transactions that were active at the
> + * first status request have now completed. If completed, proceed to the
> + * next phase; otherwise, continue checking the publisher status until
> + * these transactions finish.
> + */
> + if (FullTransactionIdPrecedesOrEquals(data->last_phase_at,
> +   remote_full_xid))
> + data->phase = RCI_WAIT_FOR_LOCAL_FLUSH;
> 
> I think there is a possibility of optimization here for cases where
> there are no new transactions on the publisher side across the next
> cycle of advancement of oldest_nonremovable_xid. We can simply set
> candidate_xid as oldest_nonremovable_xid instead of going into
> RCI_WAIT_FOR_LOCAL_FLUSH phase. If you want to keep the code simple
> for the first version, then at least note that down in comments, but
> OTOH, if it is simple to achieve, then let's do it now.

I think implementing this optimization would require comparing both
remote_nextxid and remote_lsn across consecutive cycles. Although
remote_nextxid might remain unchanged between cycles, old transactions might
have committed between the two cycles without affecting nextxid. Therefore, we
would need to maintain two fields, last_remote_nextxid and last_remote_lsn, in
the structure for comparison. Additionally, this optimization implies skipping
the clock skew check in the last phase, unless we move the check to an earlier
place. Given that the cost of continuing in RCI_WAIT_FOR_LOCAL_FLUSH when there
is no publisher activity is minimal, I personally prefer keeping the code simple
in this version.
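A rough C sketch of the comparison this optimization would need, assuming remote_lsn reflects the publisher's WAL position so that any commit, including one by an old transaction, moves it forward. The fields last_remote_nextxid and last_remote_lsn are the two mentioned above; PublisherSnapshot, publisher_idle_since_last_cycle(), and the simplified 64-bit xid type are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t FullXid;     /* simplified FullTransactionId */
typedef uint64_t XLogRecPtr;

typedef struct
{
    FullXid     last_remote_nextxid;  /* nextxid seen in the previous cycle */
    XLogRecPtr  last_remote_lsn;      /* publisher LSN seen in the previous cycle */
    bool        have_last;            /* false until one cycle has been recorded */
} PublisherSnapshot;

/*
 * Returns true only if neither value changed since the previous cycle.
 * Comparing nextxid alone is not enough: an old transaction may commit
 * between cycles without changing nextxid, but it does advance the LSN.
 */
static bool
publisher_idle_since_last_cycle(PublisherSnapshot *snap,
                                FullXid remote_nextxid,
                                XLogRecPtr remote_lsn)
{
    bool        idle = snap->have_last &&
        snap->last_remote_nextxid == remote_nextxid &&
        snap->last_remote_lsn == remote_lsn;

    /* Remember this cycle's values for the next comparison. */
    snap->last_remote_nextxid = remote_nextxid;
    snap->last_remote_lsn = remote_lsn;
    snap->have_last = true;
    return idle;
}
```

This covers only activity detection; skipping RCI_WAIT_FOR_LOCAL_FLUSH on that basis would still bypass the clock skew check unless that check is moved earlier.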

Best Regards,
Hou zj

Re: Conflict detection for update_deleted in logical replication

From
shveta malik
Date:
Thank you for the v31 patch-set. Please find a few comments on patch001:

1)

wait_for_local_flush:

+ if (data->last_recv_time &&
+ TimestampDifferenceExceeds(data->flushpos_update_time,
+    data->last_recv_time, WalWriterDelay))
+ {
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
+ bool have_pending_txes;
+
+ /* Fetch the latest remote flush position */
+ get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+ if (flushpos > last_flushpos)
+ last_flushpos = flushpos;
+
+ data->flushpos_update_time = data->last_recv_time;
+ }

We should only fetch a new flush position if 'last_flushpos' is still
less than 'data->remote_lsn'. Since 'last_flushpos' is also updated
by 'send_feedback' and we do not update 'data->flushpos_update_time'
there, it is possible that we already have the latest flush position but
TimestampDifferenceExceeds still returns 'true', making us re-read the flush
position unnecessarily.

Having said that, I think the correct way will be to move
'flushpos_update_time' out of RetainConflictInfoData, similar to
last_flushpos. Let it be a static variable; then we can update it in
send_feedback as well.


2)

get_candidate_xid:

+ /* Return if the oldest_nonremovable_xid cannot be advanced */
+ if (FullTransactionIdFollowsOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ full_xid))
+ {
+ adjust_xid_advance_interval(data, false);
+ return;
+ }

I can't think of a scenario where oldest_nonremovable_xid can be
greater than the current oldest txn-id. I think we should only check for
equality in the above condition and assert that oldest_nonremovable_xid
is not greater.
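A minimal sketch of the suggested check. FullTransactionId is reduced to a bare 64-bit value, the two macros are simplified local stand-ins for PostgreSQL's transam.h macros of the same names, and can_advance_to() is a hypothetical helper. Following the review's reasoning, "equal" is the only "cannot advance" case, and the "stored xid is ahead" case is asserted as impossible.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified: the real type also encodes the epoch in the 64-bit value. */
typedef struct
{
    uint64_t    value;
} FullTransactionId;

/* Simplified local stand-ins for PostgreSQL's transam.h macros. */
#define FullTransactionIdEquals(a, b)  ((a).value == (b).value)
#define FullTransactionIdFollows(a, b) ((a).value > (b).value)

/*
 * Returns true if full_xid (the oldest running xid just obtained) would move
 * oldest_nonremovable_xid forward.  The stored xid is expected never to be
 * ahead of full_xid, so that case is an assertion failure, not a silent skip.
 */
static bool
can_advance_to(FullTransactionId oldest_nonremovable_xid,
               FullTransactionId full_xid)
{
    assert(!FullTransactionIdFollows(oldest_nonremovable_xid, full_xid));
    return !FullTransactionIdEquals(oldest_nonremovable_xid, full_xid);
}
```

Using an assertion here documents the invariant for readers and surfaces a violation in assert-enabled builds instead of masking it.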

3)

+ elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u",
+ LSN_FORMAT_ARGS(data->remote_lsn),
+ XidFromFullTransactionId(data->candidate_xid));

This message is confusing, as we have not confirmed a remote flush;
it is a local flush up to the remote LSN. We can say:
"confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u"
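For reference, a small C sketch of how the suggested message would render. LSN_FORMAT_ARGS is redefined locally to mirror PostgreSQL's macro, which splits a 64-bit LSN into its high and low 32-bit halves for the %X/%X format; format_flush_message() is a hypothetical helper standing in for the elog(DEBUG2, ...) call.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

/* Local mirror of PostgreSQL's LSN_FORMAT_ARGS: high half, then low half. */
#define LSN_FORMAT_ARGS(lsn) ((uint32_t) ((lsn) >> 32)), ((uint32_t) (lsn))

/* Hypothetical helper: renders the proposed DEBUG2 text into buf. */
static int
format_flush_message(char *buf, size_t len, XLogRecPtr remote_lsn,
                     TransactionId xid)
{
    return snprintf(buf, len,
                    "confirmed flush up to remote lsn %" PRIX32 "/%" PRIX32
                    ": new oldest_nonremovable_xid %" PRIu32,
                    LSN_FORMAT_ARGS(remote_lsn), xid);
}
```

For example, an LSN of 0x1_05A0B2C8 with xid 742 renders as "confirmed flush up to remote lsn 1/5A0B2C8: new oldest_nonremovable_xid 742".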


4)
Everywhere we use the variable name 'data', which is a very generic
name. Shall we change it to 'conflict_info_data' or
'retain_conf_info_data'?


5)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.). Note that DELAY_CHKPT_IN_COMMIT is used to find transactions
+ * in the critical commit section. We need to know about such transactions
+ * for conflict detection in logical replication. See
+ * GetOldestTransactionIdInCommit and its use.

Please update the comment as 'GetOldestTransactionIdInCommit' is no
longer there.

6)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.). Note that DELAY_CHKPT_IN_COMMIT is used to find transactions

there.). --> there.) (the extra full stop is not needed)


thanks
Shveta



Re: Conflict detection for update_deleted in logical replication

From
Amit Kapila
Date:
On Fri, May 23, 2025 at 12:15 PM shveta malik <shveta.malik@gmail.com> wrote:
>
> Thank you for the v31 patch-set. Please find a few comments on patch001:
>
> 1)
>
> wait_for_local_flush:
>
> + if (data->last_recv_time &&
> + TimestampDifferenceExceeds(data->flushpos_update_time,
> +    data->last_recv_time, WalWriterDelay))
> + {
> + XLogRecPtr writepos;
> + XLogRecPtr flushpos;
> + bool have_pending_txes;
> +
> + /* Fetch the latest remote flush position */
> + get_flush_position(&writepos, &flushpos, &have_pending_txes);
> +
> + if (flushpos > last_flushpos)
> + last_flushpos = flushpos;
> +
> + data->flushpos_update_time = data->last_recv_time;
> + }
>
> We should only fetch a new flush position if 'last_flushpos' is still
> less than 'data->remote_lsn'. Since 'last_flushpos' is also updated
> by 'send_feedback' and we do not update 'data->flushpos_update_time'
> there, it is possible that we already have the latest flush position but
> TimestampDifferenceExceeds still returns 'true', making us re-read the flush
> position unnecessarily.
>
> Having said that, I think the correct way will be to move
> 'flushpos_update_time' out of RetainConflictInfoData, similar to
> last_flushpos. Let it be a static variable; then we can update it in
> send_feedback as well.
>

But then we may sometimes need to call GetCurrentTimestamp to set its
value, which is not required now, because we expect to skip this check
only when we are applying changes frequently.

--
With Regards,
Amit Kapila.



RE: Conflict detection for update_deleted in logical replication

From
"Zhijie Hou (Fujitsu)"
Date:
On Fri, May 23, 2025 at 2:45 PM shveta malik wrote:

> 
> Thank you for the v31 patch-set. Please find a few comments on patch001:

Thanks for the comments.

> 
> 1)
> 
> wait_for_local_flush:
> 
> + if (data->last_recv_time &&
> + TimestampDifferenceExceeds(data->flushpos_update_time,
> +    data->last_recv_time, WalWriterDelay))
> + {
> + XLogRecPtr writepos;
> + XLogRecPtr flushpos;
> + bool have_pending_txes;
> +
> + /* Fetch the latest remote flush position */
> + get_flush_position(&writepos, &flushpos, &have_pending_txes);
> +
> + if (flushpos > last_flushpos)
> + last_flushpos = flushpos;
> +
> + data->flushpos_update_time = data->last_recv_time;
> + }
> 
> We should only fetch a new flush position if 'last_flushpos' is still
> less than 'data->remote_lsn'. Since 'last_flushpos' is also updated
> by 'send_feedback' and we do not update 'data->flushpos_update_time'
> there, it is possible that we already have the latest flush position but
> TimestampDifferenceExceeds still returns 'true', making us re-read the flush
> position unnecessarily.
> 
> Having said that, I think the correct way will be to move
> 'flushpos_update_time' out of RetainConflictInfoData, similar to
> last_flushpos. Let it be a static variable; then we can update it in
> send_feedback as well.

I added a check so that the flush position is fetched only when last_flushpos is behind.

However, I would prefer to avoid adding more static variables if possible.
flushpos_update_time is used only to avoid fetching the flush position too
frequently, which is specific to the RCI logic. Even if in some cases we have
already updated the flush position in send_feedback(), I don't think it is a
big issue, as the new flush position update logic is only used while applying
changes in a loop, where send_feedback() is rarely invoked.
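A minimal C sketch of the resulting logic, combining the reviewer's "only when behind" condition with the existing time throttle. The throttle compares flushpos_update_time against last_recv_time rather than calling GetCurrentTimestamp, in line with the point about avoiding extra timestamp calls. Everything here is simplified and hypothetical: fetch_flush_position() stands in for get_flush_position(), WAL_WRITER_DELAY_MS assumes the default wal_writer_delay of 200 ms, and RciData and local_flush_reached() are illustrative names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;      /* microseconds, as in PostgreSQL */

#define WAL_WRITER_DELAY_MS 200   /* assumed: default wal_writer_delay */

typedef struct
{
    XLogRecPtr  remote_lsn;            /* publisher LSN we must flush up to */
    TimestampTz last_recv_time;        /* 0 if nothing received yet */
    TimestampTz flushpos_update_time;  /* last_recv_time at last fetch */
} RciData;

static XLogRecPtr last_flushpos;          /* also advanced by send_feedback() */
static XLogRecPtr simulated_local_flush;  /* value returned by the fetch stub */
static int  flush_fetches;                /* counts calls to the fetch stub */

/* Stand-in for get_flush_position(); counts how often it is called. */
static XLogRecPtr
fetch_flush_position(void)
{
    flush_fetches++;
    return simulated_local_flush;
}

/* Returns true once everything up to remote_lsn has been flushed locally. */
static bool
local_flush_reached(RciData *rci_data)
{
    /* Re-read only if still behind and enough receive time has passed. */
    if (last_flushpos < rci_data->remote_lsn &&
        rci_data->last_recv_time &&
        rci_data->last_recv_time - rci_data->flushpos_update_time >=
        (TimestampTz) WAL_WRITER_DELAY_MS * 1000)
    {
        XLogRecPtr  flushpos = fetch_flush_position();

        if (flushpos > last_flushpos)
            last_flushpos = flushpos;
        rci_data->flushpos_update_time = rci_data->last_recv_time;
    }
    return last_flushpos >= rci_data->remote_lsn;
}
```

Once last_flushpos has caught up (for example because send_feedback() advanced it), no further fetch happens regardless of the timer, which is the case the review was concerned about.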

> 4)
> Everywhere we use the variable name 'data', which is a very generic
> name. Shall we change it to 'conflict_info_data' or
> 'retain_conf_info_data'?

I changed them to rci_data which is shorter.

Best Regards,
Hou zj