diff -cr cvs.head/doc/src/sgml/catalogs.sgml cvs.build/doc/src/sgml/catalogs.sgml
*** cvs.head/doc/src/sgml/catalogs.sgml 2010-02-10 20:32:57.000000000 +0100
--- cvs.build/doc/src/sgml/catalogs.sgml 2010-02-14 16:04:25.000000000 +0100
***************
*** 169,179 ****
- pg_listener
- asynchronous notification support
-
-
- pg_namespaceschemas
--- 169,174 ----
***************
*** 3253,3320 ****
-
- pg_listener
-
-
- pg_listener
-
-
-
- The catalog pg_listener supports the
- and
-
- commands. A listener creates an entry in
- pg_listener for each notification name
- it is listening for. A notifier scans pg_listener
- and updates each matching entry to show that a notification has occurred.
- The notifier also sends a signal (using the PID recorded in the table)
- to awaken the listener from sleep.
-
-
-
- pg_listener> Columns
-
-
-
-
- Name
- Type
- Description
-
-
-
-
-
- relname
- name
-
- Notify condition name. (The name need not match any actual
- relation in the database; the name relname> is historical.)
-
-
-
-
- listenerpid
- int4
- PID of the server process that created this entry
-
-
-
- notification
- int4
-
- Zero if no event is pending for this listener. If an event is
- pending, the PID of the server process that sent the notification
-
-
-
-
-
-
-
-
-
pg_namespace
--- 3248,3253 ----
diff -cr cvs.head/doc/src/sgml/func.sgml cvs.build/doc/src/sgml/func.sgml
*** cvs.head/doc/src/sgml/func.sgml 2010-02-14 16:02:44.000000000 +0100
--- cvs.build/doc/src/sgml/func.sgml 2010-02-14 16:04:25.000000000 +0100
***************
*** 11574,11579 ****
--- 11574,11585 ----
+ pg_listening()
+ set of text
+ channels that the session is currently listening on
+
+
+ session_usernamesession user name
***************
*** 11740,11745 ****
--- 11746,11760 ----
+ pg_listening
+
+
+
+ pg_listening returns a set of channels that the
+ current session is listening to. See for more information.
+
+
+ version
diff -cr cvs.head/doc/src/sgml/libpq.sgml cvs.build/doc/src/sgml/libpq.sgml
*** cvs.head/doc/src/sgml/libpq.sgml 2010-02-10 20:32:59.000000000 +0100
--- cvs.build/doc/src/sgml/libpq.sgml 2010-02-14 16:04:25.000000000 +0100
***************
*** 4116,4125 ****
can stop listening with the UNLISTEN command). All
sessions listening on a particular condition will be notified
asynchronously when a NOTIFY command with that
! condition name is executed by any session. No additional information
! is passed from the notifier to the listener. Thus, typically, any
! actual data that needs to be communicated is transferred through a
! database table. Commonly, the condition name is the same as the
associated table, but it is not necessary for there to be any associated
table.
--- 4116,4124 ----
can stop listening with the UNLISTEN command). All
sessions listening on a particular condition will be notified
asynchronously when a NOTIFY command with that
! condition name is executed by any session. A notification parameter can be
! used to communicate additional data to a listener.
! Commonly, the condition name is the same as the
associated table, but it is not necessary for there to be any associated
table.
***************
*** 4153,4161 ****
PQfreemem. It is sufficient to free the
PGnotify pointer; the
relname and extra
! fields do not represent separate allocations. (At present, the
! extra field is unused and will always point
! to an empty string.)
--- 4152,4158 ----
PQfreemem. It is sufficient to free the
PGnotify pointer; the
relname and extra
! fields do not represent separate allocations.
diff -cr cvs.head/doc/src/sgml/protocol.sgml cvs.build/doc/src/sgml/protocol.sgml
*** cvs.head/doc/src/sgml/protocol.sgml 2010-02-10 20:32:59.000000000 +0100
--- cvs.build/doc/src/sgml/protocol.sgml 2010-02-14 16:04:25.000000000 +0100
***************
*** 3192,3199 ****
Additional information passed from the notifying process.
- (Currently, this feature is unimplemented so the field
- is always an empty string.)
--- 3192,3197 ----
diff -cr cvs.head/doc/src/sgml/ref/listen.sgml cvs.build/doc/src/sgml/ref/listen.sgml
*** cvs.head/doc/src/sgml/ref/listen.sgml 2008-11-14 11:22:47.000000000 +0100
--- cvs.build/doc/src/sgml/ref/listen.sgml 2010-02-14 16:04:25.000000000 +0100
***************
*** 74,79 ****
--- 74,86 ----
+ Notes
+
+ A transaction that has executed LISTEN cannot be prepared for a two-phase commit.
+
+
+
+ Parameters
***************
*** 98,104 ****
LISTEN virtual;
NOTIFY virtual;
! Asynchronous notification "virtual" received from server process with PID 8448.
--- 105,111 ----
LISTEN virtual;
NOTIFY virtual;
! Asynchronous notification "virtual" () received from server process with PID 8448.
diff -cr cvs.head/doc/src/sgml/ref/notify.sgml cvs.build/doc/src/sgml/ref/notify.sgml
*** cvs.head/doc/src/sgml/ref/notify.sgml 2008-11-14 11:22:47.000000000 +0100
--- cvs.build/doc/src/sgml/ref/notify.sgml 2010-02-14 18:03:14.000000000 +0100
***************
*** 21,27 ****
! NOTIFY name
--- 21,27 ----
! NOTIFY name [ parameter ]
***************
*** 29,36 ****
Description
! The NOTIFY command sends a notification event to each
! client application that has previously executed
LISTEN name
for the specified notification name in the current database.
--- 29,37 ----
Description
! The NOTIFY command sends a notification event together
! with an optional notification parameter to each client application that has
! previously executed
LISTEN name
for the specified notification name in the current database.
***************
*** 39,54 ****
NOTIFY provides a simple form of signal or
interprocess communication mechanism for a collection of processes
accessing the same PostgreSQL database.
! Higher-level mechanisms can be built by using tables in the database to
! pass additional data (beyond a mere notification name) from notifier to
! listener(s).
The information passed to the client for a notification event includes the notification
! name and the notifying session's server process PID>. It is up to the
! database designer to define the notification names that will be used in a given
! database and what each one means.
--- 40,56 ----
NOTIFY provides a simple form of signal or
interprocess communication mechanism for a collection of processes
accessing the same PostgreSQL database.
! A notification parameter can be sent along with the notification and
! higher-level mechanisms for passing structured data can be built by using
! tables in the database to pass additional data from notifier to listener(s).
The information passed to the client for a notification event includes the notification
! name, the notifying session's server process PID> and the
! notification parameter (payload) which is an empty string if it has not been specified.
! It is up to the database designer to define the notification names that will
! be used in a given database and what each one means.
***************
*** 89,102 ****
! NOTIFY behaves like Unix signals in one important
! respect: if the same notification name is signaled multiple times in quick
! succession, recipients might get only one notification event for several executions
! of NOTIFY. So it is a bad idea to depend on the number
! of notifications received. Instead, use NOTIFY to wake up
! applications that need to pay attention to something, and use a database
! object (such as a sequence) to keep track of what happened or how many times
! it happened.
--- 91,111 ----
! If the same notification name is signaled multiple times from the same
! transaction and with an identical notification parameter (or an empty one), the
! database server can decide to deliver a single notification only.
! On the other hand, notifications with distinct notification parameters will
! always be delivered as distinct notifications. Similarly, notifications from
! different transactions will never get folded into one notification.
! NOTIFY also guarantees that notifications from the same
! transaction get delivered in the order they were sent.
!
!
!
! An alternative to specifying a notification parameter is to use NOTIFY to
! wake up applications that need to pay attention to something, and use a
! database object (such as a sequence) to keep track of what happened or how many
! times it happened.
***************
*** 111,122 ****
notification event message) is the same as one's own session's
PID> (available from libpq>). When they
are the same, the notification event is one's own work bouncing
! back, and can be ignored. (Despite what was said in the preceding
! paragraph, this is a safe technique.
! PostgreSQL keeps self-notifications
! separate from notifications arriving from other sessions, so you
! cannot miss an outside notification by ignoring your own
! notifications.)
--- 120,154 ----
notification event message) is the same as one's own session's
PID> (available from libpq>). When they
are the same, the notification event is one's own work bouncing
! back, and can be ignored.
!
!
!
!
! Notes
!
! A transaction that has executed LISTEN cannot be prepared for a two-phase commit.
!
!
! To send a notification you can also use the function
! pg_notify(text,
! text). The function takes the channel name as the
! first argument and the payload as the second. This could be more convenient
! to use in triggers and you can also use a non-constant channel name and
! parameter value.
!
!
! There is a limited queue for notifications. In case this queue is full, all
! transactions calling NOTIFY will fail.
!
!
! The queue it is quite large and should be sufficiently sized for almost
! every use case. However, no cleanup can take place if one of your backends
! executes LISTEN and then enters a transaction for a very
! long time. Once the queue is half full you will see warnings in the log file
! pointing you to the backend that is preventing cleanup. In this case you should
! make sure that this backend ends its current transaction so that cleanup can
! proceed.
***************
*** 132,137 ****
--- 164,183 ----
+
+ parameter
+
+
+ The notification parameter (payload) to be communicated along with the
+ notification. The character string is only allowed to consist of pure
+ ASCII 7-bit characters and must be shorter than 8000 characters.
+ Specifying a longer payload will cause an error. If you need to send
+ other characters or binary data, you need to take care of the encoding
+ and decoding (like base64) on your own. Alternatively you can store the
+ information in a database table and send the key of the record.
+
+
+
***************
*** 145,151 ****
LISTEN virtual;
NOTIFY virtual;
! Asynchronous notification "virtual" received from server process with PID 8448.
--- 191,203 ----
LISTEN virtual;
NOTIFY virtual;
! Asynchronous notification "virtual" () received from server process with PID 8448.
! NOTIFY virtual 'This is the payload';
! Asynchronous notification "virtual" (This is the payload) received from server process with PID 8448.
!
! LISTEN foo;
! SELECT pg_notify((SELECT 'fo' || 'o'), (SELECT 'pay' || 'load'));
! Asynchronous notification "foo" (payload) received from server process with PID 20801.
diff -cr cvs.head/doc/src/sgml/ref/unlisten.sgml cvs.build/doc/src/sgml/ref/unlisten.sgml
*** cvs.head/doc/src/sgml/ref/unlisten.sgml 2008-11-14 11:22:47.000000000 +0100
--- cvs.build/doc/src/sgml/ref/unlisten.sgml 2010-02-14 16:04:25.000000000 +0100
***************
*** 83,88 ****
--- 83,92 ----
At the end of each session, UNLISTEN * is
automatically executed.
+
+
+ A transaction that has executed LISTEN cannot be prepared for a two-phase commit.
+
***************
*** 94,100 ****
LISTEN virtual;
NOTIFY virtual;
! Asynchronous notification "virtual" received from server process with PID 8448.
--- 98,104 ----
LISTEN virtual;
NOTIFY virtual;
! Asynchronous notification "virtual" () received from server process with PID 8448.
diff -cr cvs.head/src/backend/access/transam/slru.c cvs.build/src/backend/access/transam/slru.c
*** cvs.head/src/backend/access/transam/slru.c 2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/access/transam/slru.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 58,83 ****
#include "storage/shmem.h"
#include "miscadmin.h"
-
- /*
- * Define segment size. A page is the same BLCKSZ as is used everywhere
- * else in Postgres. The segment size can be chosen somewhat arbitrarily;
- * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
- * or 64K transactions for SUBTRANS.
- *
- * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
- * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
- * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
- * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need
- * take no explicit notice of that fact in this module, except when comparing
- * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
- *
- * Note: this file currently assumes that segment file names will be four
- * hex digits. This sets a lower bound on the segment size (64K transactions
- * for 32-bit TransactionIds).
- */
- #define SLRU_PAGES_PER_SEGMENT 32
-
#define SlruFileName(ctl, path, seg) \
snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
--- 58,63 ----
diff -cr cvs.head/src/backend/access/transam/twophase_rmgr.c cvs.build/src/backend/access/transam/twophase_rmgr.c
*** cvs.head/src/backend/access/transam/twophase_rmgr.c 2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/access/transam/twophase_rmgr.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 25,31 ****
{
NULL, /* END ID */
lock_twophase_recover, /* Lock */
- NULL, /* notify/listen */
NULL, /* pgstat */
multixact_twophase_recover /* MultiXact */
};
--- 25,30 ----
***************
*** 34,40 ****
{
NULL, /* END ID */
lock_twophase_postcommit, /* Lock */
- notify_twophase_postcommit, /* notify/listen */
pgstat_twophase_postcommit, /* pgstat */
multixact_twophase_postcommit /* MultiXact */
};
--- 33,38 ----
***************
*** 43,49 ****
{
NULL, /* END ID */
lock_twophase_postabort, /* Lock */
- NULL, /* notify/listen */
pgstat_twophase_postabort, /* pgstat */
multixact_twophase_postabort /* MultiXact */
};
--- 41,46 ----
***************
*** 52,58 ****
{
NULL, /* END ID */
lock_twophase_standby_recover, /* Lock */
- NULL, /* notify/listen */
NULL, /* pgstat */
NULL /* MultiXact */
};
--- 49,54 ----
diff -cr cvs.head/src/backend/access/transam/xact.c cvs.build/src/backend/access/transam/xact.c
*** cvs.head/src/backend/access/transam/xact.c 2010-02-14 16:02:46.000000000 +0100
--- cvs.build/src/backend/access/transam/xact.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 1736,1743 ****
/* close large objects before lower-level cleanup */
AtEOXact_LargeObject(true);
! /* NOTIFY commit must come before lower-level cleanup */
! AtCommit_Notify();
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
--- 1736,1743 ----
/* close large objects before lower-level cleanup */
AtEOXact_LargeObject(true);
! /* Insert notifications sent by the NOTIFY command into the queue */
! AtCommit_NotifyBeforeCommit();
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
***************
*** 1815,1820 ****
--- 1815,1825 ----
AtEOXact_MultiXact();
+ /*
+ * Clean up Notify buffers and signal listening backends.
+ */
+ AtCommit_NotifyAfterCommit();
+
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
true, true);
diff -cr cvs.head/src/backend/catalog/Makefile cvs.build/src/backend/catalog/Makefile
*** cvs.head/src/backend/catalog/Makefile 2010-01-06 22:30:05.000000000 +0100
--- cvs.build/src/backend/catalog/Makefile 2010-02-14 16:04:25.000000000 +0100
***************
*** 30,36 ****
pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
! pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \
pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
--- 30,36 ----
pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \
pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \
! pg_statistic.h pg_rewrite.h pg_trigger.h pg_description.h \
pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \
pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \
pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \
diff -cr cvs.head/src/backend/commands/async.c cvs.build/src/backend/commands/async.c
*** cvs.head/src/backend/commands/async.c 2010-01-05 12:39:22.000000000 +0100
--- cvs.build/src/backend/commands/async.c 2010-02-14 18:09:42.000000000 +0100
***************
*** 13,44 ****
*/
/*-------------------------------------------------------------------------
! * New Async Notification Model:
! * 1. Multiple backends on same machine. Multiple backends listening on
! * one relation. (Note: "listening on a relation" is not really the
! * right way to think about it, since the notify names need not have
! * anything to do with the names of relations actually in the database.
! * But this terminology is all over the code and docs, and I don't feel
! * like trying to replace it.)
! *
! * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
! * ie, each relname/listenerPID pair. The "notification" field of the
! * tuple is zero when no NOTIFY is pending for that listener, or the PID
! * of the originating backend when a cross-backend NOTIFY is pending.
! * (We skip writing to pg_listener when doing a self-NOTIFY, so the
! * notification field should never be equal to the listenerPID field.)
! *
! * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
! * relname to a list of outstanding NOTIFY requests. Actual processing
! * happens if and only if we reach transaction commit. At that time (in
! * routine AtCommit_Notify) we scan pg_listener for matching relnames.
! * If the listenerPID in a matching tuple is ours, we just send a notify
! * message to our own front end. If it is not ours, and "notification"
! * is not already nonzero, we set notification to our own PID and send a
! * PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by
! * listenerPID).
! * BTW: if the signal operation fails, we presume that the listener backend
! * crashed without removing this tuple, and remove the tuple for it.
*
* 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* can call inbound-notify processing immediately if this backend is idle
--- 13,73 ----
*/
/*-------------------------------------------------------------------------
! * Async Notification Model as of 9.0:
! *
! * 1. Multiple backends on same machine. Multiple backends listening on
! * several channels. (This was previously called a "relation" even though it
! * is just an identifier and has nothing to do with a database relation.)
! *
! * 2. There is one central queue in the form of Slru backed file based storage
! * (directory pg_notify/), with several pages mapped into shared memory.
! * All listening backends read from that queue and all notifications are
! * placed in this queue, see the data structure AsyncQueueEntry.
! *
! * There is no central storage of which backend listens on which channel,
! * every backend has its own list.
! *
! * Every backend that is listening on at least one channel registers by
! * entering its Pid into the array of all backends. It then scans all
! * incoming notifications in the central queue and first compares the
! * database oid of the notification with its own database oid and then
! * compares the notified channel with the list of channels that it listens
! * to. All notifications without a match are just skipped.
! *
! * In case there is a match it delivers the corresponding notification to
! * its frontend.
! *
! * 3. The NOTIFY statement (routine Async_Notify) stores the notification
! * in a list which will not be processed until at transaction end. Every
! * notification can additionally send a "payload" which is an extra text
! * parameter to convey arbitrary information to the recipient.
! *
! * Duplicate notifications from the same transaction are sent out as one
! * notification only. This is done to save work when for example a trigger
! * on a 2 million row table fires a notification for each row that has been
! * changed. If the application needs to receive every single notification
! * that has been sent, it can easily add some unique string into the extra
! * payload parameter.
! *
! * Once the transaction commits, AtCommit_NotifyBeforeCommit() performs the
! * required changes regarding listeners (Listen/Unlisten) and then adds the
! * pending notifications to the head of the queue. The head pointer of the
! * queue always points to the next free position and a position is just a
! * page number and the offset in that page. This is done before marking the
! * transaction as committed in clog. If we run into problems writing the
! * notifications, we can still call elog(ERROR, ...) and the transaction
! * will roll back.
! *
! * Once we have put all of the notifications into the queue, we return to
! * CommitTransaction() which will then commit to clog.
! *
! * After clog commit we are called another time
! * (AtCommit_NotifyAfterCommit()). Here we check if we need to signal the
! * backends. In SignalBackends() we scan the list of listening backends and
! * send a PROCSIG_NOTIFY_INTERRUPT to every backend that has set its Pid (we
! * don't know which backend is listening on which channel so we need to send
! * a signal to every listening backend). We can exclude backends that are
! * already up to date.
*
* 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* can call inbound-notify processing immediately if this backend is idle
***************
*** 46,84 ****
* block). Otherwise the handler may only set a flag, which will cause the
* processing to occur just before we next go idle.
*
! * 5. Inbound-notify processing consists of scanning pg_listener for tuples
! * matching our own listenerPID and having nonzero notification fields.
! * For each such tuple, we send a message to our frontend and clear the
! * notification field. BTW: this routine has to start/commit its own
! * transaction, since by assumption it is only called from outside any
! * transaction.
! *
! * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
! * of pending actions. If we reach transaction commit, the changes are
! * applied to pg_listener just before executing any pending NOTIFYs. This
! * method is necessary because to avoid race conditions, we must hold lock
! * on pg_listener from when we insert a new listener tuple until we commit.
! * To do that and not create undue hazard of deadlock, we don't want to
! * touch pg_listener until we are otherwise done with the transaction;
! * in particular it'd be uncool to still be taking user-commanded locks
! * while holding the pg_listener lock.
! *
! * Although we grab ExclusiveLock on pg_listener for any operation,
! * the lock is never held very long, so it shouldn't cause too much of
! * a performance problem. (Previously we used AccessExclusiveLock, but
! * there's no real reason to forbid concurrent reads.)
*
! * An application that listens on the same relname it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's
! * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
* frontend during startup.) The above design guarantees that notifies from
! * other backends will never be missed by ignoring self-notifies. Note,
! * however, that we do *not* guarantee that a separate frontend message will
! * be sent for every outside NOTIFY. Since there is only room for one
! * originating PID in pg_listener, outside notifies occurring at about the
! * same time may be collapsed into a single message bearing the PID of the
! * first outside backend to perform the NOTIFY.
*-------------------------------------------------------------------------
*/
--- 75,96 ----
* block). Otherwise the handler may only set a flag, which will cause the
* processing to occur just before we next go idle.
*
! * Inbound-notify processing consists of reading all of the notifications
! * that have arrived since scanning last time. We read every notification
! * until we reach either a notification from an uncommitted transaction or
! * the head pointer's position. Then we check if we were the laziest
! * backend: if our pointer is set to the same position as the global tail
! * pointer is set, then we set it further to the second-laziest backend (We
! * can identify it by inspecting the positions of all other backends'
! * pointers). Whenever we move the tail pointer we also truncate now unused
! * pages (i.e. delete files in pg_notify/ that are no longer used).
*
! * An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's
! * Pid. (As of FE/BE protocol 2.0, the backend's Pid is provided to the
* frontend during startup.) The above design guarantees that notifies from
! * other backends will never be missed by ignoring self-notifies.
*-------------------------------------------------------------------------
*/
***************
*** 88,97 ****
#include
#include "access/heapam.h"
! #include "access/twophase_rmgr.h"
#include "access/xact.h"
! #include "catalog/pg_listener.h"
#include "commands/async.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
--- 100,111 ----
#include
#include "access/heapam.h"
! #include "access/slru.h"
! #include "access/transam.h"
#include "access/xact.h"
! #include "catalog/pg_type.h"
#include "commands/async.h"
+ #include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
***************
*** 108,115 ****
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
! * all actions requested in the current transaction. As explained above,
! * we don't actually modify pg_listener until we reach transaction commit.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
--- 122,129 ----
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
! * all actions requested in the current transaction. As explained above,
! * we don't actually send notifications until we reach transaction commit.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
***************
*** 126,132 ****
typedef struct
{
ListenActionKind action;
! char condname[1]; /* actually, as long as needed */
} ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */
--- 140,146 ----
typedef struct
{
ListenActionKind action;
! char channel[1]; /* actually, as long as needed */
} ListenAction;
static List *pendingActions = NIL; /* list of ListenAction */
***************
*** 134,140 ****
static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
! * State for outbound notifies consists of a list of all relnames NOTIFYed
* in the current transaction. We do not actually perform a NOTIFY until
* and unless the transaction commits. pendingNotifies is NIL if no
* NOTIFYs have been done in the current transaction.
--- 148,154 ----
static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
! * State for outbound notifies consists of a list of all channels NOTIFYed
* in the current transaction. We do not actually perform a NOTIFY until
* and unless the transaction commits. pendingNotifies is NIL if no
* NOTIFYs have been done in the current transaction.
***************
*** 149,160 ****
* condition name, it will get a self-notify at commit. This is a bit odd
* but is consistent with our historical behavior.
*/
- static List *pendingNotifies = NIL; /* list of C strings */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
/*
! * State for inbound notifies consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessIncomingNotify
* directly, and one saying whether the signal has occurred but the handler
* was not allowed to call ProcessIncomingNotify at the time.
--- 163,285 ----
* condition name, it will get a self-notify at commit. This is a bit odd
* but is consistent with our historical behavior.
*/
+ typedef struct QueuePosition
+ {
+ int page;
+ int offset;
+ } QueuePosition;
+
+ typedef struct Notification
+ {
+ char *channel;
+ char *payload;
+ TransactionId xid;
+ int32 srcPid;
+ } Notification;
+
+ typedef struct AsyncQueueEntry
+ {
+ /*
+ * this record has the maximal length, but usually we limit it to
+ * AsyncQueueEntryEmptySize + strlen(payload).
+ */
+ Size length;
+ Oid dboid;
+ TransactionId xid;
+ int32 srcPid;
+ char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
+ } AsyncQueueEntry;
+ #define AsyncQueueEntryEmptySize \
+ (sizeof(AsyncQueueEntry) - NOTIFY_PAYLOAD_MAX_LENGTH + 1 \
+ - NAMEDATALEN + 1)
+
+ #define InvalidPid (-1)
+ #define QUEUE_POS_PAGE(x) ((x).page)
+ #define QUEUE_POS_OFFSET(x) ((x).offset)
+ #define QUEUE_POS_EQUAL(x,y) \
+ ((x).page == (y).page ? (x).offset == (y).offset : false)
+ #define SET_QUEUE_POS(x,y,z) \
+ do { \
+ (x).page = (y); \
+ (x).offset = (z); \
+ } while (0);
+ /* does page x logically precede page y with z = HEAD ? */
+ #define QUEUE_POS_MIN(x,y,z) \
+ asyncQueuePagePrecedesLogically((x).page, (y).page, (z).page) ? (x) : \
+ asyncQueuePagePrecedesLogically((y).page, (x).page, (z).page) ? (y) : \
+ (x).offset < (y).offset ? (x) : \
+ (y)
+ #define QUEUE_BACKEND_POS(i) asyncQueueControl->backend[(i)].pos
+ #define QUEUE_BACKEND_PID(i) asyncQueueControl->backend[(i)].pid
+ #define QUEUE_HEAD asyncQueueControl->head
+ #define QUEUE_TAIL asyncQueueControl->tail
+
+ typedef struct QueueBackendStatus
+ {
+ int32 pid;
+ QueuePosition pos;
+ } QueueBackendStatus;
+
+ /*
+ * The AsyncQueueControl structure is protected by the AsyncQueueLock.
+ *
+ * In SHARED mode, backends will only inspect their own entries as well as
+ * head and tail pointers. Consequently we can allow a backend to update its
+ * own record while holding only a shared lock (since no other backend will
+ * inspect it).
+ *
+ * In EXCLUSIVE mode, backends can inspect the entries of other backends and
+ * also change head and tail pointers.
+ *
+ * In order to avoid deadlocks, whenever we need both locks, we always first
+ * get AsyncQueueLock and then AsyncCtlLock.
+ */
+ typedef struct AsyncQueueControl
+ {
+ QueuePosition head; /* head points to the next free location */
+ QueuePosition tail; /* the global tail is equivalent to the
+ tail of the "slowest" backend */
+ TimestampTz lastQueueFillWarn; /* when the queue is full we only
+ want to log that once in a
+ while */
+ QueueBackendStatus backend[1]; /* actually this one has as many entries as
+ * connections are allowed (MaxBackends) */
+ /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
+ } AsyncQueueControl;
+
+ static AsyncQueueControl *asyncQueueControl;
+ static SlruCtlData AsyncCtlData;
+
+ #define AsyncCtl (&AsyncCtlData)
+ #define QUEUE_PAGESIZE BLCKSZ
+ #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
+
+ /*
+ * slru.c currently assumes that all filenames are four characters of hex
+ * digits. That means that we can use segments 0000 through FFFF.
+ * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us
+ * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0xFFFF.
+ *
+ * It's of course easy to enhance slru.c but those pages give us so much
+ * space already that it doesn't seem worth the trouble...
+ *
+ * It's an interesting test case to define QUEUE_MAX_PAGE to a very small
+ * multiple of SLRU_PAGES_PER_SEGMENT to test queue full behaviour.
+ */
+ #define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0xFFFF)
+
+ static List *pendingNotifies = NIL; /* list of Notifications */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
+ static List *listenChannels = NIL; /* list of channels we are listening to */
+
+ /* has this backend sent notifications in the current transaction ? */
+ static bool backendHasSentNotifications = false;
+ /* has this backend executed a LISTEN in the current transaction ? */
+ static bool backendHasExecutedInitialListen = false;
/*
! * State for inbound notifications consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessIncomingNotify
* directly, and one saying whether the signal has occurred but the handler
* was not allowed to call ProcessIncomingNotify at the time.
***************
*** 171,224 ****
bool Trace_notify = false;
!
! static void queue_listen(ListenActionKind action, const char *condname);
static void Async_UnlistenOnExit(int code, Datum arg);
! static void Exec_Listen(Relation lRel, const char *relname);
! static void Exec_Unlisten(Relation lRel, const char *relname);
! static void Exec_UnlistenAll(Relation lRel);
! static void Send_Notify(Relation lRel);
static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
! static bool AsyncExistsPendingNotify(const char *relname);
static void ClearPendingActionsAndNotifies(void);
/*
* Async_Notify
*
* This is executed by the SQL notify command.
*
! * Adds the relation to the list of pending notifies.
* Actual notification happens during transaction commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
! Async_Notify(const char *relname)
{
if (Trace_notify)
! elog(DEBUG1, "Async_Notify(%s)", relname);
! /* no point in making duplicate entries in the list ... */
! if (!AsyncExistsPendingNotify(relname))
{
! /*
! * The name list needs to live until end of transaction, so store it
! * in the transaction context.
! */
! MemoryContext oldcontext;
! oldcontext = MemoryContextSwitchTo(CurTransactionContext);
! /*
! * Ordering of the list isn't important. We choose to put new entries
! * on the front, as this might make duplicate-elimination a tad faster
! * when the same condition is signaled many times in a row.
! */
! pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
! MemoryContextSwitchTo(oldcontext);
! }
}
/*
--- 296,520 ----
bool Trace_notify = false;
! static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
! static bool IsListeningOn(const char *channel);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
! static void Exec_ListenBeforeCommit(const char *channel);
! static void Exec_ListenAfterCommit(const char *channel);
! static void Exec_UnlistenAfterCommit(const char *channel);
! static void Exec_UnlistenAllAfterCommit(void);
! static void SignalBackends(void);
! static bool asyncQueuePagePrecedesPhysically(int p, int q);
! static bool asyncQueuePagePrecedesLogically(int p, int q, int head);
! static bool asyncQueueAdvance(QueuePosition *position, int entryLength);
! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
! static void asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n);
! static List *asyncQueueAddEntries(List *notifications);
! static bool asyncQueueGetEntriesByPage(QueuePosition *current,
! QueuePosition stop,
! List **notifications);
! static void asyncQueueReadAllNotifications(void);
! static void asyncQueueAdvanceTail(void);
! static void asyncQueueUnregister(void);
! static void asyncQueueFillWarning(void);
! static bool asyncQueueIsFull(void);
static void ProcessIncomingNotify(void);
! static void NotifyMyFrontEnd(const char *channel,
! const char *payload,
! int32 srcPid);
! static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void);
+ /*
+ * We will work on the page range of 0..(SLRU_PAGES_PER_SEGMENT * 0xFFFF).
+ * asyncQueuePagePrecedesPhysically just checks numerically without any magic
+ * if one page precedes another one.
+ *
+ * On the other hand, when asyncQueuePagePrecedesLogically does that check, it
+ * takes the current head page number into account. If we have wrapped
+ * around, it can happen that p precedes q, even though p > q (if the head page
+ * is in between the two).
+ */
+ static bool
+ asyncQueuePagePrecedesPhysically(int p, int q)
+ {
+ return p < q;
+ }
+
+ static bool
+ asyncQueuePagePrecedesLogically(int p, int q, int head)
+ {
+ if (p <= head && q <= head)
+ return p < q;
+ if (p > head && q > head)
+ return p < q;
+ if (p <= head)
+ {
+ Assert(q > head);
+ /* q is older */
+ return false;
+ }
+ else
+ {
+ Assert(p > head && q <= head);
+ /* p is older */
+ return true;
+ }
+ }
+
+ void
+ AsyncShmemInit(void)
+ {
+ bool found;
+ int slotno;
+ Size size;
+
+ /*
+ * Remember that sizeof(AsyncQueueControl) already contains one member of
+ * QueueBackendStatus, so we only need to add the status space requirement
+ * for MaxBackends-1 backends.
+ */
+ size = mul_size(MaxBackends-1, sizeof(QueueBackendStatus));
+ size = add_size(size, sizeof(AsyncQueueControl));
+
+ asyncQueueControl = (AsyncQueueControl *)
+ ShmemInitStruct("Async Queue Control", size, &found);
+
+ if (!asyncQueueControl)
+ elog(ERROR, "out of memory");
+
+ if (!found)
+ {
+ int i;
+ SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
+ SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+ for (i = 0; i < MaxBackends; i++)
+ {
+ SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ QUEUE_BACKEND_PID(i) = InvalidPid;
+ }
+ }
+
+ AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
+ SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
+ AsyncCtlLock, "pg_notify");
+ AsyncCtl->do_fsync = false;
+ asyncQueueControl->lastQueueFillWarn = GetCurrentTimestamp();
+
+ if (!found)
+ {
+ SlruScanDirectory(AsyncCtl,
+ QUEUE_MAX_PAGE + SLRU_PAGES_PER_SEGMENT,
+ true);
+
+ LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+ slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+ SimpleLruWritePage(AsyncCtl, slotno, NULL);
+ LWLockRelease(AsyncCtlLock);
+ }
+ }
+
+
+ /*
+ * pg_notify -
+ * Send a notification to listening clients
+ */
+ Datum
+ pg_notify(PG_FUNCTION_ARGS)
+ {
+ const char *channel;
+ const char *payload;
+
+ if (PG_ARGISNULL(0))
+ channel = "";
+ else
+ channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+ if (PG_ARGISNULL(1))
+ payload = "";
+ else
+ payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
+
+ Async_Notify(channel, payload);
+
+ PG_RETURN_VOID();
+ }
+
/*
* Async_Notify
*
* This is executed by the SQL notify command.
*
! * Adds the channel to the list of pending notifies.
* Actual notification happens during transaction commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
! Async_Notify(const char *channel, const char *payload)
{
+ Notification *n;
+ MemoryContext oldcontext;
+ int i;
+
if (Trace_notify)
! elog(DEBUG1, "Async_Notify(%s)", channel);
! /* a channel name must be specified */
! if (!channel || !strlen(channel))
! ereport(ERROR,
! (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! errmsg("channel name cannot be empty")));
!
! if (strlen(channel) >= NAMEDATALEN)
! ereport(ERROR,
! (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! errmsg("channel name too long")));
!
! if (payload)
{
! if (strlen(payload) > NOTIFY_PAYLOAD_MAX_LENGTH - 1)
! ereport(ERROR,
! (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! errmsg("payload string too long")));
!
! for (i = 0; i < strlen(payload); i++)
! if (payload[i] < 32 || payload[i] > 126)
! ereport(ERROR,
! (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
! errmsg("invalid character in payload"),
! errdetail("only 7-bit ASCII characters allowed")));
! }
! /* no point in making duplicate entries in the list ... */
! if (AsyncExistsPendingNotify(channel, payload))
! return;
! /*
! * The name list needs to live until end of transaction, so store it
! * in the transaction context.
! */
! oldcontext = MemoryContextSwitchTo(CurTransactionContext);
! n = (Notification *) palloc(sizeof(Notification));
! n->channel = pstrdup(channel);
! if (payload)
! n->payload = pstrdup(payload);
! else
! n->payload = "";
!
! /* will set the xid and the srcPid later... */
! n->xid = InvalidTransactionId;
! n->srcPid = InvalidPid;
!
! /*
! * We want to preserve the order so we need to append every
! * notification. See comments at AsyncExistsPendingNotify().
! */
! pendingNotifies = lappend(pendingNotifies, n);
!
! MemoryContextSwitchTo(oldcontext);
}
/*
***************
*** 226,236 ****
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
! * Actual update of pg_listener happens during transaction commit.
! * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
static void
! queue_listen(ListenActionKind action, const char *condname)
{
MemoryContext oldcontext;
ListenAction *actrec;
--- 522,532 ----
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
! * Actual update of the notification queue happens during transaction
! * commit.
*/
static void
! queue_listen(ListenActionKind action, const char *channel)
{
MemoryContext oldcontext;
ListenAction *actrec;
***************
*** 244,252 ****
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/* space for terminating null is included in sizeof(ListenAction) */
! actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
actrec->action = action;
! strcpy(actrec->condname, condname);
pendingActions = lappend(pendingActions, actrec);
--- 540,548 ----
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/* space for terminating null is included in sizeof(ListenAction) */
! actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel));
actrec->action = action;
! strcpy(actrec->channel, channel);
pendingActions = lappend(pendingActions, actrec);
***************
*** 259,270 ****
* This is executed by the SQL listen command.
*/
void
! Async_Listen(const char *relname)
{
if (Trace_notify)
! elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
! queue_listen(LISTEN_LISTEN, relname);
}
/*
--- 555,566 ----
* This is executed by the SQL listen command.
*/
void
! Async_Listen(const char *channel)
{
if (Trace_notify)
! elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
! queue_listen(LISTEN_LISTEN, channel);
}
/*
***************
*** 273,288 ****
* This is executed by the SQL unlisten command.
*/
void
! Async_Unlisten(const char *relname)
{
if (Trace_notify)
! elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered)
return;
! queue_listen(LISTEN_UNLISTEN, relname);
}
/*
--- 569,584 ----
* This is executed by the SQL unlisten command.
*/
void
! Async_Unlisten(const char *channel)
{
if (Trace_notify)
! elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered)
return;
! queue_listen(LISTEN_UNLISTEN, channel);
}
/*
***************
*** 306,313 ****
/*
* Async_UnlistenOnExit
*
- * Clean up the pg_listener table at backend exit.
- *
* This is executed if we have done any LISTENs in this backend.
* It might not be necessary anymore, if the user UNLISTENed everything,
* but we don't try to detect that case.
--- 602,607 ----
***************
*** 315,331 ****
static void
Async_UnlistenOnExit(int code, Datum arg)
{
- /*
- * We need to start/commit a transaction for the unlisten, but if there is
- * already an active transaction we had better abort that one first.
- * Otherwise we'd end up committing changes that probably ought to be
- * discarded.
- */
AbortOutOfAnyTransaction();
! /* Now we can do the unlisten */
! StartTransactionCommand();
! Async_UnlistenAll();
! CommitTransactionCommand();
}
/*
--- 609,616 ----
static void
Async_UnlistenOnExit(int code, Datum arg)
{
AbortOutOfAnyTransaction();
! Exec_UnlistenAllAfterCommit();
}
/*
***************
*** 337,386 ****
void
AtPrepare_Notify(void)
{
- ListCell *p;
-
/* It's not sensible to have any pending LISTEN/UNLISTEN actions */
! if (pendingActions)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN")));
!
! /* We can deal with pending NOTIFY though */
! foreach(p, pendingNotifies)
! {
! const char *relname = (const char *) lfirst(p);
!
! RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
! relname, strlen(relname) + 1);
! }
!
! /*
! * We can clear the state immediately, rather than needing a separate
! * PostPrepare call, because if the transaction fails we'd just discard
! * the state anyway.
! */
! ClearPendingActionsAndNotifies();
}
/*
! * AtCommit_Notify
*
! * This is called at transaction commit.
*
! * If there are pending LISTEN/UNLISTEN actions, insert or delete
! * tuples in pg_listener accordingly.
*
! * If there are outbound notify requests in the pendingNotifies list,
! * scan pg_listener for matching tuples, and either signal the other
! * backend or send a message to our own frontend.
! *
! * NOTE: we are still inside the current transaction, therefore can
! * piggyback on its committing of changes.
*/
void
! AtCommit_Notify(void)
{
- Relation lRel;
ListCell *p;
if (pendingActions == NIL && pendingNotifies == NIL)
--- 622,649 ----
void
AtPrepare_Notify(void)
{
/* It's not sensible to have any pending LISTEN/UNLISTEN actions */
! if (pendingActions || pendingNotifies)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("cannot PREPARE a transaction that has executed LISTEN/UNLISTEN or NOTIFY")));
}
/*
! * AtCommit_NotifyBeforeCommit
*
! * This is called at transaction commit, before actually committing to
! * clog.
*
! * If there are pending LISTEN/UNLISTEN actions, update our
! * "listenChannels" list.
*
! * If there are outbound notify requests in the pendingNotifies list, add
! * them to the global queue and signal any backend that is listening.
*/
void
! AtCommit_NotifyBeforeCommit(void)
{
ListCell *p;
if (pendingActions == NIL && pendingNotifies == NIL)
***************
*** 397,406 ****
}
if (Trace_notify)
! elog(DEBUG1, "AtCommit_Notify");
! /* Acquire ExclusiveLock on pg_listener */
! lRel = heap_open(ListenerRelationId, ExclusiveLock);
/* Perform any pending listen/unlisten actions */
foreach(p, pendingActions)
--- 660,669 ----
}
if (Trace_notify)
! elog(DEBUG1, "AtCommit_NotifyBeforeCommit");
! Assert(backendHasSentNotifications == false);
! Assert(backendHasExecutedInitialListen == false);
/* Perform any pending listen/unlisten actions */
foreach(p, pendingActions)
***************
*** 410,508 ****
switch (actrec->action)
{
case LISTEN_LISTEN:
! Exec_Listen(lRel, actrec->condname);
break;
case LISTEN_UNLISTEN:
! Exec_Unlisten(lRel, actrec->condname);
break;
case LISTEN_UNLISTEN_ALL:
! Exec_UnlistenAll(lRel);
break;
}
-
- /* We must CCI after each action in case of conflicting actions */
- CommandCounterIncrement();
}
- /* Perform any pending notifies */
- if (pendingNotifies)
- Send_Notify(lRel);
-
/*
! * We do NOT release the lock on pg_listener here; we need to hold it
! * until end of transaction (which is about to happen, anyway) to ensure
! * that notified backends see our tuple updates when they look. Else they
! * might disregard the signal, which would make the application programmer
! * very unhappy. Also, this prevents race conditions when we have just
! * inserted a listening tuple.
*/
! heap_close(lRel, NoLock);
!
! ClearPendingActionsAndNotifies();
! if (Trace_notify)
! elog(DEBUG1, "AtCommit_Notify: done");
}
/*
! * Exec_Listen --- subroutine for AtCommit_Notify
*
! * Register the current backend as listening on the specified relation.
*/
! static void
! Exec_Listen(Relation lRel, const char *relname)
{
! HeapScanDesc scan;
! HeapTuple tuple;
! Datum values[Natts_pg_listener];
! bool nulls[Natts_pg_listener];
! NameData condname;
! bool alreadyListener = false;
! if (Trace_notify)
! elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
! /* Detect whether we are already listening on this relname */
! scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
! if (listener->listenerpid == MyProcPid &&
! strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
{
! alreadyListener = true;
! /* No need to scan the rest of the table */
! break;
}
}
- heap_endscan(scan);
! if (alreadyListener)
! return;
! /*
! * OK to insert a new tuple
! */
! memset(nulls, false, sizeof(nulls));
! namestrcpy(&condname, relname);
! values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
! values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid);
! values[Anum_pg_listener_notification - 1] = Int32GetDatum(0); /* no notifies pending */
! tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
! simple_heap_insert(lRel, tuple);
! #ifdef NOT_USED /* currently there are no indexes */
! CatalogUpdateIndexes(lRel, tuple);
! #endif
! heap_freetuple(tuple);
/*
! * now that we are listening, make sure we will unlisten before dying.
*/
if (!unlistenExitRegistered)
{
--- 673,886 ----
switch (actrec->action)
{
case LISTEN_LISTEN:
! Exec_ListenBeforeCommit(actrec->channel);
break;
case LISTEN_UNLISTEN:
! /* there is no Exec_UnlistenBeforeCommit() */
break;
case LISTEN_UNLISTEN_ALL:
! /* there is no Exec_UnlistenAllBeforeCommit() */
break;
}
}
/*
! * Perform any pending notifies.
*/
! if (pendingNotifies)
! {
! backendHasSentNotifications = true;
! while (pendingNotifies != NIL)
! {
! /*
! * Add the pending notifications to the queue.
! *
! * A full queue is very uncommon and should really not happen,
! * given that we have so much space available in the slru pages.
! * Nevertheless we need to deal with this possibility. Note that
! * when we get here we are in the process of committing our
! * transaction, we have not yet committed to clog but this would be
! * the next step. So at this point in time we can still roll the
! * transaction back.
! */
! LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! asyncQueueFillWarning();
! if (asyncQueueIsFull())
! {
! LWLockRelease(AsyncQueueLock);
! ereport(ERROR,
! (errcode(ERRCODE_TOO_MANY_ENTRIES),
! errmsg("Too many notifications in the queue")));
! }
! pendingNotifies = asyncQueueAddEntries(pendingNotifies);
! LWLockRelease(AsyncQueueLock);
! }
! }
}
/*
! * AtCommit_NotifyAfterCommit
! *
! * This is called at transaction commit, after committing to clog.
*
! * Notify the listening backends.
*/
! void
! AtCommit_NotifyAfterCommit(void)
{
! ListCell *p;
! /* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
! * return as soon as possible */
! if (!pendingActions && !backendHasSentNotifications)
! return;
! /* Perform any pending listen/unlisten actions */
! foreach(p, pendingActions)
{
! ListenAction *actrec = (ListenAction *) lfirst(p);
! switch (actrec->action)
{
! case LISTEN_LISTEN:
! Exec_ListenAfterCommit(actrec->channel);
! break;
! case LISTEN_UNLISTEN:
! Exec_UnlistenAfterCommit(actrec->channel);
! break;
! case LISTEN_UNLISTEN_ALL:
! Exec_UnlistenAllAfterCommit();
! break;
}
}
! if (backendHasSentNotifications)
! SignalBackends();
! ClearPendingActionsAndNotifies();
!
! if (Trace_notify)
! elog(DEBUG1, "AtCommit_NotifyAfterCommit: done");
! }
!
! /*
! * This function is executed for every notification found in the queue in order
! * to check if the current backend is listening on that channel. Not sure if we
! * should further optimize this, for example convert to a sorted array and
! * allow binary search on it...
! */
! static bool
! IsListeningOn(const char *channel)
! {
! ListCell *p;
! char *lchan;
!
! foreach(p, listenChannels)
! {
! lchan = (char *) lfirst(p);
! if (strcmp(lchan, channel) == 0)
! return true;
! }
! return false;
! }
! Datum
! pg_listening(PG_FUNCTION_ARGS)
! {
! FuncCallContext *funcctx;
! ListCell **lcp;
! /* stuff done only on the first call of the function */
! if (SRF_IS_FIRSTCALL())
! {
! MemoryContext oldcontext;
! /* create a function context for cross-call persistence */
! funcctx = SRF_FIRSTCALL_INIT();
! /*
! * switch to memory context appropriate for multiple function calls
! */
! oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
!
! /* allocate memory for user context */
! lcp = (ListCell **) palloc(sizeof(ListCell **));
! if (listenChannels != NIL)
! *lcp = list_head(listenChannels);
! else
! *lcp = NULL;
! funcctx->user_fctx = (void *) lcp;
! MemoryContextSwitchTo(oldcontext);
! }
!
! /* stuff done on every call of the function */
! funcctx = SRF_PERCALL_SETUP();
! lcp = (ListCell **) funcctx->user_fctx;
!
! while (*lcp != NULL)
! {
! char *channel = (char *) lfirst(*lcp);
!
! *lcp = (*lcp)->next;
! SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
! }
!
! SRF_RETURN_DONE(funcctx);
! }
!
! /*
! * Exec_ListenBeforeCommit --- subroutine for AtCommit_NotifyBeforeCommit
! *
! * Note that we do only set our pointer here and do not yet add the channel to
! * listenChannels. Since our transaction could still roll back we do this only
! * after commit. We know that our tail pointer won't move between here and
! * directly after commit, so we won't miss a notification.
! */
! static void
! Exec_ListenBeforeCommit(const char *channel)
! {
! if (Trace_notify)
! elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
!
! /* Detect whether we are already listening to something. */
! if (listenChannels != NIL)
! return;
/*
! * We need this variable to detect an aborted initial LISTEN.
! * In that case we would set up our pointer but not listen on any channel.
! * This state gets cleaned up again in AtAbort_Notify().
! */
! backendHasExecutedInitialListen = true;
!
! /*
! * This is our first LISTEN, establish our pointer.
! * We set our pointer to the global tail pointer, this way we make
! * sure that we get all of the notifications. We might get a few more
! * but that doesn't hurt.
! */
! LWLockAcquire(AsyncQueueLock, LW_SHARED);
! QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
! QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
! LWLockRelease(AsyncQueueLock);
!
! /*
! * Try to move our pointer forward as far as possible. This will skip
! * over already committed notifications. Still, we could get
! * notifications that have already committed before we started to
! * LISTEN.
! *
! * Note that we are not yet listening on anything, so we won't deliver
! * any notification.
! *
! * This will also advance the global tail pointer if necessary.
! */
! asyncQueueReadAllNotifications();
!
! /*
! * Now that we are listening, make sure we will unlisten before dying.
*/
if (!unlistenExitRegistered)
{
***************
*** 512,550 ****
}
/*
! * Exec_Unlisten --- subroutine for AtCommit_Notify
*
! * Remove the current backend from the list of listening backends
! * for the specified relation.
*/
static void
! Exec_Unlisten(Relation lRel, const char *relname)
{
! HeapScanDesc scan;
! HeapTuple tuple;
if (Trace_notify)
! elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
! scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
! while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
!
! if (listener->listenerpid == MyProcPid &&
! strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
{
! /* Found the matching tuple, delete it */
! simple_heap_delete(lRel, &tuple->t_self);
!
! /*
! * We assume there can be only one match, so no need to scan the
! * rest of the table
! */
break;
}
}
! heap_endscan(scan);
/*
* We do not complain about unlistening something not being listened;
--- 890,942 ----
}
/*
! * Exec_ListenAfterCommit --- subroutine for AtCommit_NotifyAfterCommit
! *
! * Add the channel to the list of channels we are listening on.
! */
! static void
! Exec_ListenAfterCommit(const char *channel)
! {
! MemoryContext oldcontext;
!
! /* Detect whether we are already listening on this channel */
! if (IsListeningOn(channel))
! return;
!
! oldcontext = MemoryContextSwitchTo(TopMemoryContext);
! listenChannels = lappend(listenChannels, pstrdup(channel));
! MemoryContextSwitchTo(oldcontext);
! }
!
! /*
! * Exec_UnlistenAfterCommit --- subroutine for AtCommit_NotifyAfterCommit
*
! * Remove a specified channel from "listenChannels".
*/
static void
! Exec_UnlistenAfterCommit(const char *channel)
{
! ListCell *q;
! ListCell *prev;
if (Trace_notify)
! elog(DEBUG1, "Exec_UnlistenAfterCommit(%s,%d)", channel, MyProcPid);
! prev = NULL;
! foreach(q, listenChannels)
{
! char *lchan = (char *) lfirst(q);
! if (strcmp(lchan, channel) == 0)
{
! pfree(lchan);
! listenChannels = list_delete_cell(listenChannels, q, prev);
break;
}
+ prev = q;
}
!
! if (listenChannels == NIL)
! asyncQueueUnregister();
/*
* We do not complain about unlistening something not being listened;
***************
*** 553,690 ****
}
/*
! * Exec_UnlistenAll --- subroutine for AtCommit_Notify
*
! * Update pg_listener to unlisten all relations for this backend.
*/
static void
! Exec_UnlistenAll(Relation lRel)
{
- HeapScanDesc scan;
- HeapTuple lTuple;
- ScanKeyData key[1];
-
if (Trace_notify)
! elog(DEBUG1, "Exec_UnlistenAll");
! /* Find and delete all entries with my listenerPID */
! ScanKeyInit(&key[0],
! Anum_pg_listener_listenerpid,
! BTEqualStrategyNumber, F_INT4EQ,
! Int32GetDatum(MyProcPid));
! scan = heap_beginscan(lRel, SnapshotNow, 1, key);
! while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! simple_heap_delete(lRel, &lTuple->t_self);
! heap_endscan(scan);
}
/*
! * Send_Notify --- subroutine for AtCommit_Notify
! *
! * Scan pg_listener for tuples matching our pending notifies, and
! * either signal the other backend or send a message to our own frontend.
*/
static void
! Send_Notify(Relation lRel)
{
! TupleDesc tdesc = RelationGetDescr(lRel);
! HeapScanDesc scan;
! HeapTuple lTuple,
! rTuple;
! Datum value[Natts_pg_listener];
! bool repl[Natts_pg_listener],
! nulls[Natts_pg_listener];
!
! /* preset data to update notify column to MyProcPid */
! memset(nulls, false, sizeof(nulls));
! memset(repl, false, sizeof(repl));
! repl[Anum_pg_listener_notification - 1] = true;
! memset(value, 0, sizeof(value));
! value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);
!
! scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
!
! while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! {
! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! char *relname = NameStr(listener->relname);
! int32 listenerPID = listener->listenerpid;
! if (!AsyncExistsPendingNotify(relname))
! continue;
! if (listenerPID == MyProcPid)
{
! /*
! * Self-notify: no need to bother with table update. Indeed, we
! * *must not* clear the notification field in this path, or we
! * could lose an outside notify, which'd be bad for applications
! * that ignore self-notify messages.
! */
! if (Trace_notify)
! elog(DEBUG1, "AtCommit_Notify: notifying self");
! NotifyMyFrontEnd(relname, listenerPID);
}
else
{
- if (Trace_notify)
- elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
- listenerPID);
-
/*
! * If someone has already notified this listener, we don't bother
! * modifying the table, but we do still send a NOTIFY_INTERRUPT
! * signal, just in case that backend missed the earlier signal for
! * some reason. It's OK to send the signal first, because the
! * other guy can't read pg_listener until we unlock it.
! *
! * Note: we don't have the other guy's BackendId available, so
! * this will incur a search of the ProcSignal table. That's
! * probably not worth worrying about.
*/
! if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT,
! InvalidBackendId) < 0)
{
! /*
! * Get rid of pg_listener entry if it refers to a PID that no
! * longer exists. Presumably, that backend crashed without
! * deleting its pg_listener entries. This code used to only
! * delete the entry if errno==ESRCH, but as far as I can see
! * we should just do it for any failure (certainly at least
! * for EPERM too...)
! */
! simple_heap_delete(lRel, &lTuple->t_self);
}
! else if (listener->notification == 0)
{
! /* Rewrite the tuple with my PID in notification column */
! rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! simple_heap_update(lRel, &lTuple->t_self, rTuple);
!
! #ifdef NOT_USED /* currently there are no indexes */
! CatalogUpdateIndexes(lRel, rTuple);
! #endif
}
}
}
! heap_endscan(scan);
}
/*
* AtAbort_Notify
*
! * This is called at transaction abort.
*
! * Gets rid of pending actions and outbound notifies that we would have
! * executed if the transaction got committed.
*/
void
AtAbort_Notify(void)
{
ClearPendingActionsAndNotifies();
}
--- 945,1325 ----
}
/*
! * Exec_UnlistenAllAfterCommit --- subroutine for AtCommit_Notify
*
! * Unlisten on all channels for this backend.
*/
static void
! Exec_UnlistenAllAfterCommit(void)
{
if (Trace_notify)
! elog(DEBUG1, "Exec_UnlistenAllAferCommit(%d)", MyProcPid);
!
! list_free_deep(listenChannels);
! listenChannels = NIL;
!
! asyncQueueUnregister();
! }
!
! static void
! asyncQueueUnregister(void)
! {
! bool advanceTail = false;
! LWLockAcquire(AsyncQueueLock, LW_SHARED);
! QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
! /*
! * If we have been the last backend, advance the tail pointer.
! */
! if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL))
! advanceTail = true;
! LWLockRelease(AsyncQueueLock);
! if (advanceTail)
! asyncQueueAdvanceTail();
! }
! static bool
! asyncQueueIsFull(void)
! {
! QueuePosition lookahead = QUEUE_HEAD;
! Size remain = QUEUE_PAGESIZE - QUEUE_POS_OFFSET(lookahead) - 1;
! Size advance = Min(remain, NOTIFY_PAYLOAD_MAX_LENGTH);
!
! /*
! * Check what happens if we wrote a maximally sized entry. Would we go to a
! * new page? If not, then our queue can not be full (because we can still
! * fill at least the current page with at least one more entry).
! */
! if (!asyncQueueAdvance(&lookahead, advance))
! return false;
!
! /*
! * The queue is full if with a switch to a new page we reach the page
! * of the tail pointer.
! */
! return QUEUE_POS_PAGE(lookahead) == QUEUE_POS_PAGE(QUEUE_TAIL);
}
/*
! * The function advances the position to the next entry. In case we jump to
! * a new page the function returns true, else false.
*/
+ static bool
+ asyncQueueAdvance(QueuePosition *position, int entryLength)
+ {
+ int pageno = QUEUE_POS_PAGE(*position);
+ int offset = QUEUE_POS_OFFSET(*position);
+ bool pageJump = false;
+
+ /*
+ * Move to the next writing position: First jump over what we have just
+ * written or read.
+ */
+ offset += entryLength;
+ Assert(offset < QUEUE_PAGESIZE);
+
+ /*
+ * In a second step check if another entry can be written to the page. If
+ * it does, stay here, we have reached the next position. If not, then we
+ * need to move on to the next page.
+ */
+ if (offset + AsyncQueueEntryEmptySize >= QUEUE_PAGESIZE)
+ {
+ pageno++;
+ if (pageno > QUEUE_MAX_PAGE)
+ /* wrap around */
+ pageno = 0;
+ offset = 0;
+ pageJump = true;
+ }
+
+ SET_QUEUE_POS(*position, pageno, offset);
+ return pageJump;
+ }
+
+ static void
+ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
+ {
+ Assert(n->channel != NULL);
+ Assert(n->payload != NULL);
+ Assert(strlen(n->payload) < NOTIFY_PAYLOAD_MAX_LENGTH);
+ Assert(strlen(n->channel) < NAMEDATALEN);
+
+ /* The terminators are already included in AsyncQueueEntryEmptySize */
+ qe->length = AsyncQueueEntryEmptySize + strlen(n->payload)
+ + strlen(n->channel);
+ qe->srcPid = MyProcPid;
+ qe->dboid = MyDatabaseId;
+ qe->xid = GetCurrentTransactionId();
+ strcpy(qe->data, n->channel);
+ Assert(*(qe->data + strlen(n->channel)) == '\0');
+ strcpy(qe->data + strlen(n->channel) + 1, n->payload);
+ }
+
static void
! asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n)
{
! n->channel = pstrdup(qe->data);
! Assert(*(qe->data + strlen(qe->data)) == '\0');
! n->payload = pstrdup(qe->data + strlen(qe->data) + 1);
! n->srcPid = qe->srcPid;
! n->xid = qe->xid;
! }
!
! /*
! * Add the notifications to the queue: we go page by page here, i.e. we stop
! * once we have to go to a new page but we will be called again and then fill
! * that next page. If an entry does not fit to a page anymore, we write a dummy
! * entry with an InvalidOid as the database oid in order to fill the page. So
! * every page is always used up to the last byte which simplifies reading the
! * page later.
! *
! * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
! * here in this function.
! *
! * We are passed the list of notifications to write and return the
! * not-yet-written notifications back. Eventually we will return NIL.
! */
! static List *
! asyncQueueAddEntries(List *notifications)
! {
! AsyncQueueEntry qe;
! int pageno;
! int offset;
! int slotno;
!
! /*
! * Note that we are holding exclusive AsyncQueueLock already.
! */
! LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
! pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
! slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
! AsyncCtl->shared->page_dirty[slotno] = true;
! do
! {
! Notification *n;
! if (asyncQueueIsFull())
{
! /* document that we will not go into the if-block further down */
! Assert(QUEUE_POS_OFFSET(QUEUE_HEAD) != 0);
! break;
! }
!
! n = (Notification *) linitial(notifications);
! asyncQueueNotificationToEntry(n, &qe);
!
! offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
! /*
! * Check whether or not the entry still fits on the current page.
! */
! if (offset + qe.length < QUEUE_PAGESIZE)
! {
! notifications = list_delete_first(notifications);
}
else
{
/*
! * Write a dummy entry to fill up the page. Actually readers will
! * only check dboid and since it won't match any reader's database
! * oid, they will ignore this entry and move on.
*/
! qe.length = QUEUE_PAGESIZE - offset - 1;
! qe.dboid = InvalidOid;
! qe.data[0] = '\0'; /* empty channel */
! qe.data[1] = '\0'; /* empty payload */
! qe.xid = InvalidTransactionId;
! }
! memcpy((char*) AsyncCtl->shared->page_buffer[slotno] + offset,
! &qe, qe.length);
!
! } while (!asyncQueueAdvance(&(QUEUE_HEAD), qe.length)
! && notifications != NIL);
!
! if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
! {
! /*
! * we need to go to continue on a new page, stop here but prepare that
! * page already.
! */
! slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
! AsyncCtl->shared->page_dirty[slotno] = true;
! }
! LWLockRelease(AsyncCtlLock);
!
! return notifications;
! }
!
! /*
! * Here we calculate how full our queue already is. As the queue is quite
! * large, we would probably only see a high filling degree with a long running
! * idle transaction. We don't emit anything if the queue is less than half
! * full.
! *
! * In case it is between 50% to 75% full, we log a warning and calculate the
! * "slowest" backend to give a hint on which backend is preventing cleanup.
! *
! * There's a similar warning in case our queue is more than 75% full.
! *
! * The warnings show up only once every QUEUE_FULL_WARN_INTERVAL.
! */
! static void
! asyncQueueFillWarning(void)
! {
! /*
! * Caller must hold exclusive AsyncQueueLock.
! */
! TimestampTz t;
! double fillDegree;
! int occupied;
! int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
! int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
!
! occupied = headPage - tailPage;
!
! if (occupied == 0)
! return;
!
! if (!asyncQueuePagePrecedesPhysically(tailPage, headPage))
! /* head has wrapped around, tail not yet */
! occupied += QUEUE_MAX_PAGE;
!
! fillDegree = (float) occupied / (float) QUEUE_MAX_PAGE;
!
! if (fillDegree < 0.5)
! return;
!
! t = GetCurrentTimestamp();
!
! if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
! t, QUEUE_FULL_WARN_INTERVAL))
! {
! QueuePosition min = QUEUE_HEAD;
! int32 minPid = InvalidPid;
! int i;
!
! for (i = 0; i < MaxBackends; i++)
! if (QUEUE_BACKEND_PID(i) != InvalidPid)
{
! min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
! if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
! minPid = QUEUE_BACKEND_PID(i);
}
!
! if (fillDegree < 0.75)
! ereport(WARNING, (errmsg("pg_notify queue is more than 50%% full. "
! "Among the slowest backends: %d", minPid),
! errdetail("Cleanup can only proceed if "
! "this backend ends its current "
! "transaction")));
! else
! ereport(WARNING, (errmsg("pg_notify queue is more than 75%% full. "
! "Among the slowest backends: %d", minPid),
! errdetail("Cleanup can only proceed if "
! "this backend ends its current "
! "transaction")));
!
! asyncQueueControl->lastQueueFillWarn = t;
! }
! }
!
! /*
! * Send signals to all listening backends. Since we have EXCLUSIVE lock anyway
! * we also check the position of the other backends and in case that anyone is
! * already up-to-date we don't signal it. This can happen if concurrent
! * notifying transactions have sent a signal and the signaled backend has read
! * the other notifications and ours in the same step.
! *
! * Since we know the BackendId and the Pid the signalling is quite cheap.
! */
! static void
! SignalBackends(void)
! {
! QueuePosition pos;
! ListCell *p1, *p2;
! int i;
! int32 pid;
! List *pids = NIL;
! List *ids = NIL;
! int count = 0;
!
! /* Signal everybody who is LISTENing to any channel. */
! LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
! for (i = 0; i < MaxBackends; i++)
! {
! pid = QUEUE_BACKEND_PID(i);
! if (pid != InvalidPid)
! {
! count++;
! pos = QUEUE_BACKEND_POS(i);
! if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
{
! pids = lappend_int(pids, pid);
! ids = lappend_int(ids, i);
}
}
}
+ LWLockRelease(AsyncQueueLock);
! forboth(p1, pids, p2, ids)
! {
! pid = (int32) lfirst_int(p1);
! i = lfirst_int(p2);
! /*
! * Should we check for failure? Can it happen that a backend
! * has crashed without the postmaster starting over?
! */
! if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, i) < 0)
! elog(WARNING, "Error signalling backend %d", pid);
! }
!
! if (count == 0)
! {
! /* No backend is listening at all, try to clean up the queue.
! * Even if by now (after we determined count to be 0 and now)
! * a backend has started to listen, advancing the tail does not
! * hurt. Our notifications are committed already and a newly
! * listening backend would skip over them anyway. */
! asyncQueueAdvanceTail();
! }
}
/*
* AtAbort_Notify
*
! * This is called at transaction abort.
*
! * Gets rid of pending actions and outbound notifies that we would have
! * executed if the transaction got committed.
! *
! * Even though we have not committed, we need to signal the listening backends
! * because our notifications might block readers from processing the queue.
! * Now that the transaction has aborted, they can go on and skip over our
! * notifications. They could find notifications past ours that they need to
! * deliver.
*/
void
AtAbort_Notify(void)
{
+ if (backendHasSentNotifications)
+ SignalBackends();
+
+ /*
+ * If we LISTEN but then roll back the transaction we have set our pointer
+ * but have not made the entry in listenChannels. In that case, remove
+ * our pointer again.
+ */
+ if (backendHasExecutedInitialListen)
+ /*
+ * Checking listenChannels should be redundant but it can't hurt doing
+ * it for safety reasons.
+ */
+ if (listenChannels == NIL)
+ asyncQueueUnregister();
+
ClearPendingActionsAndNotifies();
}
***************
*** 940,968 ****
}
/*
* ProcessIncomingNotify
*
* Deal with arriving NOTIFYs from other backends.
* This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
* signal handler, or the next time control reaches the outer idle loop.
! * Scan pg_listener for arriving notifies, report them to my front end,
! * and clear the notification field in pg_listener until next time.
*
! * NOTE: since we are outside any transaction, we must create our own.
*/
static void
ProcessIncomingNotify(void)
{
! Relation lRel;
! TupleDesc tdesc;
! ScanKeyData key[1];
! HeapScanDesc scan;
! HeapTuple lTuple,
! rTuple;
! Datum value[Natts_pg_listener];
! bool repl[Natts_pg_listener],
! nulls[Natts_pg_listener];
! bool catchup_enabled;
/* Must prevent catchup interrupt while I am running */
catchup_enabled = DisableCatchupInterrupt();
--- 1575,1809 ----
}
/*
+ * This function will ask for a page with ReadOnly access and once we have the
+ * lock, we read the whole content and pass back the list of notifications
+ * that the calling function will deliver then. The list will contain all
+ * notifications from transactions that have already committed.
+ *
+ * We stop if we have either reached the stop position or go to a new page.
+ *
+ * The function returns true once we have reached the end or a notification of
+ * a transaction that is still running and false if we have finished with
+ * the page. In other words: once it returns true there is no point in calling
+ * it again.
+ */
+ static bool
+ asyncQueueGetEntriesByPage(QueuePosition *current,
+ QueuePosition stop,
+ List **notifications)
+ {
+ AsyncQueueEntry qe;
+ Notification *n;
+ int slotno;
+ bool reachedStop = false;
+
+ if (QUEUE_POS_EQUAL(*current, stop))
+ return true;
+
+ slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, current->page,
+ InvalidTransactionId);
+ do {
+ char *readPtr = (char *) (AsyncCtl->shared->page_buffer[slotno]);
+
+ if (QUEUE_POS_EQUAL(*current, stop))
+ {
+ reachedStop = true;
+ break;
+ }
+
+ readPtr += current->offset;
+ /* at first we only read the header of the notification */
+ memcpy(&qe, readPtr, AsyncQueueEntryEmptySize);
+
+ if (qe.dboid == MyDatabaseId)
+ {
+ if (TransactionIdDidCommit(qe.xid))
+ {
+ memcpy(&qe, readPtr, qe.length);
+ /* qe.data is the NUL terminated channel name */
+ if (IsListeningOn(qe.data))
+ {
+ n = (Notification *) palloc(sizeof(Notification));
+ asyncQueueEntryToNotification(&qe, n);
+ *notifications = lappend(*notifications, n);
+ }
+ }
+ else
+ {
+ if (!TransactionIdDidAbort(qe.xid))
+ {
+ /*
+ * The transaction has neither committed nor aborted so
+ * far.
+ */
+ reachedStop = true;
+ break;
+ }
+ /*
+ * Here we know that the transaction has aborted, we just
+ * ignore its notifications.
+ */
+ }
+ }
+ /*
+ * The call to asyncQueueAdvance just jumps over what we have
+ * just read. If there is no more space for the next record on the
+ * current page, it will also switch to the beginning of the next page.
+ */
+ } while(!asyncQueueAdvance(current, qe.length));
+
+ /*
+ * Release the lock that we implicitly got from
+ * SimpleLruReadPage_ReadOnly().
+ */
+ LWLockRelease(AsyncCtlLock);
+
+ if (QUEUE_POS_EQUAL(*current, stop))
+ reachedStop = true;
+
+ return reachedStop;
+ }
+
+
+ static void
+ asyncQueueReadAllNotifications(void)
+ {
+ QueuePosition pos;
+ QueuePosition oldpos;
+ QueuePosition head;
+ List *notifications;
+ ListCell *lc;
+ Notification *n;
+ bool advanceTail = false;
+ bool reachedStop;
+
+ LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+ head = QUEUE_HEAD;
+ LWLockRelease(AsyncQueueLock);
+
+ if (QUEUE_POS_EQUAL(pos, head))
+ {
+ /* Nothing to do, we have read all notifications already. */
+ return;
+ }
+
+ do
+ {
+ /*
+ * Our stop position is what we found to be the head's position when
+ * we entered this function. It might have changed already. But if it
+ * has, we will receive (or have already received and queued) another
+ * signal and come here again.
+ *
+ * We are not holding AsyncQueueLock here! The queue can only extend
+ * beyond the head pointer (see above) and we leave our backend's
+ * pointer where it is so nobody will truncate or rewrite pages under
+ * us. Especially we don't want to hold a lock while sending the
+ * notifications to the frontend.
+ */
+ reachedStop = false;
+
+ notifications = NIL;
+ reachedStop = asyncQueueGetEntriesByPage(&pos, head, ¬ifications);
+
+ /*
+ * Note that we deliver everything that we see in the queue and that
+ * matches our _current_ listening state.
+ * Especially we do not take into account different commit times.
+ *
+ * See the following example:
+ *
+ * Backend 1: Backend 2:
+ *
+ * transaction starts
+ * NOTIFY foo;
+ * commit starts
+ * transaction starts
+ * LISTEN foo;
+ * commit starts
+ * commit to clog
+ * commit to clog
+ *
+ * It could happen that backend 2 sees the notification from
+ * backend 1 in the queue and even though the notifying transaction
+ * committed before the listening transaction, we still deliver the
+ * notification.
+ *
+ * The idea is that an additional notification does not do any
+ * harm we just need to make sure that we do not miss a
+ * notification.
+ */
+ foreach(lc, notifications)
+ {
+ n = (Notification *) lfirst(lc);
+ NotifyMyFrontEnd(n->channel, n->payload, n->srcPid);
+ }
+ } while (!reachedStop);
+
+ LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ QUEUE_BACKEND_POS(MyBackendId) = pos;
+ if (QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL))
+ advanceTail = true;
+ LWLockRelease(AsyncQueueLock);
+
+ if (advanceTail)
+ /* Move forward the tail pointer and try to truncate. */
+ asyncQueueAdvanceTail();
+ }
+
+ static void
+ asyncQueueAdvanceTail(void)
+ {
+ QueuePosition min;
+ int i;
+ int tailp;
+ int headp;
+
+ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+ min = QUEUE_HEAD;
+ for (i = 0; i < MaxBackends; i++)
+ if (QUEUE_BACKEND_PID(i) != InvalidPid)
+ min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD);
+
+ tailp = QUEUE_POS_PAGE(QUEUE_TAIL);
+ headp = QUEUE_POS_PAGE(QUEUE_HEAD);
+ QUEUE_TAIL = min;
+ LWLockRelease(AsyncQueueLock);
+
+ /* This is our wraparound check */
+ if ((asyncQueuePagePrecedesLogically(tailp, QUEUE_POS_PAGE(min), headp)
+ && asyncQueuePagePrecedesPhysically(tailp, headp))
+ || tailp == QUEUE_POS_PAGE(min))
+ {
+ /*
+ * SimpleLruTruncate() will ask for AsyncCtlLock but will also
+ * release the lock again.
+ *
+ * XXX this could be optimized, to call SimpleLruTruncate only when we
+ * know that we can truncate something.
+ */
+ SimpleLruTruncate(AsyncCtl, QUEUE_POS_PAGE(min));
+ }
+ }
+
+ /*
* ProcessIncomingNotify
*
* Deal with arriving NOTIFYs from other backends.
* This is called either directly from the PROCSIG_NOTIFY_INTERRUPT
* signal handler, or the next time control reaches the outer idle loop.
! * Scan the queue for arriving notifications and report them to my front
! * end.
*
! * NOTE: we are outside of any transaction here.
*/
static void
ProcessIncomingNotify(void)
{
! bool catchup_enabled;
!
! Assert(GetCurrentTransactionIdIfAny() == InvalidTransactionId);
/* Must prevent catchup interrupt while I am running */
catchup_enabled = DisableCatchupInterrupt();
***************
*** 974,1037 ****
notifyInterruptOccurred = 0;
! StartTransactionCommand();
!
! lRel = heap_open(ListenerRelationId, ExclusiveLock);
! tdesc = RelationGetDescr(lRel);
!
! /* Scan only entries with my listenerPID */
! ScanKeyInit(&key[0],
! Anum_pg_listener_listenerpid,
! BTEqualStrategyNumber, F_INT4EQ,
! Int32GetDatum(MyProcPid));
! scan = heap_beginscan(lRel, SnapshotNow, 1, key);
!
! /* Prepare data for rewriting 0 into notification field */
! memset(nulls, false, sizeof(nulls));
! memset(repl, false, sizeof(repl));
! repl[Anum_pg_listener_notification - 1] = true;
! memset(value, 0, sizeof(value));
! value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);
!
! while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
! {
! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
! char *relname = NameStr(listener->relname);
! int32 sourcePID = listener->notification;
!
! if (sourcePID != 0)
! {
! /* Notify the frontend */
!
! if (Trace_notify)
! elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
! relname, (int) sourcePID);
!
! NotifyMyFrontEnd(relname, sourcePID);
!
! /*
! * Rewrite the tuple with 0 in notification column.
! */
! rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
! simple_heap_update(lRel, &lTuple->t_self, rTuple);
!
! #ifdef NOT_USED /* currently there are no indexes */
! CatalogUpdateIndexes(lRel, rTuple);
! #endif
! }
! }
! heap_endscan(scan);
!
! /*
! * We do NOT release the lock on pg_listener here; we need to hold it
! * until end of transaction (which is about to happen, anyway) to ensure
! * that other backends see our tuple updates when they look. Otherwise, a
! * transaction started after this one might mistakenly think it doesn't
! * need to send this backend a new NOTIFY.
! */
! heap_close(lRel, NoLock);
!
! CommitTransactionCommand();
/*
* Must flush the notify messages to ensure frontend gets them promptly.
--- 1815,1821 ----
notifyInterruptOccurred = 0;
! asyncQueueReadAllNotifications();
/*
* Must flush the notify messages to ensure frontend gets them promptly.
***************
*** 1051,1070 ****
* Send NOTIFY message to my front end.
*/
static void
! NotifyMyFrontEnd(char *relname, int32 listenerPID)
{
if (whereToSendOutput == DestRemote)
{
StringInfoData buf;
pq_beginmessage(&buf, 'A');
! pq_sendint(&buf, listenerPID, sizeof(int32));
! pq_sendstring(&buf, relname);
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! {
! /* XXX Add parameter string here later */
! pq_sendstring(&buf, "");
! }
pq_endmessage(&buf);
/*
--- 1835,1851 ----
* Send NOTIFY message to my front end.
*/
static void
! NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
{
if (whereToSendOutput == DestRemote)
{
StringInfoData buf;
pq_beginmessage(&buf, 'A');
! pq_sendint(&buf, srcPid, sizeof(int32));
! pq_sendstring(&buf, channel);
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
! pq_sendstring(&buf, payload);
pq_endmessage(&buf);
/*
***************
*** 1074,1096 ****
*/
}
else
! elog(INFO, "NOTIFY for %s", relname);
}
! /* Does pendingNotifies include the given relname? */
static bool
! AsyncExistsPendingNotify(const char *relname)
{
ListCell *p;
! foreach(p, pendingNotifies)
! {
! const char *prelname = (const char *) lfirst(p);
! if (strcmp(prelname, relname) == 0)
return true;
}
return false;
}
--- 1855,1919 ----
*/
}
else
! elog(INFO, "NOTIFY for %s", channel);
}
! /* Does pendingNotifies include the given channel/payload? */
static bool
! AsyncExistsPendingNotify(const char *channel, const char *payload)
{
ListCell *p;
+ Notification *n;
! if (pendingNotifies == NIL)
! return false;
!
! if (payload == NULL)
! payload = "";
! /*
! * We need to append new elements to the end of the list in order to keep
! * the order. However, on the other hand we'd like to check the list
! * backwards in order to make duplicate-elimination a tad faster when the
! * same condition is signaled many times in a row. So as a compromise we
! * check the tail element first which we can access directly. If this
! * doesn't match, we check the rest of whole list.
! *
! * As we are not checking our parents' lists, we can still get duplicates
! * in combination with subtransactions, like in:
! *
! * begin;
! * notify foo '1';
! * savepoint foo;
! * notify foo '1';
! * commit;
! */
! n = (Notification *) llast(pendingNotifies);
! if (strcmp(n->channel, channel) == 0)
! {
! Assert(n->payload != NULL);
! if (strcmp(n->payload, payload) == 0)
return true;
}
+ /*
+ * Note the difference to foreach(). We stop if p is the last element
+ * already. So we don't check the last element, we have checked it already.
+ */
+ for(p = list_head(pendingNotifies);
+ p != list_tail(pendingNotifies);
+ p = lnext(p))
+ {
+ n = (Notification *) lfirst(p);
+
+ if (strcmp(n->channel, channel) == 0)
+ {
+ Assert(n->payload != NULL);
+ if (strcmp(n->payload, payload) == 0)
+ return true;
+ }
+ }
+
return false;
}
***************
*** 1107,1128 ****
*/
pendingActions = NIL;
pendingNotifies = NIL;
- }
! /*
! * 2PC processing routine for COMMIT PREPARED case.
! *
! * (We don't have to do anything for ROLLBACK PREPARED.)
! */
! void
! notify_twophase_postcommit(TransactionId xid, uint16 info,
! void *recdata, uint32 len)
! {
! /*
! * Set up to issue the NOTIFY at the end of my own current transaction.
! * (XXX this has some issues if my own transaction later rolls back, or if
! * there is any significant delay before I commit. OK for now because we
! * disallow COMMIT PREPARED inside a transaction block.)
! */
! Async_Notify((char *) recdata);
}
--- 1930,1937 ----
*/
pendingActions = NIL;
pendingNotifies = NIL;
! backendHasSentNotifications = false;
! backendHasExecutedInitialListen = false;
}
+
diff -cr cvs.head/src/backend/nodes/copyfuncs.c cvs.build/src/backend/nodes/copyfuncs.c
*** cvs.head/src/backend/nodes/copyfuncs.c 2010-02-14 16:02:46.000000000 +0100
--- cvs.build/src/backend/nodes/copyfuncs.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 2777,2782 ****
--- 2777,2783 ----
NotifyStmt *newnode = makeNode(NotifyStmt);
COPY_STRING_FIELD(conditionname);
+ COPY_STRING_FIELD(payload);
return newnode;
}
diff -cr cvs.head/src/backend/nodes/equalfuncs.c cvs.build/src/backend/nodes/equalfuncs.c
*** cvs.head/src/backend/nodes/equalfuncs.c 2010-02-14 16:02:46.000000000 +0100
--- cvs.build/src/backend/nodes/equalfuncs.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 1325,1330 ****
--- 1325,1331 ----
_equalNotifyStmt(NotifyStmt *a, NotifyStmt *b)
{
COMPARE_STRING_FIELD(conditionname);
+ COMPARE_STRING_FIELD(payload);
return true;
}
diff -cr cvs.head/src/backend/nodes/outfuncs.c cvs.build/src/backend/nodes/outfuncs.c
*** cvs.head/src/backend/nodes/outfuncs.c 2010-02-14 16:02:46.000000000 +0100
--- cvs.build/src/backend/nodes/outfuncs.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 1820,1825 ****
--- 1820,1826 ----
WRITE_NODE_TYPE("NOTIFY");
WRITE_STRING_FIELD(conditionname);
+ WRITE_STRING_FIELD(payload);
}
static void
diff -cr cvs.head/src/backend/nodes/readfuncs.c cvs.build/src/backend/nodes/readfuncs.c
*** cvs.head/src/backend/nodes/readfuncs.c 2010-02-14 16:02:46.000000000 +0100
--- cvs.build/src/backend/nodes/readfuncs.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 231,236 ****
--- 231,237 ----
READ_LOCALS(NotifyStmt);
READ_STRING_FIELD(conditionname);
+ READ_STRING_FIELD(payload);
READ_DONE();
}
diff -cr cvs.head/src/backend/parser/gram.y cvs.build/src/backend/parser/gram.y
*** cvs.head/src/backend/parser/gram.y 2010-02-14 16:02:47.000000000 +0100
--- cvs.build/src/backend/parser/gram.y 2010-02-14 16:04:25.000000000 +0100
***************
*** 400,406 ****
%type Iconst SignedIconst
%type Iconst_list
! %type Sconst comment_text
%type RoleId opt_granted_by opt_boolean ColId_or_Sconst
%type var_list
%type ColId ColLabel var_name type_function_name param_name
--- 400,406 ----
%type Iconst SignedIconst
%type Iconst_list
! %type Sconst comment_text notify_payload
%type RoleId opt_granted_by opt_boolean ColId_or_Sconst
%type var_list
%type ColId ColLabel var_name type_function_name param_name
***************
*** 6123,6132 ****
*
*****************************************************************************/
! NotifyStmt: NOTIFY ColId
{
NotifyStmt *n = makeNode(NotifyStmt);
n->conditionname = $2;
$$ = (Node *)n;
}
;
--- 6123,6138 ----
*
*****************************************************************************/
! notify_payload:
! Sconst { $$ = $1; }
! | /*EMPTY*/ { $$ = NULL; }
! ;
!
! NotifyStmt: NOTIFY ColId notify_payload
{
NotifyStmt *n = makeNode(NotifyStmt);
n->conditionname = $2;
+ n->payload = $3;
$$ = (Node *)n;
}
;
diff -cr cvs.head/src/backend/storage/ipc/ipci.c cvs.build/src/backend/storage/ipc/ipci.c
*** cvs.head/src/backend/storage/ipc/ipci.c 2010-01-20 20:08:27.000000000 +0100
--- cvs.build/src/backend/storage/ipc/ipci.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 20,25 ****
--- 20,26 ----
#include "access/nbtree.h"
#include "access/subtrans.h"
#include "access/twophase.h"
+ #include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
***************
*** 225,230 ****
--- 226,232 ----
*/
BTreeShmemInit();
SyncScanShmemInit();
+ AsyncShmemInit();
#ifdef EXEC_BACKEND
diff -cr cvs.head/src/backend/storage/lmgr/lwlock.c cvs.build/src/backend/storage/lmgr/lwlock.c
*** cvs.head/src/backend/storage/lmgr/lwlock.c 2010-01-05 12:39:29.000000000 +0100
--- cvs.build/src/backend/storage/lmgr/lwlock.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 24,29 ****
--- 24,30 ----
#include "access/clog.h"
#include "access/multixact.h"
#include "access/subtrans.h"
+ #include "commands/async.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/ipc.h"
***************
*** 174,179 ****
--- 175,183 ----
/* multixact.c needs two SLRU areas */
numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
+ /* async.c needs one per page for the AsyncQueue */
+ numLocks += NUM_ASYNC_BUFFERS;
+
/*
* Add any requested by loadable modules; for backwards-compatibility
* reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
diff -cr cvs.head/src/backend/tcop/utility.c cvs.build/src/backend/tcop/utility.c
*** cvs.head/src/backend/tcop/utility.c 2010-01-30 22:06:36.000000000 +0100
--- cvs.build/src/backend/tcop/utility.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 930,936 ****
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery();
! Async_Notify(stmt->conditionname);
}
break;
--- 930,936 ----
NotifyStmt *stmt = (NotifyStmt *) parsetree;
PreventCommandDuringRecovery();
! Async_Notify(stmt->conditionname, stmt->payload);
}
break;
diff -cr cvs.head/src/bin/initdb/initdb.c cvs.build/src/bin/initdb/initdb.c
*** cvs.head/src/bin/initdb/initdb.c 2010-01-30 22:06:37.000000000 +0100
--- cvs.build/src/bin/initdb/initdb.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 2458,2463 ****
--- 2458,2464 ----
"pg_xlog",
"pg_xlog/archive_status",
"pg_clog",
+ "pg_notify",
"pg_subtrans",
"pg_twophase",
"pg_multixact/members",
diff -cr cvs.head/src/bin/psql/common.c cvs.build/src/bin/psql/common.c
*** cvs.head/src/bin/psql/common.c 2010-01-05 12:39:33.000000000 +0100
--- cvs.build/src/bin/psql/common.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 555,562 ****
while ((notify = PQnotifies(pset.db)))
{
! fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"),
! notify->relname, notify->be_pid);
fflush(pset.queryFout);
PQfreemem(notify);
}
--- 555,562 ----
while ((notify = PQnotifies(pset.db)))
{
! fprintf(pset.queryFout, _("Asynchronous notification \"%s\" (%s) received from server process with PID %d.\n"),
! notify->relname, notify->extra, notify->be_pid);
fflush(pset.queryFout);
PQfreemem(notify);
}
diff -cr cvs.head/src/bin/psql/tab-complete.c cvs.build/src/bin/psql/tab-complete.c
*** cvs.head/src/bin/psql/tab-complete.c 2010-01-30 22:06:37.000000000 +0100
--- cvs.build/src/bin/psql/tab-complete.c 2010-02-14 16:04:25.000000000 +0100
***************
*** 1852,1858 ****
/* NOTIFY */
else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0)
! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s'");
/* OPTIONS */
else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0)
--- 1852,1858 ----
/* NOTIFY */
else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0)
! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s'");
/* OPTIONS */
else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0)
***************
*** 2093,2099 ****
/* UNLISTEN */
else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0)
! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s' UNION SELECT '*'");
/* UPDATE */
/* If prev. word is UPDATE suggest a list of tables */
--- 2093,2099 ----
/* UNLISTEN */
else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0)
! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s' UNION SELECT '*'");
/* UPDATE */
/* If prev. word is UPDATE suggest a list of tables */
diff -cr cvs.head/src/include/access/slru.h cvs.build/src/include/access/slru.h
*** cvs.head/src/include/access/slru.h 2010-01-05 12:39:34.000000000 +0100
--- cvs.build/src/include/access/slru.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 16,21 ****
--- 16,40 ----
#include "access/xlogdefs.h"
#include "storage/lwlock.h"
+ /*
+ * Define segment size. A page is the same BLCKSZ as is used everywhere
+ * else in Postgres. The segment size can be chosen somewhat arbitrarily;
+ * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG
+ * or 64K transactions for SUBTRANS.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where
+ * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at
+ * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need
+ * take no explicit notice of that fact in this module, except when comparing
+ * segment and page numbers in SimpleLruTruncate (see PagePrecedes()).
+ *
+ * Note: this file currently assumes that segment file names will be four
+ * hex digits. This sets a lower bound on the segment size (64K transactions
+ * for 32-bit TransactionIds).
+ */
+ #define SLRU_PAGES_PER_SEGMENT 32
+
/*
* Page status codes. Note that these do not include the "dirty" bit.
diff -cr cvs.head/src/include/access/twophase_rmgr.h cvs.build/src/include/access/twophase_rmgr.h
*** cvs.head/src/include/access/twophase_rmgr.h 2010-01-05 12:39:34.000000000 +0100
--- cvs.build/src/include/access/twophase_rmgr.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 23,31 ****
*/
#define TWOPHASE_RM_END_ID 0
#define TWOPHASE_RM_LOCK_ID 1
! #define TWOPHASE_RM_NOTIFY_ID 2
! #define TWOPHASE_RM_PGSTAT_ID 3
! #define TWOPHASE_RM_MULTIXACT_ID 4
#define TWOPHASE_RM_MAX_ID TWOPHASE_RM_MULTIXACT_ID
extern const TwoPhaseCallback twophase_recover_callbacks[];
--- 23,30 ----
*/
#define TWOPHASE_RM_END_ID 0
#define TWOPHASE_RM_LOCK_ID 1
! #define TWOPHASE_RM_PGSTAT_ID 2
! #define TWOPHASE_RM_MULTIXACT_ID 3
#define TWOPHASE_RM_MAX_ID TWOPHASE_RM_MULTIXACT_ID
extern const TwoPhaseCallback twophase_recover_callbacks[];
diff -cr cvs.head/src/include/catalog/pg_proc.h cvs.build/src/include/catalog/pg_proc.h
*** cvs.head/src/include/catalog/pg_proc.h 2010-02-10 20:33:17.000000000 +0100
--- cvs.build/src/include/catalog/pg_proc.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 4127,4132 ****
--- 4127,4136 ----
DESCR("get the prepared statements for this session");
DATA(insert OID = 2511 ( pg_cursor PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,16,16,16,1184}" "{o,o,o,o,o,o}" "{name,statement,is_holdable,is_binary,is_scrollable,creation_time}" _null_ pg_cursor _null_ _null_ _null_ ));
DESCR("get the open cursors for this session");
+ DATA(insert OID = 3036 ( pg_listening PGNSP PGUID 12 1 10 0 f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ pg_listening _null_ _null_ _null_ ));
+ DESCR("get the channels that the current backend listens to");
+ DATA(insert OID = 3035 ( pg_notify PGNSP PGUID 12 1 0 0 f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_));
+ DESCR("send a notification to clients");
DATA(insert OID = 2599 ( pg_timezone_abbrevs PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,1186,16}" "{o,o,o}" "{abbrev,utc_offset,is_dst}" _null_ pg_timezone_abbrevs _null_ _null_ _null_ ));
DESCR("get the available time zone abbreviations");
DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,1186,16}" "{o,o,o,o}" "{name,abbrev,utc_offset,is_dst}" _null_ pg_timezone_names _null_ _null_ _null_ ));
diff -cr cvs.head/src/include/commands/async.h cvs.build/src/include/commands/async.h
*** cvs.head/src/include/commands/async.h 2010-01-05 12:39:35.000000000 +0100
--- cvs.build/src/include/commands/async.h 2010-02-14 16:48:57.000000000 +0100
***************
*** 13,28 ****
#ifndef ASYNC_H
#define ASYNC_H
extern bool Trace_notify;
/* notify-related SQL statements */
! extern void Async_Notify(const char *relname);
extern void Async_Listen(const char *relname);
extern void Async_Unlisten(const char *relname);
extern void Async_UnlistenAll(void);
/* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_Notify(void);
extern void AtAbort_Notify(void);
extern void AtSubStart_Notify(void);
extern void AtSubCommit_Notify(void);
--- 13,41 ----
#ifndef ASYNC_H
#define ASYNC_H
+ /*
+ * Maximum size of the payload, including terminating NULL.
+ */
+ #define NOTIFY_PAYLOAD_MAX_LENGTH 8000
+
+ /*
+ * The number of page slots that we reserve.
+ */
+ #define NUM_ASYNC_BUFFERS 8
+
extern bool Trace_notify;
+ extern void AsyncShmemInit(void);
+
/* notify-related SQL statements */
! extern void Async_Notify(const char *relname, const char *payload);
extern void Async_Listen(const char *relname);
extern void Async_Unlisten(const char *relname);
extern void Async_UnlistenAll(void);
/* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_NotifyBeforeCommit(void);
! extern void AtCommit_NotifyAfterCommit(void);
extern void AtAbort_Notify(void);
extern void AtSubStart_Notify(void);
extern void AtSubCommit_Notify(void);
***************
*** 43,46 ****
--- 56,62 ----
extern void notify_twophase_postcommit(TransactionId xid, uint16 info,
void *recdata, uint32 len);
+ extern Datum pg_listening(PG_FUNCTION_ARGS);
+ extern Datum pg_notify(PG_FUNCTION_ARGS);
+
#endif /* ASYNC_H */
diff -cr cvs.head/src/include/nodes/parsenodes.h cvs.build/src/include/nodes/parsenodes.h
*** cvs.head/src/include/nodes/parsenodes.h 2010-02-14 16:02:49.000000000 +0100
--- cvs.build/src/include/nodes/parsenodes.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 2097,2102 ****
--- 2097,2103 ----
{
NodeTag type;
char *conditionname; /* condition name to notify */
+ char *payload; /* the payload string to be conveyed */
} NotifyStmt;
/* ----------------------
diff -cr cvs.head/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs.head/src/include/storage/lwlock.h 2010-02-10 20:33:18.000000000 +0100
--- cvs.build/src/include/storage/lwlock.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 68,73 ****
--- 68,75 ----
AutovacuumScheduleLock,
SyncScanLock,
RelationMappingLock,
+ AsyncCtlLock,
+ AsyncQueueLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -cr cvs.head/src/include/utils/errcodes.h cvs.build/src/include/utils/errcodes.h
*** cvs.head/src/include/utils/errcodes.h 2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/utils/errcodes.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 318,323 ****
--- 318,324 ----
#define ERRCODE_STATEMENT_TOO_COMPLEX MAKE_SQLSTATE('5','4', '0','0','1')
#define ERRCODE_TOO_MANY_COLUMNS MAKE_SQLSTATE('5','4', '0','1','1')
#define ERRCODE_TOO_MANY_ARGUMENTS MAKE_SQLSTATE('5','4', '0','2','3')
+ #define ERRCODE_TOO_MANY_ENTRIES MAKE_SQLSTATE('5','4', '0','3','1')
/* Class 55 - Object Not In Prerequisite State (class borrowed from DB2) */
#define ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE MAKE_SQLSTATE('5','5', '0','0','0')
diff -cr cvs.head/src/test/regress/expected/guc.out cvs.build/src/test/regress/expected/guc.out
*** cvs.head/src/test/regress/expected/guc.out 2009-11-22 06:20:41.000000000 +0100
--- cvs.build/src/test/regress/expected/guc.out 2010-02-14 16:04:25.000000000 +0100
***************
*** 532,540 ****
CREATE ROLE temp_reset_user;
SET SESSION AUTHORIZATION temp_reset_user;
-- look changes
! SELECT relname FROM pg_listener;
! relname
! -----------
foo_event
(1 row)
--- 532,540 ----
CREATE ROLE temp_reset_user;
SET SESSION AUTHORIZATION temp_reset_user;
-- look changes
! SELECT pg_listening();
! pg_listening
! --------------
foo_event
(1 row)
***************
*** 571,579 ****
-- discard everything
DISCARD ALL;
-- look again
! SELECT relname FROM pg_listener;
! relname
! ---------
(0 rows)
SELECT name FROM pg_prepared_statements;
--- 571,579 ----
-- discard everything
DISCARD ALL;
-- look again
! SELECT pg_listening();
! pg_listening
! --------------
(0 rows)
SELECT name FROM pg_prepared_statements;
diff -cr cvs.head/src/test/regress/expected/sanity_check.out cvs.build/src/test/regress/expected/sanity_check.out
*** cvs.head/src/test/regress/expected/sanity_check.out 2010-01-20 20:08:32.000000000 +0100
--- cvs.build/src/test/regress/expected/sanity_check.out 2010-02-14 16:04:25.000000000 +0100
***************
*** 107,113 ****
pg_language | t
pg_largeobject | t
pg_largeobject_metadata | t
- pg_listener | f
pg_namespace | t
pg_opclass | t
pg_operator | t
--- 107,112 ----
***************
*** 154,160 ****
timetz_tbl | f
tinterval_tbl | f
varchar_tbl | f
! (143 rows)
--
-- another sanity check: every system catalog that has OIDs should have
--- 153,159 ----
timetz_tbl | f
tinterval_tbl | f
varchar_tbl | f
! (142 rows)
--
-- another sanity check: every system catalog that has OIDs should have
diff -cr cvs.head/src/test/regress/sql/guc.sql cvs.build/src/test/regress/sql/guc.sql
*** cvs.head/src/test/regress/sql/guc.sql 2009-10-21 22:38:58.000000000 +0200
--- cvs.build/src/test/regress/sql/guc.sql 2010-02-14 16:04:25.000000000 +0100
***************
*** 165,171 ****
CREATE ROLE temp_reset_user;
SET SESSION AUTHORIZATION temp_reset_user;
-- look changes
! SELECT relname FROM pg_listener;
SELECT name FROM pg_prepared_statements;
SELECT name FROM pg_cursors;
SHOW vacuum_cost_delay;
--- 165,171 ----
CREATE ROLE temp_reset_user;
SET SESSION AUTHORIZATION temp_reset_user;
-- look changes
! SELECT pg_listening();
SELECT name FROM pg_prepared_statements;
SELECT name FROM pg_cursors;
SHOW vacuum_cost_delay;
***************
*** 174,180 ****
-- discard everything
DISCARD ALL;
-- look again
! SELECT relname FROM pg_listener;
SELECT name FROM pg_prepared_statements;
SELECT name FROM pg_cursors;
SHOW vacuum_cost_delay;
--- 174,180 ----
-- discard everything
DISCARD ALL;
-- look again
! SELECT pg_listening();
SELECT name FROM pg_prepared_statements;
SELECT name FROM pg_cursors;
SHOW vacuum_cost_delay;