Multiple message delivery on logical replication
От | Christophe Pettus |
---|---|
Тема | Multiple message delivery on logical replication |
Дата | |
Msg-id | A7E70A6B-04AB-4D26-BD7E-5E44DC0DA96D@thebuild.com обсуждение исходный текст |
Ответы |
Re: Multiple message delivery on logical replication
|
Список | psycopg |
I'm working with the logical replication support in psycopg2, and have found something surprising... this may be my error,of course! My sample program is below. It works wonderfully, but in the case when it starts, it re-receives the last message that ithandled, even with flushing it. Example: postgres@localhost:~/wal2pubsub$ python waltest.py {"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]} ^C postgres@localhost:~/wal2pubsub$ python waltest.py {"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]} There was no database activity in that period; it just replayed the same message. Shouldn't it have flushed to the end ofthe WAL stream and not reprocessed the last message? -- import psycopg2 from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL conn = psycopg2.connect('dbname=postgres', connection_factory=LogicalReplicationConnection) cur = conn.cursor() cur.start_replication(slot_name='test_slot', slot_type=REPLICATION_LOGICAL) from select import select from datetime import datetime def consume(msg): print(msg.payload) msg.cursor.send_feedback(flush_lsn=msg.data_start) try: cur.consume_stream(consume) except: pass -- -- Christophe Pettus xof@thebuild.com
В списке psycopg по дате отправления: