Discussion: RE: Conflict detection for update_deleted in logical replication
On Fri, May 16, 2025 at 7:31 PM Amit Kapila wrote:
>
> A few more comments:
> =================
> 3.
> maybe_advance_nonremovable_xid(RetainConflictInfoData *data,
>                                bool status_received)
> {
>     /* Exit early if retaining conflict information is not required */
>     if (!MySubscription->retainconflictinfo)
>         return;
>
>     /*
>      * It is sufficient to manage non-removable transaction ID for a
>      * subscription by the main apply worker to detect update_deleted conflict
>      * even for table sync or parallel apply workers.
>      */
>     if (!am_leader_apply_worker())
>         return;
>
>     /* Exit early if we have already stopped retaining */
>     if (MyLogicalRepWorker->stop_conflict_info_retention)
>         return;
> ...
>
> get_candidate_xid()
> {
> ...
>     if (!TimestampDifferenceExceeds(data->candidate_xid_time, now,
>                                     data->xid_advance_interval))
>         return;
>
> Would it be better to encapsulate all these preliminary checks that
> decide whether we can move to computing oldest_nonremovable_xid in a
> separate function? The check in get_candidate_xid would require some
> additional conditions because it is not required in every phase.
> Additionally, we can move the core phase processing logic to compute
> in a separate function. We can try this to see if the code looks
> better with such a refactoring.

I moved the switch case into a separate function, process_rci_phase_transition(), and call it from each phase-handling function (get_candidate_xid() etc.). I also added a new function, can_advance_nonremovable_xid(), to hold the preliminary checks. However, on reflection, the timer check cannot be moved into that function, because it is still needed when get_candidate_xid() is reached via process_rci_phase_transition(), to ensure we do not fetch the next XID too frequently.

>
> 4.
> +    /*
> +     * Check if all remote concurrent transactions that were active at the
> +     * first status request have now completed. If completed, proceed to the
> +     * next phase; otherwise, continue checking the publisher status until
> +     * these transactions finish.
> +     */
> +    if (FullTransactionIdPrecedesOrEquals(data->last_phase_at,
> +                                          remote_full_xid))
> +        data->phase = RCI_WAIT_FOR_LOCAL_FLUSH;
>
> I think there is a possibility of optimization here for cases where
> there are no new transactions on the publisher side across the next
> cycle of advancement of oldest_nonremovable_xid. We can simply set
> candidate_xid as oldest_nonremovable_xid instead of going into
> RCI_WAIT_FOR_LOCAL_FLUSH phase. If you want to keep the code simple
> for the first version, then at least note that down in comments, but
> OTOH, if it is simple to achieve, then let's do it now.

I think implementing this optimization would require comparing both remote_nextxid and remote_lsn across consecutive cycles. Although remote_nextxid might remain unchanged between cycles, transactions that were already running might have committed in between, which does not change nextxid. Therefore, we would need to maintain two fields, last_remote_nextxid and last_remote_lsn, in the structure for comparison. Additionally, this optimization implies skipping the clock-skew check in the last phase, unless we move that check to an earlier place. Given that the cost of continuing in RCI_WAIT_FOR_LOCAL_FLUSH when there is no publisher activity is minimal, I personally prefer keeping the code simple in this version.

Best Regards,
Hou zj
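The refactoring described above can be illustrated with a small, self-contained C sketch. It is not the patch's code: `RciData`, `WorkerState`, the phase names other than `RCI_WAIT_FOR_LOCAL_FLUSH`, the boolean completion flags, the counters, and the millisecond clock are hypothetical simplifications. It shows the phase-independent early exits gathered in `can_advance_nonremovable_xid()`, the former switch moved into `process_rci_phase_transition()` and called from each phase handler, and why the interval check has to stay in `get_candidate_xid()`: when `wait_for_local_flush()` finishes a cycle, it re-enters the dispatcher and reaches `get_candidate_xid()` without passing through the guard again.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical phases; only RCI_WAIT_FOR_LOCAL_FLUSH is named in the thread. */
typedef enum
{
    RCI_GET_CANDIDATE_XID,
    RCI_WAIT_FOR_PUBLISHER_STATUS,
    RCI_WAIT_FOR_LOCAL_FLUSH
} RciPhase;

/* Hypothetical, simplified stand-in for RetainConflictInfoData. */
typedef struct
{
    RciPhase phase;
    int64_t  candidate_xid_time;   /* ms: when the last candidate was taken */
    int64_t  xid_advance_interval; /* ms: minimum gap between candidates */
    bool     publisher_done;       /* remote concurrent txns have finished */
    bool     local_flush_done;     /* local flush reached the remote LSN */
    int      candidates;           /* candidates taken (for illustration) */
    int      advances;             /* completed cycles (for illustration) */
} RciData;

/* Hypothetical stand-in for the subscription/worker state consulted. */
typedef struct
{
    bool retain_conflict_info;
    bool is_leader_apply_worker;
    bool stopped_retention;
} WorkerState;

static void process_rci_phase_transition(RciData *d, int64_t now);

/* All phase-independent early exits, gathered in one place. */
static bool
can_advance_nonremovable_xid(const WorkerState *w)
{
    return w->retain_conflict_info &&   /* retention requested */
           w->is_leader_apply_worker && /* only the leader manages the xid */
           !w->stopped_retention;       /* retention not already stopped */
}

static void
get_candidate_xid(RciData *d, int64_t now)
{
    assert(d->phase == RCI_GET_CANDIDATE_XID);
    /* Phase-specific throttle: it cannot live in the guard above, because
     * this function is also reached via process_rci_phase_transition(). */
    if (now - d->candidate_xid_time < d->xid_advance_interval)
        return;
    d->candidate_xid_time = now;
    d->candidates++;
    d->phase = RCI_WAIT_FOR_PUBLISHER_STATUS;
    process_rci_phase_transition(d, now);
}

static void
wait_for_publisher_status(RciData *d, int64_t now)
{
    if (!d->publisher_done)
        return;                 /* keep polling the publisher */
    d->publisher_done = false;
    d->phase = RCI_WAIT_FOR_LOCAL_FLUSH;
    process_rci_phase_transition(d, now);
}

static void
wait_for_local_flush(RciData *d, int64_t now)
{
    if (!d->local_flush_done)
        return;                 /* keep waiting for the local flush */
    d->local_flush_done = false;
    d->advances++;              /* candidate becomes oldest_nonremovable_xid */
    d->phase = RCI_GET_CANDIDATE_XID;
    process_rci_phase_transition(d, now);   /* start the next cycle */
}

/* The former switch, now shared by every phase handler. */
static void
process_rci_phase_transition(RciData *d, int64_t now)
{
    switch (d->phase)
    {
        case RCI_GET_CANDIDATE_XID: get_candidate_xid(d, now); break;
        case RCI_WAIT_FOR_PUBLISHER_STATUS: wait_for_publisher_status(d, now); break;
        case RCI_WAIT_FOR_LOCAL_FLUSH: wait_for_local_flush(d, now); break;
    }
}

static void
maybe_advance_nonremovable_xid(const WorkerState *w, RciData *d, int64_t now)
{
    if (!can_advance_nonremovable_xid(w))
        return;
    process_rci_phase_transition(d, now);
}
```

In this sketch, with a 100 ms interval and a candidate taken at t=100, a cycle that completes at t=150 loops back into `get_candidate_xid()`, which declines to take a new candidate until t=200. Without the check inside that function, the next candidate would be taken immediately after every completed cycle.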
Thank you for the v31 patch set. Please find a few comments on patch001:

1)
wait_for_local_flush:

+    if (data->last_recv_time &&
+        TimestampDifferenceExceeds(data->flushpos_update_time,
+                                   data->last_recv_time, WalWriterDelay))
+    {
+        XLogRecPtr  writepos;
+        XLogRecPtr  flushpos;
+        bool        have_pending_txes;
+
+        /* Fetch the latest remote flush position */
+        get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+        if (flushpos > last_flushpos)
+            last_flushpos = flushpos;
+
+        data->flushpos_update_time = data->last_recv_time;
+    }

We should only fetch a new flush position if 'last_flushpos' is still less than 'data->remote_lsn'. Since 'last_flushpos' is also updated by send_feedback(), and we do not update 'data->flushpos_update_time' there, it is possible that we already have the latest flush position but TimestampDifferenceExceeds() still returns true, making us re-read the flush position unnecessarily.

Having said that, I think the correct approach is to move 'flushpos_update_time' out of RetainConflictInfoData, similar to last_flushpos. If it is a static variable, we can update it in send_feedback() as well.

2)
get_candidate_xid:

+    /* Return if the oldest_nonremovable_xid cannot be advanced */
+    if (FullTransactionIdFollowsOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+                                         full_xid))
+    {
+        adjust_xid_advance_interval(data, false);
+        return;
+    }

I cannot think of a scenario where oldest_nonremovable_xid can be greater than the current oldest transaction ID. I think we should check only for equality in the above condition and assert if oldest_nonremovable_xid is greater.

3)
+    elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u",
+         LSN_FORMAT_ARGS(data->remote_lsn),
+         XidFromFullTransactionId(data->candidate_xid));

This message is confusing, as we have not confirmed a remote flush; rather, it is a local flush up to the remote LSN. We could say: "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u".

4)
Everywhere we use the variable name 'data', which is a very generic name. Shall we change it to 'conflict_info_data' or 'retain_conf_info_data'?

5)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.). Note that DELAY_CHKPT_IN_COMMIT is used to find transactions
+ * in the critical commit section. We need to know about such transactions
+ * for conflict detection in logical replication. See
+ * GetOldestTransactionIdInCommit and its use.

Please update the comment, as 'GetOldestTransactionIdInCommit' no longer exists.

6)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.). Note that DELAY_CHKPT_IN_COMMIT is used to find transactions

there.). --> there.) (the extra full stop is not needed)

thanks
Shveta
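Point 2 can be sketched as follows. `FullTransactionId` here is a simplified stand-in for the struct in PostgreSQL's `access/transam.h`, which likewise wraps a 64-bit value; the two comparison helpers mimic the macros of the same names, `candidate_can_advance()` is a hypothetical helper, and the standard `assert()` stands in for PostgreSQL's `Assert()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for PostgreSQL's FullTransactionId (a 64-bit value). */
typedef struct
{
    uint64_t value;
} FullTransactionId;

/* Mimic the access/transam.h comparison macros of the same names. */
static bool
FullTransactionIdEquals(FullTransactionId a, FullTransactionId b)
{
    return a.value == b.value;
}

static bool
FullTransactionIdPrecedesOrEquals(FullTransactionId a, FullTransactionId b)
{
    return a.value <= b.value;
}

/*
 * Hypothetical helper: may get_candidate_xid() pursue a new candidate?
 * oldest_nonremovable is the XID currently retained by the worker;
 * oldest_running is the freshly computed oldest transaction ID.
 */
static bool
candidate_can_advance(FullTransactionId oldest_nonremovable,
                      FullTransactionId oldest_running)
{
    /* The retained XID should never be ahead of the oldest running one. */
    assert(FullTransactionIdPrecedesOrEquals(oldest_nonremovable,
                                             oldest_running));

    /* Equality is the only "nothing to advance" case. */
    return !FullTransactionIdEquals(oldest_nonremovable, oldest_running);
}
```

With this shape, a retained XID that is ahead of the oldest running XID is treated as a bug caught by the assertion in assert-enabled builds, rather than being silently folded into the "cannot advance" path.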
On Fri, May 23, 2025 at 12:15 PM shveta malik <shveta.malik@gmail.com> wrote:
>
> Thank you for the v31 patch set. Please find a few comments on patch001:
>
> 1)
>
> wait_for_local_flush:
>
> +    if (data->last_recv_time &&
> +        TimestampDifferenceExceeds(data->flushpos_update_time,
> +                                   data->last_recv_time, WalWriterDelay))
> +    {
> +        XLogRecPtr  writepos;
> +        XLogRecPtr  flushpos;
> +        bool        have_pending_txes;
> +
> +        /* Fetch the latest remote flush position */
> +        get_flush_position(&writepos, &flushpos, &have_pending_txes);
> +
> +        if (flushpos > last_flushpos)
> +            last_flushpos = flushpos;
> +
> +        data->flushpos_update_time = data->last_recv_time;
> +    }
>
> We should only fetch a new flush position if 'last_flushpos' is still
> less than 'data->remote_lsn'. Since 'last_flushpos' is also updated by
> send_feedback(), and we do not update 'data->flushpos_update_time'
> there, it is possible that we already have the latest flush position
> but TimestampDifferenceExceeds() still returns true, making us re-read
> the flush position unnecessarily.
>
> Having said that, I think the correct approach is to move
> 'flushpos_update_time' out of RetainConflictInfoData, similar to
> last_flushpos. If it is a static variable, we can update it in
> send_feedback() as well.
>

But then we may sometimes need to call GetCurrentTimestamp() to set its value, which is not required now, because we expect to skip this check only when we are applying changes frequently.

--
With Regards,
Amit Kapila.
On Fri, May 23, 2025 at 2:45 PM shveta malik wrote:
>
> Thank you for the v31 patch set. Please find a few comments on patch001:

Thanks for the comments.

>
> 1)
>
> wait_for_local_flush:
>
> +    if (data->last_recv_time &&
> +        TimestampDifferenceExceeds(data->flushpos_update_time,
> +                                   data->last_recv_time, WalWriterDelay))
> +    {
> +        XLogRecPtr  writepos;
> +        XLogRecPtr  flushpos;
> +        bool        have_pending_txes;
> +
> +        /* Fetch the latest remote flush position */
> +        get_flush_position(&writepos, &flushpos, &have_pending_txes);
> +
> +        if (flushpos > last_flushpos)
> +            last_flushpos = flushpos;
> +
> +        data->flushpos_update_time = data->last_recv_time;
> +    }
>
> We should only fetch a new flush position if 'last_flushpos' is still
> less than 'data->remote_lsn'. Since 'last_flushpos' is also updated by
> send_feedback(), and we do not update 'data->flushpos_update_time'
> there, it is possible that we already have the latest flush position
> but TimestampDifferenceExceeds() still returns true, making us re-read
> the flush position unnecessarily.
>
> Having said that, I think the correct approach is to move
> 'flushpos_update_time' out of RetainConflictInfoData, similar to
> last_flushpos. If it is a static variable, we can update it in
> send_feedback() as well.

I added a check so that the flush position is fetched only when last_flushpos is still behind the remote LSN. But I would prefer to avoid adding more static variables if possible. flushpos_update_time is only used to avoid fetching the flush position too frequently, which is specific to the RCI logic. And even if, in some cases, send_feedback() has already updated the flush position, I feel it's not a big issue, as the new flush-position update logic is only used when applying changes in a loop, where send_feedback() is rarely invoked.

> 4)
> Everywhere we use the variable name 'data', which is a very generic
> name. Shall we change it to 'conflict_info_data' or
> 'retain_conf_info_data'?

I changed them to rci_data, which is shorter.

Best Regards,
Hou zj
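The revised flush-position throttle can be sketched as below. The types are simplified stand-ins (`XLogRecPtr` as a 64-bit LSN, `TimestampTz` in microseconds), `timestamp_difference_exceeds()` reimplements the semantics of PostgreSQL's `TimestampDifferenceExceeds()`, and `fetch_flush_position()`, `local_flush_reached()`, `simulated_flushpos` and `fetch_calls` are hypothetical. The sketch combines the "skip if already caught up" check with the existing interval check, and reuses `last_recv_time` so that, as Amit points out, no extra `GetCurrentTimestamp()` call is needed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;   /* stand-in for PostgreSQL's XLogRecPtr */
typedef int64_t TimestampTz;   /* microseconds, as in PostgreSQL */

/* Same semantics as PostgreSQL's TimestampDifferenceExceeds(). */
static bool
timestamp_difference_exceeds(TimestampTz start, TimestampTz stop, int msec)
{
    return (stop - start) >= (int64_t) msec * 1000;
}

/* Hypothetical, simplified stand-in for the rci_data fields used here. */
typedef struct
{
    XLogRecPtr  remote_lsn;           /* flush target for this cycle */
    TimestampTz last_recv_time;       /* reused, so no clock read is needed */
    TimestampTz flushpos_update_time; /* when the flush position was fetched */
} RciData;

static XLogRecPtr last_flushpos;      /* also advanced by send_feedback() */
static XLogRecPtr simulated_flushpos; /* what the stub below reports */
static int        fetch_calls;        /* counts stub invocations */

/* Hypothetical stub standing in for get_flush_position(). */
static XLogRecPtr
fetch_flush_position(void)
{
    fetch_calls++;
    return simulated_flushpos;
}

/* Returns true once local changes are flushed up to remote_lsn. */
static bool
local_flush_reached(RciData *rci_data, int wal_writer_delay_ms)
{
    if (last_flushpos < rci_data->remote_lsn &&  /* skip if caught up */
        rci_data->last_recv_time != 0 &&
        timestamp_difference_exceeds(rci_data->flushpos_update_time,
                                     rci_data->last_recv_time,
                                     wal_writer_delay_ms))
    {
        XLogRecPtr  flushpos = fetch_flush_position();

        if (flushpos > last_flushpos)
            last_flushpos = flushpos;
        rci_data->flushpos_update_time = rci_data->last_recv_time;
    }
    return last_flushpos >= rci_data->remote_lsn;
}
```

If send_feedback() has already advanced last_flushpos to or past remote_lsn, the first condition short-circuits and no fetch happens, even though flushpos_update_time was never refreshed there. That is the case Shveta raised, handled without introducing another static variable.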