diff -cr cvs.head/doc/src/sgml/catalogs.sgml cvs.build/doc/src/sgml/catalogs.sgml *** cvs.head/doc/src/sgml/catalogs.sgml 2010-02-10 20:32:57.000000000 +0100 --- cvs.build/doc/src/sgml/catalogs.sgml 2010-02-14 16:04:25.000000000 +0100 *************** *** 169,179 **** - pg_listener - asynchronous notification support - - - pg_namespace schemas --- 169,174 ---- *************** *** 3253,3320 **** - - <structname>pg_listener</structname> - - - pg_listener - - - - The catalog pg_listener supports the - and - - commands. A listener creates an entry in - pg_listener for each notification name - it is listening for. A notifier scans pg_listener - and updates each matching entry to show that a notification has occurred. - The notifier also sends a signal (using the PID recorded in the table) - to awaken the listener from sleep. - - - - <structname>pg_listener</> Columns - - - - - Name - Type - Description - - - - - - relname - name - - Notify condition name. (The name need not match any actual - relation in the database; the name relname is historical.) - - - - - listenerpid - int4 - PID of the server process that created this entry - - - - notification - int4 - - Zero if no event is pending for this listener. If an event is - pending, the PID of the server process that sent the notification - - - - -
- -
- - <structname>pg_namespace</structname> --- 3248,3253 ---- diff -cr cvs.head/doc/src/sgml/func.sgml cvs.build/doc/src/sgml/func.sgml *** cvs.head/doc/src/sgml/func.sgml 2010-02-14 16:02:44.000000000 +0100 --- cvs.build/doc/src/sgml/func.sgml 2010-02-14 16:04:25.000000000 +0100 *************** *** 11574,11579 **** --- 11574,11585 ---- + pg_listening() + setof text + channels that the session is currently listening on + + + session_user name session user name *************** *** 11740,11745 **** --- 11746,11760 ---- + pg_listening + + + + pg_listening returns a set of channels that the + current session is listening to. See for more information. + + + version diff -cr cvs.head/doc/src/sgml/libpq.sgml cvs.build/doc/src/sgml/libpq.sgml *** cvs.head/doc/src/sgml/libpq.sgml 2010-02-10 20:32:59.000000000 +0100 --- cvs.build/doc/src/sgml/libpq.sgml 2010-02-14 16:04:25.000000000 +0100 *************** *** 4116,4125 **** can stop listening with the UNLISTEN command). All sessions listening on a particular condition will be notified asynchronously when a NOTIFY command with that ! condition name is executed by any session. No additional information ! is passed from the notifier to the listener. Thus, typically, any ! actual data that needs to be communicated is transferred through a ! database table. Commonly, the condition name is the same as the associated table, but it is not necessary for there to be any associated table. --- 4116,4124 ---- can stop listening with the UNLISTEN command). All sessions listening on a particular condition will be notified asynchronously when a NOTIFY command with that ! condition name is executed by any session. A notification parameter can be ! used to communicate additional data to a listener. ! Commonly, the condition name is the same as the associated table, but it is not necessary for there to be any associated table. *************** *** 4153,4161 **** PQfreemem. It is sufficient to free the PGnotify pointer; the relname and extra ! 
fields do not represent separate allocations. (At present, the ! extra field is unused and will always point ! to an empty string.) --- 4152,4158 ---- PQfreemem. It is sufficient to free the PGnotify pointer; the relname and extra ! fields do not represent separate allocations. diff -cr cvs.head/doc/src/sgml/protocol.sgml cvs.build/doc/src/sgml/protocol.sgml *** cvs.head/doc/src/sgml/protocol.sgml 2010-02-10 20:32:59.000000000 +0100 --- cvs.build/doc/src/sgml/protocol.sgml 2010-02-14 16:04:25.000000000 +0100 *************** *** 3192,3199 **** Additional information passed from the notifying process. - (Currently, this feature is unimplemented so the field - is always an empty string.) --- 3192,3197 ---- diff -cr cvs.head/doc/src/sgml/ref/listen.sgml cvs.build/doc/src/sgml/ref/listen.sgml *** cvs.head/doc/src/sgml/ref/listen.sgml 2008-11-14 11:22:47.000000000 +0100 --- cvs.build/doc/src/sgml/ref/listen.sgml 2010-02-14 16:04:25.000000000 +0100 *************** *** 74,79 **** --- 74,86 ---- + Notes + + A transaction that has executed LISTEN cannot be prepared for a two-phase commit. + + + + Parameters *************** *** 98,104 **** LISTEN virtual; NOTIFY virtual; ! Asynchronous notification "virtual" received from server process with PID 8448. --- 105,111 ---- LISTEN virtual; NOTIFY virtual; ! Asynchronous notification "virtual" () received from server process with PID 8448. diff -cr cvs.head/doc/src/sgml/ref/notify.sgml cvs.build/doc/src/sgml/ref/notify.sgml *** cvs.head/doc/src/sgml/ref/notify.sgml 2008-11-14 11:22:47.000000000 +0100 --- cvs.build/doc/src/sgml/ref/notify.sgml 2010-02-14 18:03:14.000000000 +0100 *************** *** 21,27 **** ! NOTIFY name --- 21,27 ---- ! NOTIFY name [ parameter ] *************** *** 29,36 **** Description ! The NOTIFY command sends a notification event to each ! client application that has previously executed LISTEN name for the specified notification name in the current database. --- 29,37 ---- Description ! 
The NOTIFY command sends a notification event together ! with an optional notification parameter to each client application that has ! previously executed LISTEN name for the specified notification name in the current database. *************** *** 39,54 **** NOTIFY provides a simple form of signal or interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. ! Higher-level mechanisms can be built by using tables in the database to ! pass additional data (beyond a mere notification name) from notifier to ! listener(s). The information passed to the client for a notification event includes the notification ! name and the notifying session's server process PID. It is up to the ! database designer to define the notification names that will be used in a given ! database and what each one means. --- 40,56 ---- NOTIFY provides a simple form of signal or interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. ! A notification parameter can be sent along with the notification and ! higher-level mechanisms for passing structured data can be built by using ! tables in the database to pass additional data from notifier to listener(s). The information passed to the client for a notification event includes the notification ! name, the notifying session's server process PID and the ! notification parameter (payload) which is an empty string if it has not been specified. ! It is up to the database designer to define the notification names that will ! be used in a given database and what each one means. *************** *** 89,102 **** ! NOTIFY behaves like Unix signals in one important ! respect: if the same notification name is signaled multiple times in quick ! succession, recipients might get only one notification event for several executions ! of NOTIFY. So it is a bad idea to depend on the number ! of notifications received. Instead, use NOTIFY to wake up ! 
applications that need to pay attention to something, and use a database ! object (such as a sequence) to keep track of what happened or how many times ! it happened. --- 91,111 ---- ! If the same notification name is signaled multiple times from the same ! transaction and with an identical notification parameter (or an empty one), the ! database server can decide to deliver a single notification only. ! On the other hand, notifications with distinct notification parameters will ! always be delivered as distinct notifications. Similarly, notifications from ! different transactions will never get folded into one notification. ! NOTIFY also guarantees that notifications from the same ! transaction get delivered in the order they were sent. ! ! ! ! An alternative to specifying a notification parameter is to use NOTIFY to ! wake up applications that need to pay attention to something, and use a ! database object (such as a sequence) to keep track of what happened or how many ! times it happened. *************** *** 111,122 **** notification event message) is the same as one's own session's PID (available from libpq). When they are the same, the notification event is one's own work bouncing ! back, and can be ignored. (Despite what was said in the preceding ! paragraph, this is a safe technique. ! PostgreSQL keeps self-notifications ! separate from notifications arriving from other sessions, so you ! cannot miss an outside notification by ignoring your own ! notifications.) --- 120,154 ---- notification event message) is the same as one's own session's PID (available from libpq). When they are the same, the notification event is one's own work bouncing ! back, and can be ignored. ! ! ! ! ! Notes ! ! A transaction that has executed LISTEN cannot be prepared for a two-phase commit. ! ! ! To send a notification you can also use the function ! pg_notify(text, ! text). The function takes the channel name as the ! first argument and the payload as the second. 
This could be more convenient ! to use in triggers and you can also use a non-constant channel name and ! parameter value. ! ! ! There is a limited queue for notifications. In case this queue is full, all ! transactions calling NOTIFY will fail. ! ! ! The queue is quite large and should be sufficiently sized for almost ! every use case. However, no cleanup can take place if one of your backends ! executes LISTEN and then enters a transaction for a very ! long time. Once the queue is half full you will see warnings in the log file ! pointing you to the backend that is preventing cleanup. In this case you should ! make sure that this backend ends its current transaction so that cleanup can ! proceed. *************** *** 132,137 **** --- 164,183 ---- + + parameter + + + The notification parameter (payload) to be communicated along with the + notification. The character string is only allowed to consist of pure + ASCII 7-bit characters and must be shorter than 8000 characters. + Specifying a longer payload will cause an error. If you need to send + other characters or binary data, you need to take care of the encoding + and decoding (like base64) on your own. Alternatively you can store the + information in a database table and send the key of the record. + + + *************** *** 145,151 **** LISTEN virtual; NOTIFY virtual; ! Asynchronous notification "virtual" received from server process with PID 8448. --- 191,203 ---- LISTEN virtual; NOTIFY virtual; ! Asynchronous notification "virtual" () received from server process with PID 8448. ! NOTIFY virtual 'This is the payload'; ! Asynchronous notification "virtual" (This is the payload) received from server process with PID 8448. ! ! LISTEN foo; ! SELECT pg_notify((SELECT 'fo' || 'o'), (SELECT 'pay' || 'load')); ! Asynchronous notification "foo" (payload) received from server process with PID 20801. 
diff -cr cvs.head/doc/src/sgml/ref/unlisten.sgml cvs.build/doc/src/sgml/ref/unlisten.sgml *** cvs.head/doc/src/sgml/ref/unlisten.sgml 2008-11-14 11:22:47.000000000 +0100 --- cvs.build/doc/src/sgml/ref/unlisten.sgml 2010-02-14 16:04:25.000000000 +0100 *************** *** 83,88 **** --- 83,92 ---- At the end of each session, UNLISTEN * is automatically executed. + + + A transaction that has executed LISTEN cannot be prepared for a two-phase commit. + *************** *** 94,100 **** LISTEN virtual; NOTIFY virtual; ! Asynchronous notification "virtual" received from server process with PID 8448. --- 98,104 ---- LISTEN virtual; NOTIFY virtual; ! Asynchronous notification "virtual" () received from server process with PID 8448. diff -cr cvs.head/src/backend/access/transam/slru.c cvs.build/src/backend/access/transam/slru.c *** cvs.head/src/backend/access/transam/slru.c 2010-01-05 12:39:22.000000000 +0100 --- cvs.build/src/backend/access/transam/slru.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 58,83 **** #include "storage/shmem.h" #include "miscadmin.h" - - /* - * Define segment size. A page is the same BLCKSZ as is used everywhere - * else in Postgres. The segment size can be chosen somewhat arbitrarily; - * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG - * or 64K transactions for SUBTRANS. - * - * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, - * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where - * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at - * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need - * take no explicit notice of that fact in this module, except when comparing - * segment and page numbers in SimpleLruTruncate (see PagePrecedes()). - * - * Note: this file currently assumes that segment file names will be four - * hex digits. This sets a lower bound on the segment size (64K transactions - * for 32-bit TransactionIds). 
- */ - #define SLRU_PAGES_PER_SEGMENT 32 - #define SlruFileName(ctl, path, seg) \ snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg) --- 58,63 ---- diff -cr cvs.head/src/backend/access/transam/twophase_rmgr.c cvs.build/src/backend/access/transam/twophase_rmgr.c *** cvs.head/src/backend/access/transam/twophase_rmgr.c 2010-01-05 12:39:22.000000000 +0100 --- cvs.build/src/backend/access/transam/twophase_rmgr.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 25,31 **** { NULL, /* END ID */ lock_twophase_recover, /* Lock */ - NULL, /* notify/listen */ NULL, /* pgstat */ multixact_twophase_recover /* MultiXact */ }; --- 25,30 ---- *************** *** 34,40 **** { NULL, /* END ID */ lock_twophase_postcommit, /* Lock */ - notify_twophase_postcommit, /* notify/listen */ pgstat_twophase_postcommit, /* pgstat */ multixact_twophase_postcommit /* MultiXact */ }; --- 33,38 ---- *************** *** 43,49 **** { NULL, /* END ID */ lock_twophase_postabort, /* Lock */ - NULL, /* notify/listen */ pgstat_twophase_postabort, /* pgstat */ multixact_twophase_postabort /* MultiXact */ }; --- 41,46 ---- *************** *** 52,58 **** { NULL, /* END ID */ lock_twophase_standby_recover, /* Lock */ - NULL, /* notify/listen */ NULL, /* pgstat */ NULL /* MultiXact */ }; --- 49,54 ---- diff -cr cvs.head/src/backend/access/transam/xact.c cvs.build/src/backend/access/transam/xact.c *** cvs.head/src/backend/access/transam/xact.c 2010-02-14 16:02:46.000000000 +0100 --- cvs.build/src/backend/access/transam/xact.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 1736,1743 **** /* close large objects before lower-level cleanup */ AtEOXact_LargeObject(true); ! /* NOTIFY commit must come before lower-level cleanup */ ! AtCommit_Notify(); /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); --- 1736,1743 ---- /* close large objects before lower-level cleanup */ AtEOXact_LargeObject(true); ! /* Insert notifications sent by the NOTIFY command into the queue */ ! 
AtCommit_NotifyBeforeCommit(); /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); *************** *** 1815,1820 **** --- 1815,1825 ---- AtEOXact_MultiXact(); + /* + * Clean up Notify buffers and signal listening backends. + */ + AtCommit_NotifyAfterCommit(); + ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_LOCKS, true, true); diff -cr cvs.head/src/backend/catalog/Makefile cvs.build/src/backend/catalog/Makefile *** cvs.head/src/backend/catalog/Makefile 2010-01-06 22:30:05.000000000 +0100 --- cvs.build/src/backend/catalog/Makefile 2010-02-14 16:04:25.000000000 +0100 *************** *** 30,36 **** pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \ pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \ ! pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \ pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \ pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \ pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \ --- 30,36 ---- pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \ pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \ ! pg_statistic.h pg_rewrite.h pg_trigger.h pg_description.h \ pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \ pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \ pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \ diff -cr cvs.head/src/backend/commands/async.c cvs.build/src/backend/commands/async.c *** cvs.head/src/backend/commands/async.c 2010-01-05 12:39:22.000000000 +0100 --- cvs.build/src/backend/commands/async.c 2010-02-14 18:09:42.000000000 +0100 *************** *** 13,44 **** */ /*------------------------------------------------------------------------- ! * New Async Notification Model: ! * 1. 
Multiple backends on same machine. Multiple backends listening on ! * one relation. (Note: "listening on a relation" is not really the ! * right way to think about it, since the notify names need not have ! * anything to do with the names of relations actually in the database. ! * But this terminology is all over the code and docs, and I don't feel ! * like trying to replace it.) ! * ! * 2. There is a tuple in relation "pg_listener" for each active LISTEN, ! * ie, each relname/listenerPID pair. The "notification" field of the ! * tuple is zero when no NOTIFY is pending for that listener, or the PID ! * of the originating backend when a cross-backend NOTIFY is pending. ! * (We skip writing to pg_listener when doing a self-NOTIFY, so the ! * notification field should never be equal to the listenerPID field.) ! * ! * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target ! * relname to a list of outstanding NOTIFY requests. Actual processing ! * happens if and only if we reach transaction commit. At that time (in ! * routine AtCommit_Notify) we scan pg_listener for matching relnames. ! * If the listenerPID in a matching tuple is ours, we just send a notify ! * message to our own front end. If it is not ours, and "notification" ! * is not already nonzero, we set notification to our own PID and send a ! * PROCSIG_NOTIFY_INTERRUPT signal to the receiving process (indicated by ! * listenerPID). ! * BTW: if the signal operation fails, we presume that the listener backend ! * crashed without removing this tuple, and remove the tuple for it. * * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * can call inbound-notify processing immediately if this backend is idle --- 13,73 ---- */ /*------------------------------------------------------------------------- ! * Async Notification Model as of 9.0: ! * ! * 1. Multiple backends on same machine. Multiple backends listening on ! * several channels. 
(This was previously called a "relation" even though it ! * is just an identifier and has nothing to do with a database relation.) ! * ! * 2. There is one central queue in the form of SLRU-backed, file-based storage ! * (directory pg_notify/), with several pages mapped into shared memory. ! * All listening backends read from that queue and all notifications are ! * placed in this queue, see the data structure AsyncQueueEntry. ! * ! * There is no central storage of which backend listens on which channel; ! * every backend has its own list. ! * ! * Every backend that is listening on at least one channel registers by ! * entering its Pid into the array of all backends. It then scans all ! * incoming notifications in the central queue and first compares the ! * database oid of the notification with its own database oid and then ! * compares the notified channel with the list of channels that it listens ! * to. All notifications without a match are just skipped. ! * ! * In case there is a match it delivers the corresponding notification to ! * its frontend. ! * ! * 3. The NOTIFY statement (routine Async_Notify) stores the notification ! * in a list which will not be processed until transaction end. Every ! * notification can additionally send a "payload" which is an extra text ! * parameter to convey arbitrary information to the recipient. ! * ! * Duplicate notifications from the same transaction are sent out as one ! * notification only. This is done to save work when for example a trigger ! * on a 2 million row table fires a notification for each row that has been ! * changed. If the application needs to receive every single notification ! * that has been sent, it can easily add some unique string into the extra ! * payload parameter. ! * ! * Once the transaction commits, AtCommit_NotifyBeforeCommit() performs the ! * required changes regarding listeners (Listen/Unlisten) and then adds the ! * pending notifications to the head of the queue. The head pointer of the ! 
* queue always points to the next free position and a position is just a ! * page number and the offset in that page. This is done before marking the ! * transaction as committed in clog. If we run into problems writing the ! * notifications, we can still call elog(ERROR, ...) and the transaction ! * will roll back. ! * ! * Once we have put all of the notifications into the queue, we return to ! * CommitTransaction() which will then commit to clog. ! * ! * After clog commit we are called another time ! * (AtCommit_NotifyAfterCommit()). Here we check if we need to signal the ! * backends. In SignalBackends() we scan the list of listening backends and ! * send a PROCSIG_NOTIFY_INTERRUPT to every backend that has set its Pid (we ! * don't know which backend is listening on which channel so we need to send ! * a signal to every listening backend). We can exclude backends that are ! * already up to date. * * 4. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * can call inbound-notify processing immediately if this backend is idle *************** *** 46,84 **** * block). Otherwise the handler may only set a flag, which will cause the * processing to occur just before we next go idle. * ! * 5. Inbound-notify processing consists of scanning pg_listener for tuples ! * matching our own listenerPID and having nonzero notification fields. ! * For each such tuple, we send a message to our frontend and clear the ! * notification field. BTW: this routine has to start/commit its own ! * transaction, since by assumption it is only called from outside any ! * transaction. ! * ! * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list ! * of pending actions. If we reach transaction commit, the changes are ! * applied to pg_listener just before executing any pending NOTIFYs. This ! * method is necessary because to avoid race conditions, we must hold lock ! * on pg_listener from when we insert a new listener tuple until we commit. ! 
* To do that and not create undue hazard of deadlock, we don't want to ! * touch pg_listener until we are otherwise done with the transaction; ! * in particular it'd be uncool to still be taking user-commanded locks ! * while holding the pg_listener lock. ! * ! * Although we grab ExclusiveLock on pg_listener for any operation, ! * the lock is never held very long, so it shouldn't cause too much of ! * a performance problem. (Previously we used AccessExclusiveLock, but ! * there's no real reason to forbid concurrent reads.) * ! * An application that listens on the same relname it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * by comparing be_pid in the NOTIFY message to the application's own backend's ! * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the * frontend during startup.) The above design guarantees that notifies from ! * other backends will never be missed by ignoring self-notifies. Note, ! * however, that we do *not* guarantee that a separate frontend message will ! * be sent for every outside NOTIFY. Since there is only room for one ! * originating PID in pg_listener, outside notifies occurring at about the ! * same time may be collapsed into a single message bearing the PID of the ! * first outside backend to perform the NOTIFY. *------------------------------------------------------------------------- */ --- 75,96 ---- * block). Otherwise the handler may only set a flag, which will cause the * processing to occur just before we next go idle. * ! * Inbound-notify processing consists of reading all of the notifications ! * that have arrived since scanning last time. We read every notification ! * until we reach either a notification from an uncommitted transaction or ! * the head pointer's position. Then we check if we were the laziest ! * backend: if our pointer is set to the same position as the global tail ! * pointer is set, then we set it further to the second-laziest backend (We ! 
* can identify it by inspecting the positions of all other backends' ! * pointers). Whenever we move the tail pointer we also truncate now unused ! * pages (i.e. delete files in pg_notify/ that are no longer used). * ! * An application that listens on the same channel it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * by comparing be_pid in the NOTIFY message to the application's own backend's ! * Pid. (As of FE/BE protocol 2.0, the backend's Pid is provided to the * frontend during startup.) The above design guarantees that notifies from ! * other backends will never be missed by ignoring self-notifies. *------------------------------------------------------------------------- */ *************** *** 88,97 **** #include #include "access/heapam.h" ! #include "access/twophase_rmgr.h" #include "access/xact.h" ! #include "catalog/pg_listener.h" #include "commands/async.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" --- 100,111 ---- #include #include "access/heapam.h" ! #include "access/slru.h" ! #include "access/transam.h" #include "access/xact.h" ! #include "catalog/pg_type.h" #include "commands/async.h" + #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" *************** *** 108,115 **** /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of ! * all actions requested in the current transaction. As explained above, ! * we don't actually modify pg_listener until we reach transaction commit. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but --- 122,129 ---- /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of ! * all actions requested in the current transaction. As explained above, ! * we don't actually send notifications until we reach transaction commit. * * The list is kept in CurTransactionContext. 
In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but *************** *** 126,132 **** typedef struct { ListenActionKind action; ! char condname[1]; /* actually, as long as needed */ } ListenAction; static List *pendingActions = NIL; /* list of ListenAction */ --- 140,146 ---- typedef struct { ListenActionKind action; ! char channel[1]; /* actually, as long as needed */ } ListenAction; static List *pendingActions = NIL; /* list of ListenAction */ *************** *** 134,140 **** static List *upperPendingActions = NIL; /* list of upper-xact lists */ /* ! * State for outbound notifies consists of a list of all relnames NOTIFYed * in the current transaction. We do not actually perform a NOTIFY until * and unless the transaction commits. pendingNotifies is NIL if no * NOTIFYs have been done in the current transaction. --- 148,154 ---- static List *upperPendingActions = NIL; /* list of upper-xact lists */ /* ! * State for outbound notifies consists of a list of all channels NOTIFYed * in the current transaction. We do not actually perform a NOTIFY until * and unless the transaction commits. pendingNotifies is NIL if no * NOTIFYs have been done in the current transaction. *************** *** 149,160 **** * condition name, it will get a self-notify at commit. This is a bit odd * but is consistent with our historical behavior. */ - static List *pendingNotifies = NIL; /* list of C strings */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ /* ! * State for inbound notifies consists of two flags: one saying whether * the signal handler is currently allowed to call ProcessIncomingNotify * directly, and one saying whether the signal has occurred but the handler * was not allowed to call ProcessIncomingNotify at the time. --- 163,285 ---- * condition name, it will get a self-notify at commit. This is a bit odd * but is consistent with our historical behavior. 
*/ + typedef struct QueuePosition + { + int page; + int offset; + } QueuePosition; + + typedef struct Notification + { + char *channel; + char *payload; + TransactionId xid; + int32 srcPid; + } Notification; + + typedef struct AsyncQueueEntry + { + /* + * this record has the maximal length, but usually we limit it to + * AsyncQueueEntryEmptySize + strlen(payload). + */ + Size length; + Oid dboid; + TransactionId xid; + int32 srcPid; + char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; + } AsyncQueueEntry; + #define AsyncQueueEntryEmptySize \ + (sizeof(AsyncQueueEntry) - NOTIFY_PAYLOAD_MAX_LENGTH + 1 \ + - NAMEDATALEN + 1) + + #define InvalidPid (-1) + #define QUEUE_POS_PAGE(x) ((x).page) + #define QUEUE_POS_OFFSET(x) ((x).offset) + #define QUEUE_POS_EQUAL(x,y) \ + ((x).page == (y).page ? (x).offset == (y).offset : false) + #define SET_QUEUE_POS(x,y,z) \ + do { \ + (x).page = (y); \ + (x).offset = (z); \ + } while (0) + /* does page x logically precede page y with z = HEAD ? */ + #define QUEUE_POS_MIN(x,y,z) \ + asyncQueuePagePrecedesLogically((x).page, (y).page, (z).page) ? (x) : \ + asyncQueuePagePrecedesLogically((y).page, (x).page, (z).page) ? (y) : \ + (x).offset < (y).offset ? (x) : \ + (y) + #define QUEUE_BACKEND_POS(i) asyncQueueControl->backend[(i)].pos + #define QUEUE_BACKEND_PID(i) asyncQueueControl->backend[(i)].pid + #define QUEUE_HEAD asyncQueueControl->head + #define QUEUE_TAIL asyncQueueControl->tail + + typedef struct QueueBackendStatus + { + int32 pid; + QueuePosition pos; + } QueueBackendStatus; + + /* + * The AsyncQueueControl structure is protected by the AsyncQueueLock. + * + * In SHARED mode, backends will only inspect their own entries as well as + * head and tail pointers. Consequently we can allow a backend to update its + * own record while holding only a shared lock (since no other backend will + * inspect it). + * + * In EXCLUSIVE mode, backends can inspect the entries of other backends and + * also change head and tail pointers. 
+ * + * In order to avoid deadlocks, whenever we need both locks, we always first + * get AsyncQueueLock and then AsyncCtlLock. + */ + typedef struct AsyncQueueControl + { + QueuePosition head; /* head points to the next free location */ + QueuePosition tail; /* the global tail is equivalent to the + tail of the "slowest" backend */ + TimestampTz lastQueueFillWarn; /* when the queue is full we only + want to log that once in a + while */ + QueueBackendStatus backend[1]; /* actually this one has as many entries as + * connections are allowed (MaxBackends) */ + /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */ + } AsyncQueueControl; + + static AsyncQueueControl *asyncQueueControl; + static SlruCtlData AsyncCtlData; + + #define AsyncCtl (&AsyncCtlData) + #define QUEUE_PAGESIZE BLCKSZ + #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ + + /* + * slru.c currently assumes that all filenames are four characters of hex + * digits. That means that we can use segments 0000 through FFFF. + * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us + * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0xFFFF. + * + * It's of course easy to enhance slru.c but those pages give us so much + * space already that it doesn't seem worth the trouble... + * + * It's an interesting test case to define QUEUE_MAX_PAGE to a very small + * multiple of SLRU_PAGES_PER_SEGMENT to test queue full behaviour. + */ + #define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0xFFFF) + + static List *pendingNotifies = NIL; /* list of Notifications */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ + static List *listenChannels = NIL; /* list of channels we are listening to */ + + /* has this backend sent notifications in the current transaction ? */ + static bool backendHasSentNotifications = false; + /* has this backend executed a LISTEN in the current transaction ? */ + static bool backendHasExecutedInitialListen = false; /* ! 
* State for inbound notifications consists of two flags: one saying whether * the signal handler is currently allowed to call ProcessIncomingNotify * directly, and one saying whether the signal has occurred but the handler * was not allowed to call ProcessIncomingNotify at the time. *************** *** 171,224 **** bool Trace_notify = false; ! ! static void queue_listen(ListenActionKind action, const char *condname); static void Async_UnlistenOnExit(int code, Datum arg); ! static void Exec_Listen(Relation lRel, const char *relname); ! static void Exec_Unlisten(Relation lRel, const char *relname); ! static void Exec_UnlistenAll(Relation lRel); ! static void Send_Notify(Relation lRel); static void ProcessIncomingNotify(void); ! static void NotifyMyFrontEnd(char *relname, int32 listenerPID); ! static bool AsyncExistsPendingNotify(const char *relname); static void ClearPendingActionsAndNotifies(void); /* * Async_Notify * * This is executed by the SQL notify command. * ! * Adds the relation to the list of pending notifies. * Actual notification happens during transaction commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ */ void ! Async_Notify(const char *relname) { if (Trace_notify) ! elog(DEBUG1, "Async_Notify(%s)", relname); ! /* no point in making duplicate entries in the list ... */ ! if (!AsyncExistsPendingNotify(relname)) { ! /* ! * The name list needs to live until end of transaction, so store it ! * in the transaction context. ! */ ! MemoryContext oldcontext; ! oldcontext = MemoryContextSwitchTo(CurTransactionContext); ! /* ! * Ordering of the list isn't important. We choose to put new entries ! * on the front, as this might make duplicate-elimination a tad faster ! * when the same condition is signaled many times in a row. ! */ ! pendingNotifies = lcons(pstrdup(relname), pendingNotifies); ! MemoryContextSwitchTo(oldcontext); ! } } /* --- 296,520 ---- bool Trace_notify = false; ! 
static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); ! static bool IsListeningOn(const char *channel); ! static bool AsyncExistsPendingNotify(const char *channel, const char *payload); ! static void Exec_ListenBeforeCommit(const char *channel); ! static void Exec_ListenAfterCommit(const char *channel); ! static void Exec_UnlistenAfterCommit(const char *channel); ! static void Exec_UnlistenAllAfterCommit(void); ! static void SignalBackends(void); ! static bool asyncQueuePagePrecedesPhysically(int p, int q); ! static bool asyncQueuePagePrecedesLogically(int p, int q, int head); ! static bool asyncQueueAdvance(QueuePosition *position, int entryLength); ! static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); ! static void asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n); ! static List *asyncQueueAddEntries(List *notifications); ! static bool asyncQueueGetEntriesByPage(QueuePosition *current, ! QueuePosition stop, ! List **notifications); ! static void asyncQueueReadAllNotifications(void); ! static void asyncQueueAdvanceTail(void); ! static void asyncQueueUnregister(void); ! static void asyncQueueFillWarning(void); ! static bool asyncQueueIsFull(void); static void ProcessIncomingNotify(void); ! static void NotifyMyFrontEnd(const char *channel, ! const char *payload, ! int32 srcPid); ! static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); + /* + * We will work on the page range of 0..(SLRU_PAGES_PER_SEGMENT * 0xFFFF). + * asyncQueuePagePrecedesPhysically just checks numerically without any magic + * if one page precedes another one. + * + * On the other hand, when asyncQueuePagePrecedesLogically does that check, it + * takes the current head page number into account. 
If we have wrapped + * around, it can happen that p precedes q, even though p > q (if the head page + * is in between the two). + */ + static bool + asyncQueuePagePrecedesPhysically(int p, int q) + { + return p < q; + } + + static bool + asyncQueuePagePrecedesLogically(int p, int q, int head) + { + if (p <= head && q <= head) + return p < q; + if (p > head && q > head) + return p < q; + if (p <= head) + { + Assert(q > head); + /* q is older */ + return false; + } + else + { + Assert(p > head && q <= head); + /* p is older */ + return true; + } + } + + void + AsyncShmemInit(void) + { + bool found; + int slotno; + Size size; + + /* + * Remember that sizeof(AsyncQueueControl) already contains one member of + * QueueBackendStatus, so we only need to add the status space requirement + * for MaxBackends-1 backends. + */ + size = mul_size(MaxBackends-1, sizeof(QueueBackendStatus)); + size = add_size(size, sizeof(AsyncQueueControl)); + + asyncQueueControl = (AsyncQueueControl *) + ShmemInitStruct("Async Queue Control", size, &found); + + if (!asyncQueueControl) + elog(ERROR, "out of memory"); + + if (!found) + { + int i; + SET_QUEUE_POS(QUEUE_HEAD, 0, 0); + SET_QUEUE_POS(QUEUE_TAIL, 0, 0); + for (i = 0; i < MaxBackends; i++) + { + SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + QUEUE_BACKEND_PID(i) = InvalidPid; + } + } + + AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically; + SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0, + AsyncCtlLock, "pg_notify"); + AsyncCtl->do_fsync = false; + asyncQueueControl->lastQueueFillWarn = GetCurrentTimestamp(); + + if (!found) + { + SlruScanDirectory(AsyncCtl, + QUEUE_MAX_PAGE + SLRU_PAGES_PER_SEGMENT, + true); + + LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); + slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); + SimpleLruWritePage(AsyncCtl, slotno, NULL); + LWLockRelease(AsyncCtlLock); + } + } + + + /* + * pg_notify - + * Send a notification to listening clients + */ + Datum + pg_notify(PG_FUNCTION_ARGS) + { + 
const char *channel; + const char *payload; + + if (PG_ARGISNULL(0)) + channel = ""; + else + channel = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + if (PG_ARGISNULL(1)) + payload = ""; + else + payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); + + Async_Notify(channel, payload); + + PG_RETURN_VOID(); + } + /* * Async_Notify * * This is executed by the SQL notify command. * ! * Adds the channel to the list of pending notifies. * Actual notification happens during transaction commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ */ void ! Async_Notify(const char *channel, const char *payload) { + Notification *n; + MemoryContext oldcontext; + int i; + if (Trace_notify) ! elog(DEBUG1, "Async_Notify(%s)", channel); ! /* a channel name must be specified */ ! if (!channel || !strlen(channel)) ! ereport(ERROR, ! (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ! errmsg("channel name cannot be empty"))); ! ! if (strlen(channel) >= NAMEDATALEN) ! ereport(ERROR, ! (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ! errmsg("channel name too long"))); ! ! if (payload) { ! if (strlen(payload) > NOTIFY_PAYLOAD_MAX_LENGTH - 1) ! ereport(ERROR, ! (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ! errmsg("payload string too long"))); ! ! for (i = 0; i < strlen(payload); i++) ! if (payload[i] < 32 || payload[i] > 126) ! ereport(ERROR, ! (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ! errmsg("invalid character in payload"), ! errdetail("only 7-bit ASCII characters allowed"))); ! } ! /* no point in making duplicate entries in the list ... */ ! if (AsyncExistsPendingNotify(channel, payload)) ! return; ! /* ! * The name list needs to live until end of transaction, so store it ! * in the transaction context. ! */ ! oldcontext = MemoryContextSwitchTo(CurTransactionContext); ! n = (Notification *) palloc(sizeof(Notification)); ! n->channel = pstrdup(channel); ! if (payload) ! n->payload = pstrdup(payload); ! else ! n->payload = ""; ! ! /* will set the xid and the srcPid later... */ ! 
n->xid = InvalidTransactionId; ! n->srcPid = InvalidPid; ! ! /* ! * We want to preserve the order so we need to append every ! * notification. See comments at AsyncExistsPendingNotify(). ! */ ! pendingNotifies = lappend(pendingNotifies, n); ! ! MemoryContextSwitchTo(oldcontext); } /* *************** *** 226,236 **** * Common code for listen, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. ! * Actual update of pg_listener happens during transaction commit. ! * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ */ static void ! queue_listen(ListenActionKind action, const char *condname) { MemoryContext oldcontext; ListenAction *actrec; --- 522,532 ---- * Common code for listen, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. ! * Actual update of the notification queue happens during transaction ! * commit. */ static void ! queue_listen(ListenActionKind action, const char *channel) { MemoryContext oldcontext; ListenAction *actrec; *************** *** 244,252 **** oldcontext = MemoryContextSwitchTo(CurTransactionContext); /* space for terminating null is included in sizeof(ListenAction) */ ! actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname)); actrec->action = action; ! strcpy(actrec->condname, condname); pendingActions = lappend(pendingActions, actrec); --- 540,548 ---- oldcontext = MemoryContextSwitchTo(CurTransactionContext); /* space for terminating null is included in sizeof(ListenAction) */ ! actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel)); actrec->action = action; ! strcpy(actrec->channel, channel); pendingActions = lappend(pendingActions, actrec); *************** *** 259,270 **** * This is executed by the SQL listen command. */ void ! Async_Listen(const char *relname) { if (Trace_notify) ! elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); ! 
queue_listen(LISTEN_LISTEN, relname); } /* --- 555,566 ---- * This is executed by the SQL listen command. */ void ! Async_Listen(const char *channel) { if (Trace_notify) ! elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); ! queue_listen(LISTEN_LISTEN, channel); } /* *************** *** 273,288 **** * This is executed by the SQL unlisten command. */ void ! Async_Unlisten(const char *relname) { if (Trace_notify) ! elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NIL && !unlistenExitRegistered) return; ! queue_listen(LISTEN_UNLISTEN, relname); } /* --- 569,584 ---- * This is executed by the SQL unlisten command. */ void ! Async_Unlisten(const char *channel) { if (Trace_notify) ! elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); /* If we couldn't possibly be listening, no need to queue anything */ if (pendingActions == NIL && !unlistenExitRegistered) return; ! queue_listen(LISTEN_UNLISTEN, channel); } /* *************** *** 306,313 **** /* * Async_UnlistenOnExit * - * Clean up the pg_listener table at backend exit. - * * This is executed if we have done any LISTENs in this backend. * It might not be necessary anymore, if the user UNLISTENed everything, * but we don't try to detect that case. --- 602,607 ---- *************** *** 315,331 **** static void Async_UnlistenOnExit(int code, Datum arg) { - /* - * We need to start/commit a transaction for the unlisten, but if there is - * already an active transaction we had better abort that one first. - * Otherwise we'd end up committing changes that probably ought to be - * discarded. - */ AbortOutOfAnyTransaction(); ! /* Now we can do the unlisten */ ! StartTransactionCommand(); ! Async_UnlistenAll(); ! CommitTransactionCommand(); } /* --- 609,616 ---- static void Async_UnlistenOnExit(int code, Datum arg) { AbortOutOfAnyTransaction(); ! 
Exec_UnlistenAllAfterCommit(); } /* *************** *** 337,386 **** void AtPrepare_Notify(void) { - ListCell *p; - /* It's not sensible to have any pending LISTEN/UNLISTEN actions */ ! if (pendingActions) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN"))); ! ! /* We can deal with pending NOTIFY though */ ! foreach(p, pendingNotifies) ! { ! const char *relname = (const char *) lfirst(p); ! ! RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, ! relname, strlen(relname) + 1); ! } ! ! /* ! * We can clear the state immediately, rather than needing a separate ! * PostPrepare call, because if the transaction fails we'd just discard ! * the state anyway. ! */ ! ClearPendingActionsAndNotifies(); } /* ! * AtCommit_Notify * ! * This is called at transaction commit. * ! * If there are pending LISTEN/UNLISTEN actions, insert or delete ! * tuples in pg_listener accordingly. * ! * If there are outbound notify requests in the pendingNotifies list, ! * scan pg_listener for matching tuples, and either signal the other ! * backend or send a message to our own frontend. ! * ! * NOTE: we are still inside the current transaction, therefore can ! * piggyback on its committing of changes. */ void ! AtCommit_Notify(void) { - Relation lRel; ListCell *p; if (pendingActions == NIL && pendingNotifies == NIL) --- 622,649 ---- void AtPrepare_Notify(void) { /* It's not sensible to have any pending LISTEN/UNLISTEN actions */ ! if (pendingActions || pendingNotifies) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot PREPARE a transaction that has executed LISTEN/UNLISTEN or NOTIFY"))); } /* ! * AtCommit_NotifyBeforeCommit * ! * This is called at transaction commit, before actually committing to ! * clog. * ! * If there are pending LISTEN/UNLISTEN actions, update our ! * "listenChannels" list. * ! * If there are outbound notify requests in the pendingNotifies list, add ! 
* them to the global queue and signal any backend that is listening. */ void ! AtCommit_NotifyBeforeCommit(void) { ListCell *p; if (pendingActions == NIL && pendingNotifies == NIL) *************** *** 397,406 **** } if (Trace_notify) ! elog(DEBUG1, "AtCommit_Notify"); ! /* Acquire ExclusiveLock on pg_listener */ ! lRel = heap_open(ListenerRelationId, ExclusiveLock); /* Perform any pending listen/unlisten actions */ foreach(p, pendingActions) --- 660,669 ---- } if (Trace_notify) ! elog(DEBUG1, "AtCommit_NotifyBeforeCommit"); ! Assert(backendHasSentNotifications == false); ! Assert(backendHasExecutedInitialListen == false); /* Perform any pending listen/unlisten actions */ foreach(p, pendingActions) *************** *** 410,508 **** switch (actrec->action) { case LISTEN_LISTEN: ! Exec_Listen(lRel, actrec->condname); break; case LISTEN_UNLISTEN: ! Exec_Unlisten(lRel, actrec->condname); break; case LISTEN_UNLISTEN_ALL: ! Exec_UnlistenAll(lRel); break; } - - /* We must CCI after each action in case of conflicting actions */ - CommandCounterIncrement(); } - /* Perform any pending notifies */ - if (pendingNotifies) - Send_Notify(lRel); - /* ! * We do NOT release the lock on pg_listener here; we need to hold it ! * until end of transaction (which is about to happen, anyway) to ensure ! * that notified backends see our tuple updates when they look. Else they ! * might disregard the signal, which would make the application programmer ! * very unhappy. Also, this prevents race conditions when we have just ! * inserted a listening tuple. */ ! heap_close(lRel, NoLock); ! ! ClearPendingActionsAndNotifies(); ! if (Trace_notify) ! elog(DEBUG1, "AtCommit_Notify: done"); } /* ! * Exec_Listen --- subroutine for AtCommit_Notify * ! * Register the current backend as listening on the specified relation. */ ! static void ! Exec_Listen(Relation lRel, const char *relname) { ! HeapScanDesc scan; ! HeapTuple tuple; ! Datum values[Natts_pg_listener]; ! bool nulls[Natts_pg_listener]; ! 
NameData condname; ! bool alreadyListener = false; ! if (Trace_notify) ! elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid); ! /* Detect whether we are already listening on this relname */ ! scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); ! while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { ! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); ! if (listener->listenerpid == MyProcPid && ! strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) { ! alreadyListener = true; ! /* No need to scan the rest of the table */ ! break; } } - heap_endscan(scan); ! if (alreadyListener) ! return; ! /* ! * OK to insert a new tuple ! */ ! memset(nulls, false, sizeof(nulls)); ! namestrcpy(&condname, relname); ! values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname); ! values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid); ! values[Anum_pg_listener_notification - 1] = Int32GetDatum(0); /* no notifies pending */ ! tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls); ! simple_heap_insert(lRel, tuple); ! #ifdef NOT_USED /* currently there are no indexes */ ! CatalogUpdateIndexes(lRel, tuple); ! #endif ! heap_freetuple(tuple); /* ! * now that we are listening, make sure we will unlisten before dying. */ if (!unlistenExitRegistered) { --- 673,886 ---- switch (actrec->action) { case LISTEN_LISTEN: ! Exec_ListenBeforeCommit(actrec->channel); break; case LISTEN_UNLISTEN: ! /* there is no Exec_UnlistenBeforeCommit() */ break; case LISTEN_UNLISTEN_ALL: ! /* there is no Exec_UnlistenAllBeforeCommit() */ break; } } /* ! * Perform any pending notifies. */ ! if (pendingNotifies) ! { ! backendHasSentNotifications = true; ! while (pendingNotifies != NIL) ! { ! /* ! * Add the pending notifications to the queue. ! * ! * A full queue is very uncommon and should really not happen, ! * given that we have so much space available in the slru pages. ! * Nevertheless we need to deal with this possibility. Note that ! 
* when we get here we are in the process of committing our ! * transaction, we have not yet committed to clog but this would be ! * the next step. So at this point in time we can still roll the ! * transaction back. ! */ ! LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); ! asyncQueueFillWarning(); ! if (asyncQueueIsFull()) ! { ! LWLockRelease(AsyncQueueLock); ! ereport(ERROR, ! (errcode(ERRCODE_TOO_MANY_ENTRIES), ! errmsg("too many notifications in the queue"))); ! } ! pendingNotifies = asyncQueueAddEntries(pendingNotifies); ! LWLockRelease(AsyncQueueLock); ! } ! } } /* ! * AtCommit_NotifyAfterCommit ! * ! * This is called at transaction commit, after committing to clog. * ! * Notify the listening backends. */ ! void ! AtCommit_NotifyAfterCommit(void) { ! ListCell *p; ! /* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to ! * return as soon as possible */ ! if (!pendingActions && !backendHasSentNotifications) ! return; ! /* Perform any pending listen/unlisten actions */ ! foreach(p, pendingActions) { ! ListenAction *actrec = (ListenAction *) lfirst(p); ! switch (actrec->action) { ! case LISTEN_LISTEN: ! Exec_ListenAfterCommit(actrec->channel); ! break; ! case LISTEN_UNLISTEN: ! Exec_UnlistenAfterCommit(actrec->channel); ! break; ! case LISTEN_UNLISTEN_ALL: ! Exec_UnlistenAllAfterCommit(); ! break; } } ! if (backendHasSentNotifications) ! SignalBackends(); ! ClearPendingActionsAndNotifies(); ! ! if (Trace_notify) ! elog(DEBUG1, "AtCommit_NotifyAfterCommit: done"); ! } ! ! /* ! * This function is executed for every notification found in the queue in order ! * to check if the current backend is listening on that channel. Not sure if we ! * should further optimize this, for example convert to a sorted array and ! * allow binary search on it... ! */ ! static bool ! IsListeningOn(const char *channel) ! { ! ListCell *p; ! char *lchan; ! ! foreach(p, listenChannels) ! { ! lchan = (char *) lfirst(p); ! if (strcmp(lchan, channel) == 0) ! return true; ! } ! 
return false; ! } ! Datum ! pg_listening(PG_FUNCTION_ARGS) ! { ! FuncCallContext *funcctx; ! ListCell **lcp; ! /* stuff done only on the first call of the function */ ! if (SRF_IS_FIRSTCALL()) ! { ! MemoryContext oldcontext; ! /* create a function context for cross-call persistence */ ! funcctx = SRF_FIRSTCALL_INIT(); ! /* ! * switch to memory context appropriate for multiple function calls ! */ ! oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); ! ! /* allocate memory for user context */ ! lcp = (ListCell **) palloc(sizeof(ListCell *)); ! if (listenChannels != NIL) ! *lcp = list_head(listenChannels); ! else ! *lcp = NULL; ! funcctx->user_fctx = (void *) lcp; ! MemoryContextSwitchTo(oldcontext); ! } ! ! /* stuff done on every call of the function */ ! funcctx = SRF_PERCALL_SETUP(); ! lcp = (ListCell **) funcctx->user_fctx; ! ! while (*lcp != NULL) ! { ! char *channel = (char *) lfirst(*lcp); ! ! *lcp = (*lcp)->next; ! SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); ! } ! ! SRF_RETURN_DONE(funcctx); ! } ! ! /* ! * Exec_ListenBeforeCommit --- subroutine for AtCommit_NotifyBeforeCommit ! * ! * Note that we only set our pointer here and do not yet add the channel to ! * listenChannels. Since our transaction could still roll back we do this only ! * after commit. We know that our tail pointer won't move between here and ! * directly after commit, so we won't miss a notification. ! */ ! static void ! Exec_ListenBeforeCommit(const char *channel) ! { ! if (Trace_notify) ! elog(DEBUG1, "Exec_ListenBeforeCommit(%s,%d)", channel, MyProcPid); ! ! /* Detect whether we are already listening to something. */ ! if (listenChannels != NIL) ! return; /* ! * We need this variable to detect an aborted initial LISTEN. ! * In that case we would set up our pointer but not listen on any channel. ! * This state gets cleaned up again in AtAbort_Notify(). ! */ ! backendHasExecutedInitialListen = true; ! ! /* ! * This is our first LISTEN, establish our pointer. ! 
* We set our pointer to the global tail pointer, this way we make ! * sure that we get all of the notifications. We might get a few more ! * but that doesn't hurt. ! */ ! LWLockAcquire(AsyncQueueLock, LW_SHARED); ! QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL; ! QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; ! LWLockRelease(AsyncQueueLock); ! ! /* ! * Try to move our pointer forward as far as possible. This will skip ! * over already committed notifications. Still, we could get ! * notifications that have already committed before we started to ! * LISTEN. ! * ! * Note that we are not yet listening on anything, so we won't deliver ! * any notification. ! * ! * This will also advance the global tail pointer if necessary. ! */ ! asyncQueueReadAllNotifications(); ! ! /* ! * Now that we are listening, make sure we will unlisten before dying. */ if (!unlistenExitRegistered) { *************** *** 512,550 **** } /* ! * Exec_Unlisten --- subroutine for AtCommit_Notify * ! * Remove the current backend from the list of listening backends ! * for the specified relation. */ static void ! Exec_Unlisten(Relation lRel, const char *relname) { ! HeapScanDesc scan; ! HeapTuple tuple; if (Trace_notify) ! elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid); ! scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); ! while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { ! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); ! ! if (listener->listenerpid == MyProcPid && ! strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) { ! /* Found the matching tuple, delete it */ ! simple_heap_delete(lRel, &tuple->t_self); ! ! /* ! * We assume there can be only one match, so no need to scan the ! * rest of the table ! */ break; } } ! heap_endscan(scan); /* * We do not complain about unlistening something not being listened; --- 890,942 ---- } /* ! * Exec_ListenAfterCommit --- subroutine for AtCommit_NotifyAfterCommit ! * ! 
* Add the channel to the list of channels we are listening on. ! */ ! static void ! Exec_ListenAfterCommit(const char *channel) ! { ! MemoryContext oldcontext; ! ! /* Detect whether we are already listening on this channel */ ! if (IsListeningOn(channel)) ! return; ! ! oldcontext = MemoryContextSwitchTo(TopMemoryContext); ! listenChannels = lappend(listenChannels, pstrdup(channel)); ! MemoryContextSwitchTo(oldcontext); ! } ! ! /* ! * Exec_UnlistenAfterCommit --- subroutine for AtCommit_NotifyAfterCommit * ! * Remove a specified channel from "listenChannels". */ static void ! Exec_UnlistenAfterCommit(const char *channel) { ! ListCell *q; ! ListCell *prev; if (Trace_notify) ! elog(DEBUG1, "Exec_UnlistenAfterCommit(%s,%d)", channel, MyProcPid); ! prev = NULL; ! foreach(q, listenChannels) { ! char *lchan = (char *) lfirst(q); ! if (strcmp(lchan, channel) == 0) { ! pfree(lchan); ! listenChannels = list_delete_cell(listenChannels, q, prev); break; } + prev = q; } ! ! if (listenChannels == NIL) ! asyncQueueUnregister(); /* * We do not complain about unlistening something not being listened; *************** *** 553,690 **** } /* ! * Exec_UnlistenAll --- subroutine for AtCommit_Notify * ! * Update pg_listener to unlisten all relations for this backend. */ static void ! Exec_UnlistenAll(Relation lRel) { - HeapScanDesc scan; - HeapTuple lTuple; - ScanKeyData key[1]; - if (Trace_notify) ! elog(DEBUG1, "Exec_UnlistenAll"); ! /* Find and delete all entries with my listenerPID */ ! ScanKeyInit(&key[0], ! Anum_pg_listener_listenerpid, ! BTEqualStrategyNumber, F_INT4EQ, ! Int32GetDatum(MyProcPid)); ! scan = heap_beginscan(lRel, SnapshotNow, 1, key); ! while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) ! simple_heap_delete(lRel, &lTuple->t_self); ! heap_endscan(scan); } /* ! * Send_Notify --- subroutine for AtCommit_Notify ! * ! * Scan pg_listener for tuples matching our pending notifies, and ! * either signal the other backend or send a message to our own frontend. 
*/ static void ! Send_Notify(Relation lRel) { ! TupleDesc tdesc = RelationGetDescr(lRel); ! HeapScanDesc scan; ! HeapTuple lTuple, ! rTuple; ! Datum value[Natts_pg_listener]; ! bool repl[Natts_pg_listener], ! nulls[Natts_pg_listener]; ! ! /* preset data to update notify column to MyProcPid */ ! memset(nulls, false, sizeof(nulls)); ! memset(repl, false, sizeof(repl)); ! repl[Anum_pg_listener_notification - 1] = true; ! memset(value, 0, sizeof(value)); ! value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid); ! ! scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); ! ! while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) ! { ! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); ! char *relname = NameStr(listener->relname); ! int32 listenerPID = listener->listenerpid; ! if (!AsyncExistsPendingNotify(relname)) ! continue; ! if (listenerPID == MyProcPid) { ! /* ! * Self-notify: no need to bother with table update. Indeed, we ! * *must not* clear the notification field in this path, or we ! * could lose an outside notify, which'd be bad for applications ! * that ignore self-notify messages. ! */ ! if (Trace_notify) ! elog(DEBUG1, "AtCommit_Notify: notifying self"); ! NotifyMyFrontEnd(relname, listenerPID); } else { - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify: notifying pid %d", - listenerPID); - /* ! * If someone has already notified this listener, we don't bother ! * modifying the table, but we do still send a NOTIFY_INTERRUPT ! * signal, just in case that backend missed the earlier signal for ! * some reason. It's OK to send the signal first, because the ! * other guy can't read pg_listener until we unlock it. ! * ! * Note: we don't have the other guy's BackendId available, so ! * this will incur a search of the ProcSignal table. That's ! * probably not worth worrying about. */ ! if (SendProcSignal(listenerPID, PROCSIG_NOTIFY_INTERRUPT, ! InvalidBackendId) < 0) { ! /* ! 
* Get rid of pg_listener entry if it refers to a PID that no ! * longer exists. Presumably, that backend crashed without ! * deleting its pg_listener entries. This code used to only ! * delete the entry if errno==ESRCH, but as far as I can see ! * we should just do it for any failure (certainly at least ! * for EPERM too...) ! */ ! simple_heap_delete(lRel, &lTuple->t_self); } ! else if (listener->notification == 0) { ! /* Rewrite the tuple with my PID in notification column */ ! rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); ! simple_heap_update(lRel, &lTuple->t_self, rTuple); ! ! #ifdef NOT_USED /* currently there are no indexes */ ! CatalogUpdateIndexes(lRel, rTuple); ! #endif } } } ! heap_endscan(scan); } /* * AtAbort_Notify * ! * This is called at transaction abort. * ! * Gets rid of pending actions and outbound notifies that we would have ! * executed if the transaction got committed. */ void AtAbort_Notify(void) { ClearPendingActionsAndNotifies(); } --- 945,1325 ---- } /* ! * Exec_UnlistenAllAfterCommit --- subroutine for AtCommit_NotifyAfterCommit * ! * Unlisten on all channels for this backend. */ static void ! Exec_UnlistenAllAfterCommit(void) { if (Trace_notify) ! elog(DEBUG1, "Exec_UnlistenAllAfterCommit(%d)", MyProcPid); ! ! list_free_deep(listenChannels); ! listenChannels = NIL; ! ! asyncQueueUnregister(); ! } ! ! static void ! asyncQueueUnregister(void) ! { ! bool advanceTail = false; ! LWLockAcquire(AsyncQueueLock, LW_SHARED); ! QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; ! /* ! * If we were at the tail (the slowest backend), advance the tail pointer. ! */ ! if (QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL)) ! advanceTail = true; ! LWLockRelease(AsyncQueueLock); ! if (advanceTail) ! asyncQueueAdvanceTail(); ! } ! static bool ! asyncQueueIsFull(void) ! { ! QueuePosition lookahead = QUEUE_HEAD; ! Size remain = QUEUE_PAGESIZE - QUEUE_POS_OFFSET(lookahead) - 1; ! Size advance = Min(remain, NOTIFY_PAYLOAD_MAX_LENGTH); ! ! /* ! 
* Check what happens if we wrote a maximally sized entry. Would we go to a ! * new page? If not, then our queue can not be full (because we can still ! * fill at least the current page with at least one more entry). ! */ ! if (!asyncQueueAdvance(&lookahead, advance)) ! return false; ! ! /* ! * The queue is full if with a switch to a new page we reach the page ! * of the tail pointer. ! */ ! return QUEUE_POS_PAGE(lookahead) == QUEUE_POS_PAGE(QUEUE_TAIL); } /* ! * The function advances the position to the next entry. In case we jump to ! * a new page the function returns true, else false. */ + static bool + asyncQueueAdvance(QueuePosition *position, int entryLength) + { + int pageno = QUEUE_POS_PAGE(*position); + int offset = QUEUE_POS_OFFSET(*position); + bool pageJump = false; + + /* + * Move to the next writing position: First jump over what we have just + * written or read. + */ + offset += entryLength; + Assert(offset < QUEUE_PAGESIZE); + + /* + * In a second step check if another entry can be written to the page. If + * it does, stay here, we have reached the next position. If not, then we + * need to move on to the next page. 
+ */ + if (offset + AsyncQueueEntryEmptySize >= QUEUE_PAGESIZE) + { + pageno++; + if (pageno > QUEUE_MAX_PAGE) + /* wrap around */ + pageno = 0; + offset = 0; + pageJump = true; + } + + SET_QUEUE_POS(*position, pageno, offset); + return pageJump; + } + + static void + asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) + { + Assert(n->channel != NULL); + Assert(n->payload != NULL); + Assert(strlen(n->payload) < NOTIFY_PAYLOAD_MAX_LENGTH); + Assert(strlen(n->channel) < NAMEDATALEN); + + /* The terminators are already included in AsyncQueueEntryEmptySize */ + qe->length = AsyncQueueEntryEmptySize + strlen(n->payload) + + strlen(n->channel); + qe->srcPid = MyProcPid; + qe->dboid = MyDatabaseId; + qe->xid = GetCurrentTransactionId(); + strcpy(qe->data, n->channel); + Assert(*(qe->data + strlen(n->channel)) == '\0'); + strcpy(qe->data + strlen(n->channel) + 1, n->payload); + } + static void ! asyncQueueEntryToNotification(AsyncQueueEntry *qe, Notification *n) { ! n->channel = pstrdup(qe->data); ! Assert(*(qe->data + strlen(qe->data)) == '\0'); ! n->payload = pstrdup(qe->data + strlen(qe->data) + 1); ! n->srcPid = qe->srcPid; ! n->xid = qe->xid; ! } ! ! /* ! * Add the notifications to the queue: we go page by page here, i.e. we stop ! * once we have to go to a new page but we will be called again and then fill ! * that next page. If an entry does not fit to a page anymore, we write a dummy ! * entry with an InvalidOid as the database oid in order to fill the page. So ! * every page is always used up to the last byte which simplifies reading the ! * page later. ! * ! * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock ! * here in this function. ! * ! * We are passed the list of notifications to write and return the ! * not-yet-written notifications back. Eventually we will return NIL. ! */ ! static List * ! asyncQueueAddEntries(List *notifications) ! { ! AsyncQueueEntry qe; ! int pageno; ! int offset; ! int slotno; ! ! /* ! 
* Note that we are holding exclusive AsyncQueueLock already. ! */ ! LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); ! pageno = QUEUE_POS_PAGE(QUEUE_HEAD); ! slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); ! AsyncCtl->shared->page_dirty[slotno] = true; ! do ! { ! Notification *n; ! if (asyncQueueIsFull()) { ! /* document that we will not go into the if-block further down */ ! Assert(QUEUE_POS_OFFSET(QUEUE_HEAD) != 0); ! break; ! } ! ! n = (Notification *) linitial(notifications); ! asyncQueueNotificationToEntry(n, &qe); ! ! offset = QUEUE_POS_OFFSET(QUEUE_HEAD); ! /* ! * Check whether or not the entry still fits on the current page. ! */ ! if (offset + qe.length < QUEUE_PAGESIZE) ! { ! notifications = list_delete_first(notifications); } else { /* ! * Write a dummy entry to fill up the page. Actually readers will ! * only check dboid and since it won't match any reader's database ! * oid, they will ignore this entry and move on. */ ! qe.length = QUEUE_PAGESIZE - offset - 1; ! qe.dboid = InvalidOid; ! qe.data[0] = '\0'; /* empty channel */ ! qe.data[1] = '\0'; /* empty payload */ ! qe.xid = InvalidTransactionId; ! } ! memcpy((char*) AsyncCtl->shared->page_buffer[slotno] + offset, ! &qe, qe.length); ! ! } while (!asyncQueueAdvance(&(QUEUE_HEAD), qe.length) ! && notifications != NIL); ! ! if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0) ! { ! /* ! * we need to go to continue on a new page, stop here but prepare that ! * page already. ! */ ! slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); ! AsyncCtl->shared->page_dirty[slotno] = true; ! } ! LWLockRelease(AsyncCtlLock); ! ! return notifications; ! } ! ! /* ! * Here we calculate how full our queue already is. As the queue is quite ! * large, we would probably only see a high filling degree with a long running ! * idle transaction. We don't emit anything if the queue is less than half ! * full. ! * ! * In case it is between 50% to 75% full, we log a warning and calculate the ! 
* "slowest" backend to give a hint on which backend is preventing cleanup. ! * ! * There's a similar warning in case our queue is more than 75% full. ! * ! * The warnings show up only once every QUEUE_FULL_WARN_INTERVAL. ! */ ! static void ! asyncQueueFillWarning(void) ! { ! /* ! * Caller must hold exclusive AsyncQueueLock. ! */ ! TimestampTz t; ! double fillDegree; ! int occupied; ! int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); ! int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); ! ! occupied = headPage - tailPage; ! ! if (occupied == 0) ! return; ! ! if (!asyncQueuePagePrecedesPhysically(tailPage, headPage)) ! /* head has wrapped around, tail not yet */ ! occupied += QUEUE_MAX_PAGE; ! ! fillDegree = (float) occupied / (float) QUEUE_MAX_PAGE; ! ! if (fillDegree < 0.5) ! return; ! ! t = GetCurrentTimestamp(); ! ! if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, ! t, QUEUE_FULL_WARN_INTERVAL)) ! { ! QueuePosition min = QUEUE_HEAD; ! int32 minPid = InvalidPid; ! int i; ! ! for (i = 0; i < MaxBackends; i++) ! if (QUEUE_BACKEND_PID(i) != InvalidPid) { ! min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD); ! if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) ! minPid = QUEUE_BACKEND_PID(i); } ! ! if (fillDegree < 0.75) ! ereport(WARNING, (errmsg("pg_notify queue is more than 50%% full. " ! "Among the slowest backends: %d", minPid), ! errdetail("Cleanup can only proceed if " ! "this backend ends its current " ! "transaction"))); ! else ! ereport(WARNING, (errmsg("pg_notify queue is more than 75%% full. " ! "Among the slowest backends: %d", minPid), ! errdetail("Cleanup can only proceed if " ! "this backend ends its current " ! "transaction"))); ! ! asyncQueueControl->lastQueueFillWarn = t; ! } ! } ! ! /* ! * Send signals to all listening backends. Since we have EXCLUSIVE lock anyway ! * we also check the position of the other backends and in case that anyone is ! * already up-to-date we don't signal it. This can happen if concurrent ! 
* notifying transactions have sent a signal and the signaled backend has read ! * the other notifications and ours in the same step. ! * ! * Since we know the BackendId and the Pid the signalling is quite cheap. ! */ ! static void ! SignalBackends(void) ! { ! QueuePosition pos; ! ListCell *p1, *p2; ! int i; ! int32 pid; ! List *pids = NIL; ! List *ids = NIL; ! int count = 0; ! ! /* Signal everybody who is LISTENing to any channel. */ ! LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); ! for (i = 0; i < MaxBackends; i++) ! { ! pid = QUEUE_BACKEND_PID(i); ! if (pid != InvalidPid) ! { ! count++; ! pos = QUEUE_BACKEND_POS(i); ! if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) { ! pids = lappend_int(pids, pid); ! ids = lappend_int(ids, i); } } } + LWLockRelease(AsyncQueueLock); ! forboth(p1, pids, p2, ids) ! { ! pid = (int32) lfirst_int(p1); ! i = lfirst_int(p2); ! /* ! * Should we check for failure? Can it happen that a backend ! * has crashed without the postmaster starting over? ! */ ! if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, i) < 0) ! elog(WARNING, "Error signalling backend %d", pid); ! } ! ! if (count == 0) ! { ! /* No backend is listening at all, try to clean up the queue. ! * Even if by now (after we determined count to be 0 and now) ! * a backend has started to listen, advancing the tail does not ! * hurt. Our notifications are committed already and a newly ! * listening backend would skip over them anyway. */ ! asyncQueueAdvanceTail(); ! } } /* * AtAbort_Notify * ! * This is called at transaction abort. * ! * Gets rid of pending actions and outbound notifies that we would have ! * executed if the transaction got committed. ! * ! * Even though we have not committed, we need to signal the listening backends ! * because our notifications might block readers from processing the queue. ! * Now that the transaction has aborted, they can go on and skip over our ! * notifications. They could find notifications past ours that they need to ! * deliver. 
*/ void AtAbort_Notify(void) { + if (backendHasSentNotifications) + SignalBackends(); + + /* + * If we LISTEN but then roll back the transaction we have set our pointer + * but have not made the entry in listenChannels. In that case, remove + * our pointer again. + */ + if (backendHasExecutedInitialListen) + /* + * Checking listenChannels should be redundant but it can't hurt doing + * it for safety reasons. + */ + if (listenChannels == NIL) + asyncQueueUnregister(); + ClearPendingActionsAndNotifies(); } *************** *** 940,968 **** } /* * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. * This is called either directly from the PROCSIG_NOTIFY_INTERRUPT * signal handler, or the next time control reaches the outer idle loop. ! * Scan pg_listener for arriving notifies, report them to my front end, ! * and clear the notification field in pg_listener until next time. * ! * NOTE: since we are outside any transaction, we must create our own. */ static void ProcessIncomingNotify(void) { ! Relation lRel; ! TupleDesc tdesc; ! ScanKeyData key[1]; ! HeapScanDesc scan; ! HeapTuple lTuple, ! rTuple; ! Datum value[Natts_pg_listener]; ! bool repl[Natts_pg_listener], ! nulls[Natts_pg_listener]; ! bool catchup_enabled; /* Must prevent catchup interrupt while I am running */ catchup_enabled = DisableCatchupInterrupt(); --- 1575,1809 ---- } /* + * This function will ask for a page with ReadOnly access and once we have the + * lock, we read the whole content and pass back the list of notifications + * that the calling function will deliver then. The list will contain all + * notifications from transactions that have already committed. + * + * We stop if we have either reached the stop position or go to a new page. + * + * The function returns true once we have reached the end or a notification of + * a transaction that is still running and false if we have finished with + * the page. 
In other words: once it returns true there is no point in calling + * it again. + */ + static bool + asyncQueueGetEntriesByPage(QueuePosition *current, + QueuePosition stop, + List **notifications) + { + AsyncQueueEntry qe; + Notification *n; + int slotno; + bool reachedStop = false; + + if (QUEUE_POS_EQUAL(*current, stop)) + return true; + + slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, current->page, + InvalidTransactionId); + do { + char *readPtr = (char *) (AsyncCtl->shared->page_buffer[slotno]); + + if (QUEUE_POS_EQUAL(*current, stop)) + { + reachedStop = true; + break; + } + + readPtr += current->offset; + /* at first we only read the header of the notification */ + memcpy(&qe, readPtr, AsyncQueueEntryEmptySize); + + if (qe.dboid == MyDatabaseId) + { + if (TransactionIdDidCommit(qe.xid)) + { + memcpy(&qe, readPtr, qe.length); + /* qe.data is the NUL terminated channel name */ + if (IsListeningOn(qe.data)) + { + n = (Notification *) palloc(sizeof(Notification)); + asyncQueueEntryToNotification(&qe, n); + *notifications = lappend(*notifications, n); + } + } + else + { + if (!TransactionIdDidAbort(qe.xid)) + { + /* + * The transaction has neither committed nor aborted so + * far. + */ + reachedStop = true; + break; + } + /* + * Here we know that the transaction has aborted, we just + * ignore its notifications. + */ + } + } + /* + * The call to asyncQueueAdvance just jumps over what we have + * just read. If there is no more space for the next record on the + * current page, it will also switch to the beginning of the next page. + */ + } while(!asyncQueueAdvance(current, qe.length)); + + /* + * Release the lock that we implicitly got from + * SimpleLruReadPage_ReadOnly(). 
+ */ + LWLockRelease(AsyncCtlLock); + + if (QUEUE_POS_EQUAL(*current, stop)) + reachedStop = true; + + return reachedStop; + } + + + static void + asyncQueueReadAllNotifications(void) + { + QueuePosition pos; + QueuePosition oldpos; + QueuePosition head; + List *notifications; + ListCell *lc; + Notification *n; + bool advanceTail = false; + bool reachedStop; + + LWLockAcquire(AsyncQueueLock, LW_SHARED); + pos = oldpos = QUEUE_BACKEND_POS(MyBackendId); + head = QUEUE_HEAD; + LWLockRelease(AsyncQueueLock); + + if (QUEUE_POS_EQUAL(pos, head)) + { + /* Nothing to do, we have read all notifications already. */ + return; + } + + do + { + /* + * Our stop position is what we found to be the head's position when + * we entered this function. It might have changed already. But if it + * has, we will receive (or have already received and queued) another + * signal and come here again. + * + * We are not holding AsyncQueueLock here! The queue can only extend + * beyond the head pointer (see above) and we leave our backend's + * pointer where it is so nobody will truncate or rewrite pages under + * us. Especially we don't want to hold a lock while sending the + * notifications to the frontend. + */ + reachedStop = false; + + notifications = NIL; + reachedStop = asyncQueueGetEntriesByPage(&pos, head, &notifications); + + /* + * Note that we deliver everything that we see in the queue and that + * matches our _current_ listening state. + * Especially we do not take into account different commit times. + * + * See the following example: + * + * Backend 1: Backend 2: + * + * transaction starts + * NOTIFY foo; + * commit starts + * transaction starts + * LISTEN foo; + * commit starts + * commit to clog + * commit to clog + * + * It could happen that backend 2 sees the notification from + * backend 1 in the queue and even though the notifying transaction + * committed before the listening transaction, we still deliver the + * notification. 
+ * + * The idea is that an additional notification does not do any + * harm we just need to make sure that we do not miss a + * notification. + */ + foreach(lc, notifications) + { + n = (Notification *) lfirst(lc); + NotifyMyFrontEnd(n->channel, n->payload, n->srcPid); + } + } while (!reachedStop); + + LWLockAcquire(AsyncQueueLock, LW_SHARED); + QUEUE_BACKEND_POS(MyBackendId) = pos; + if (QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL)) + advanceTail = true; + LWLockRelease(AsyncQueueLock); + + if (advanceTail) + /* Move forward the tail pointer and try to truncate. */ + asyncQueueAdvanceTail(); + } + + static void + asyncQueueAdvanceTail(void) + { + QueuePosition min; + int i; + int tailp; + int headp; + + LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); + min = QUEUE_HEAD; + for (i = 0; i < MaxBackends; i++) + if (QUEUE_BACKEND_PID(i) != InvalidPid) + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i), QUEUE_HEAD); + + tailp = QUEUE_POS_PAGE(QUEUE_TAIL); + headp = QUEUE_POS_PAGE(QUEUE_HEAD); + QUEUE_TAIL = min; + LWLockRelease(AsyncQueueLock); + + /* This is our wraparound check */ + if ((asyncQueuePagePrecedesLogically(tailp, QUEUE_POS_PAGE(min), headp) + && asyncQueuePagePrecedesPhysically(tailp, headp)) + || tailp == QUEUE_POS_PAGE(min)) + { + /* + * SimpleLruTruncate() will ask for AsyncCtlLock but will also + * release the lock again. + * + * XXX this could be optimized, to call SimpleLruTruncate only when we + * know that we can truncate something. + */ + SimpleLruTruncate(AsyncCtl, QUEUE_POS_PAGE(min)); + } + } + + /* * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. * This is called either directly from the PROCSIG_NOTIFY_INTERRUPT * signal handler, or the next time control reaches the outer idle loop. ! * Scan the queue for arriving notifications and report them to my front ! * end. * ! * NOTE: we are outside of any transaction here. */ static void ProcessIncomingNotify(void) { ! bool catchup_enabled; ! ! 
Assert(GetCurrentTransactionIdIfAny() == InvalidTransactionId); /* Must prevent catchup interrupt while I am running */ catchup_enabled = DisableCatchupInterrupt(); *************** *** 974,1037 **** notifyInterruptOccurred = 0; ! StartTransactionCommand(); ! ! lRel = heap_open(ListenerRelationId, ExclusiveLock); ! tdesc = RelationGetDescr(lRel); ! ! /* Scan only entries with my listenerPID */ ! ScanKeyInit(&key[0], ! Anum_pg_listener_listenerpid, ! BTEqualStrategyNumber, F_INT4EQ, ! Int32GetDatum(MyProcPid)); ! scan = heap_beginscan(lRel, SnapshotNow, 1, key); ! ! /* Prepare data for rewriting 0 into notification field */ ! memset(nulls, false, sizeof(nulls)); ! memset(repl, false, sizeof(repl)); ! repl[Anum_pg_listener_notification - 1] = true; ! memset(value, 0, sizeof(value)); ! value[Anum_pg_listener_notification - 1] = Int32GetDatum(0); ! ! while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) ! { ! Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); ! char *relname = NameStr(listener->relname); ! int32 sourcePID = listener->notification; ! ! if (sourcePID != 0) ! { ! /* Notify the frontend */ ! ! if (Trace_notify) ! elog(DEBUG1, "ProcessIncomingNotify: received %s from %d", ! relname, (int) sourcePID); ! ! NotifyMyFrontEnd(relname, sourcePID); ! ! /* ! * Rewrite the tuple with 0 in notification column. ! */ ! rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); ! simple_heap_update(lRel, &lTuple->t_self, rTuple); ! ! #ifdef NOT_USED /* currently there are no indexes */ ! CatalogUpdateIndexes(lRel, rTuple); ! #endif ! } ! } ! heap_endscan(scan); ! ! /* ! * We do NOT release the lock on pg_listener here; we need to hold it ! * until end of transaction (which is about to happen, anyway) to ensure ! * that other backends see our tuple updates when they look. Otherwise, a ! * transaction started after this one might mistakenly think it doesn't ! * need to send this backend a new NOTIFY. ! */ ! heap_close(lRel, NoLock); ! ! 
CommitTransactionCommand(); /* * Must flush the notify messages to ensure frontend gets them promptly. --- 1815,1821 ---- notifyInterruptOccurred = 0; ! asyncQueueReadAllNotifications(); /* * Must flush the notify messages to ensure frontend gets them promptly. *************** *** 1051,1070 **** * Send NOTIFY message to my front end. */ static void ! NotifyMyFrontEnd(char *relname, int32 listenerPID) { if (whereToSendOutput == DestRemote) { StringInfoData buf; pq_beginmessage(&buf, 'A'); ! pq_sendint(&buf, listenerPID, sizeof(int32)); ! pq_sendstring(&buf, relname); if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) ! { ! /* XXX Add parameter string here later */ ! pq_sendstring(&buf, ""); ! } pq_endmessage(&buf); /* --- 1835,1851 ---- * Send NOTIFY message to my front end. */ static void ! NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) { StringInfoData buf; pq_beginmessage(&buf, 'A'); ! pq_sendint(&buf, srcPid, sizeof(int32)); ! pq_sendstring(&buf, channel); if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) ! pq_sendstring(&buf, payload); pq_endmessage(&buf); /* *************** *** 1074,1096 **** */ } else ! elog(INFO, "NOTIFY for %s", relname); } ! /* Does pendingNotifies include the given relname? */ static bool ! AsyncExistsPendingNotify(const char *relname) { ListCell *p; ! foreach(p, pendingNotifies) ! { ! const char *prelname = (const char *) lfirst(p); ! if (strcmp(prelname, relname) == 0) return true; } return false; } --- 1855,1919 ---- */ } else ! elog(INFO, "NOTIFY for %s", channel); } ! /* Does pendingNotifies include the given channel/payload? */ static bool ! AsyncExistsPendingNotify(const char *channel, const char *payload) { ListCell *p; + Notification *n; ! if (pendingNotifies == NIL) ! return false; ! ! if (payload == NULL) ! payload = ""; ! /* ! * We need to append new elements to the end of the list in order to keep ! * the order. However, on the other hand we'd like to check the list ! 
* backwards in order to make duplicate-elimination a tad faster when the ! * same condition is signaled many times in a row. So as a compromise we ! * check the tail element first which we can access directly. If this ! * doesn't match, we check the rest of whole list. ! * ! * As we are not checking our parents' lists, we can still get duplicates ! * in combination with subtransactions, like in: ! * ! * begin; ! * notify foo '1'; ! * savepoint foo; ! * notify foo '1'; ! * commit; ! */ ! n = (Notification *) llast(pendingNotifies); ! if (strcmp(n->channel, channel) == 0) ! { ! Assert(n->payload != NULL); ! if (strcmp(n->payload, payload) == 0) return true; } + /* + * Note the difference to foreach(). We stop if p is the last element + * already. So we don't check the last element, we have checked it already. + */ + for(p = list_head(pendingNotifies); + p != list_tail(pendingNotifies); + p = lnext(p)) + { + n = (Notification *) lfirst(p); + + if (strcmp(n->channel, channel) == 0) + { + Assert(n->payload != NULL); + if (strcmp(n->payload, payload) == 0) + return true; + } + } + return false; } *************** *** 1107,1128 **** */ pendingActions = NIL; pendingNotifies = NIL; - } ! /* ! * 2PC processing routine for COMMIT PREPARED case. ! * ! * (We don't have to do anything for ROLLBACK PREPARED.) ! */ ! void ! notify_twophase_postcommit(TransactionId xid, uint16 info, ! void *recdata, uint32 len) ! { ! /* ! * Set up to issue the NOTIFY at the end of my own current transaction. ! * (XXX this has some issues if my own transaction later rolls back, or if ! * there is any significant delay before I commit. OK for now because we ! * disallow COMMIT PREPARED inside a transaction block.) ! */ ! Async_Notify((char *) recdata); } --- 1930,1937 ---- */ pendingActions = NIL; pendingNotifies = NIL; ! backendHasSentNotifications = false; ! 
backendHasExecutedInitialListen = false; } + diff -cr cvs.head/src/backend/nodes/copyfuncs.c cvs.build/src/backend/nodes/copyfuncs.c *** cvs.head/src/backend/nodes/copyfuncs.c 2010-02-14 16:02:46.000000000 +0100 --- cvs.build/src/backend/nodes/copyfuncs.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 2777,2782 **** --- 2777,2783 ---- NotifyStmt *newnode = makeNode(NotifyStmt); COPY_STRING_FIELD(conditionname); + COPY_STRING_FIELD(payload); return newnode; } diff -cr cvs.head/src/backend/nodes/equalfuncs.c cvs.build/src/backend/nodes/equalfuncs.c *** cvs.head/src/backend/nodes/equalfuncs.c 2010-02-14 16:02:46.000000000 +0100 --- cvs.build/src/backend/nodes/equalfuncs.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 1325,1330 **** --- 1325,1331 ---- _equalNotifyStmt(NotifyStmt *a, NotifyStmt *b) { COMPARE_STRING_FIELD(conditionname); + COMPARE_STRING_FIELD(payload); return true; } diff -cr cvs.head/src/backend/nodes/outfuncs.c cvs.build/src/backend/nodes/outfuncs.c *** cvs.head/src/backend/nodes/outfuncs.c 2010-02-14 16:02:46.000000000 +0100 --- cvs.build/src/backend/nodes/outfuncs.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 1820,1825 **** --- 1820,1826 ---- WRITE_NODE_TYPE("NOTIFY"); WRITE_STRING_FIELD(conditionname); + WRITE_STRING_FIELD(payload); } static void diff -cr cvs.head/src/backend/nodes/readfuncs.c cvs.build/src/backend/nodes/readfuncs.c *** cvs.head/src/backend/nodes/readfuncs.c 2010-02-14 16:02:46.000000000 +0100 --- cvs.build/src/backend/nodes/readfuncs.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 231,236 **** --- 231,237 ---- READ_LOCALS(NotifyStmt); READ_STRING_FIELD(conditionname); + READ_STRING_FIELD(payload); READ_DONE(); } diff -cr cvs.head/src/backend/parser/gram.y cvs.build/src/backend/parser/gram.y *** cvs.head/src/backend/parser/gram.y 2010-02-14 16:02:47.000000000 +0100 --- cvs.build/src/backend/parser/gram.y 2010-02-14 16:04:25.000000000 +0100 *************** *** 400,406 **** %type Iconst 
SignedIconst %type Iconst_list ! %type Sconst comment_text %type RoleId opt_granted_by opt_boolean ColId_or_Sconst %type var_list %type ColId ColLabel var_name type_function_name param_name --- 400,406 ---- %type Iconst SignedIconst %type Iconst_list ! %type Sconst comment_text notify_payload %type RoleId opt_granted_by opt_boolean ColId_or_Sconst %type var_list %type ColId ColLabel var_name type_function_name param_name *************** *** 6123,6132 **** * *****************************************************************************/ ! NotifyStmt: NOTIFY ColId { NotifyStmt *n = makeNode(NotifyStmt); n->conditionname = $2; $$ = (Node *)n; } ; --- 6123,6138 ---- * *****************************************************************************/ ! notify_payload: ! Sconst { $$ = $1; } ! | /*EMPTY*/ { $$ = NULL; } ! ; ! ! NotifyStmt: NOTIFY ColId notify_payload { NotifyStmt *n = makeNode(NotifyStmt); n->conditionname = $2; + n->payload = $3; $$ = (Node *)n; } ; diff -cr cvs.head/src/backend/storage/ipc/ipci.c cvs.build/src/backend/storage/ipc/ipci.c *** cvs.head/src/backend/storage/ipc/ipci.c 2010-01-20 20:08:27.000000000 +0100 --- cvs.build/src/backend/storage/ipc/ipci.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 20,25 **** --- 20,26 ---- #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" + #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" *************** *** 225,230 **** --- 226,232 ---- */ BTreeShmemInit(); SyncScanShmemInit(); + AsyncShmemInit(); #ifdef EXEC_BACKEND diff -cr cvs.head/src/backend/storage/lmgr/lwlock.c cvs.build/src/backend/storage/lmgr/lwlock.c *** cvs.head/src/backend/storage/lmgr/lwlock.c 2010-01-05 12:39:29.000000000 +0100 --- cvs.build/src/backend/storage/lmgr/lwlock.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 24,29 **** --- 24,30 ---- #include "access/clog.h" #include "access/multixact.h" #include "access/subtrans.h" + #include 
"commands/async.h" #include "miscadmin.h" #include "pg_trace.h" #include "storage/ipc.h" *************** *** 174,179 **** --- 175,183 ---- /* multixact.c needs two SLRU areas */ numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; + /* async.c needs one per page for the AsyncQueue */ + numLocks += NUM_ASYNC_BUFFERS; + /* * Add any requested by loadable modules; for backwards-compatibility * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if diff -cr cvs.head/src/backend/tcop/utility.c cvs.build/src/backend/tcop/utility.c *** cvs.head/src/backend/tcop/utility.c 2010-01-30 22:06:36.000000000 +0100 --- cvs.build/src/backend/tcop/utility.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 930,936 **** NotifyStmt *stmt = (NotifyStmt *) parsetree; PreventCommandDuringRecovery(); ! Async_Notify(stmt->conditionname); } break; --- 930,936 ---- NotifyStmt *stmt = (NotifyStmt *) parsetree; PreventCommandDuringRecovery(); ! Async_Notify(stmt->conditionname, stmt->payload); } break; diff -cr cvs.head/src/bin/initdb/initdb.c cvs.build/src/bin/initdb/initdb.c *** cvs.head/src/bin/initdb/initdb.c 2010-01-30 22:06:37.000000000 +0100 --- cvs.build/src/bin/initdb/initdb.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 2458,2463 **** --- 2458,2464 ---- "pg_xlog", "pg_xlog/archive_status", "pg_clog", + "pg_notify", "pg_subtrans", "pg_twophase", "pg_multixact/members", diff -cr cvs.head/src/bin/psql/common.c cvs.build/src/bin/psql/common.c *** cvs.head/src/bin/psql/common.c 2010-01-05 12:39:33.000000000 +0100 --- cvs.build/src/bin/psql/common.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 555,562 **** while ((notify = PQnotifies(pset.db))) { ! fprintf(pset.queryFout, _("Asynchronous notification \"%s\" received from server process with PID %d.\n"), ! notify->relname, notify->be_pid); fflush(pset.queryFout); PQfreemem(notify); } --- 555,562 ---- while ((notify = PQnotifies(pset.db))) { ! 
fprintf(pset.queryFout, _("Asynchronous notification \"%s\" (%s) received from server process with PID %d.\n"), ! notify->relname, notify->extra, notify->be_pid); fflush(pset.queryFout); PQfreemem(notify); } diff -cr cvs.head/src/bin/psql/tab-complete.c cvs.build/src/bin/psql/tab-complete.c *** cvs.head/src/bin/psql/tab-complete.c 2010-01-30 22:06:37.000000000 +0100 --- cvs.build/src/bin/psql/tab-complete.c 2010-02-14 16:04:25.000000000 +0100 *************** *** 1852,1858 **** /* NOTIFY */ else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0) ! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s'"); /* OPTIONS */ else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0) --- 1852,1858 ---- /* NOTIFY */ else if (pg_strcasecmp(prev_wd, "NOTIFY") == 0) ! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s'"); /* OPTIONS */ else if (pg_strcasecmp(prev_wd, "OPTIONS") == 0) *************** *** 2093,2099 **** /* UNLISTEN */ else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0) ! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(relname) FROM pg_catalog.pg_listener WHERE substring(pg_catalog.quote_ident(relname),1,%d)='%s' UNION SELECT '*'"); /* UPDATE */ /* If prev. word is UPDATE suggest a list of tables */ --- 2093,2099 ---- /* UNLISTEN */ else if (pg_strcasecmp(prev_wd, "UNLISTEN") == 0) ! COMPLETE_WITH_QUERY("SELECT pg_catalog.quote_ident(channel) FROM pg_catalog.pg_listening() AS channel WHERE substring(pg_catalog.quote_ident(channel),1,%d)='%s' UNION SELECT '*'"); /* UPDATE */ /* If prev. 
word is UPDATE suggest a list of tables */ diff -cr cvs.head/src/include/access/slru.h cvs.build/src/include/access/slru.h *** cvs.head/src/include/access/slru.h 2010-01-05 12:39:34.000000000 +0100 --- cvs.build/src/include/access/slru.h 2010-02-14 16:04:25.000000000 +0100 *************** *** 16,21 **** --- 16,40 ---- #include "access/xlogdefs.h" #include "storage/lwlock.h" + /* + * Define segment size. A page is the same BLCKSZ as is used everywhere + * else in Postgres. The segment size can be chosen somewhat arbitrarily; + * we make it 32 pages by default, or 256Kb, i.e. 1M transactions for CLOG + * or 64K transactions for SUBTRANS. + * + * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, + * page numbering also wraps around at 0xFFFFFFFF/xxxx_XACTS_PER_PAGE (where + * xxxx is CLOG or SUBTRANS, respectively), and segment numbering at + * 0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need + * take no explicit notice of that fact in this module, except when comparing + * segment and page numbers in SimpleLruTruncate (see PagePrecedes()). + * + * Note: this file currently assumes that segment file names will be four + * hex digits. This sets a lower bound on the segment size (64K transactions + * for 32-bit TransactionIds). + */ + #define SLRU_PAGES_PER_SEGMENT 32 + /* * Page status codes. Note that these do not include the "dirty" bit. diff -cr cvs.head/src/include/access/twophase_rmgr.h cvs.build/src/include/access/twophase_rmgr.h *** cvs.head/src/include/access/twophase_rmgr.h 2010-01-05 12:39:34.000000000 +0100 --- cvs.build/src/include/access/twophase_rmgr.h 2010-02-14 16:04:25.000000000 +0100 *************** *** 23,31 **** */ #define TWOPHASE_RM_END_ID 0 #define TWOPHASE_RM_LOCK_ID 1 ! #define TWOPHASE_RM_NOTIFY_ID 2 ! #define TWOPHASE_RM_PGSTAT_ID 3 ! 
#define TWOPHASE_RM_MULTIXACT_ID 4
  #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_MULTIXACT_ID

  extern const TwoPhaseCallback twophase_recover_callbacks[];
--- 23,30 ----
   */
  #define TWOPHASE_RM_END_ID 0
  #define TWOPHASE_RM_LOCK_ID 1
! #define TWOPHASE_RM_PGSTAT_ID 2
! #define TWOPHASE_RM_MULTIXACT_ID 3
  #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_MULTIXACT_ID

  extern const TwoPhaseCallback twophase_recover_callbacks[];
diff -cr cvs.head/src/include/catalog/pg_proc.h cvs.build/src/include/catalog/pg_proc.h
*** cvs.head/src/include/catalog/pg_proc.h 2010-02-10 20:33:17.000000000 +0100
--- cvs.build/src/include/catalog/pg_proc.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 4127,4132 ****
--- 4127,4136 ----
  DESCR("get the prepared statements for this session");
  DATA(insert OID = 2511 ( pg_cursor PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,16,16,16,1184}" "{o,o,o,o,o,o}" "{name,statement,is_holdable,is_binary,is_scrollable,creation_time}" _null_ pg_cursor _null_ _null_ _null_ ));
  DESCR("get the open cursors for this session");
+ DATA(insert OID = 3036 ( pg_listening PGNSP PGUID 12 1 10 0 f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ pg_listening _null_ _null_ _null_ ));
+ DESCR("get the channels that the current backend listens to");
+ DATA(insert OID = 3035 ( pg_notify PGNSP PGUID 12 1 0 0 f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_));
+ DESCR("send a notification to clients");
  DATA(insert OID = 2599 ( pg_timezone_abbrevs PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,1186,16}" "{o,o,o}" "{abbrev,utc_offset,is_dst}" _null_ pg_timezone_abbrevs _null_ _null_ _null_ ));
  DESCR("get the available time zone abbreviations");
  DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 f f f t t s 0 0 2249 "" "{25,25,1186,16}" "{o,o,o,o}" "{name,abbrev,utc_offset,is_dst}" _null_ pg_timezone_names _null_ _null_ _null_ ));
diff -cr cvs.head/src/include/commands/async.h cvs.build/src/include/commands/async.h
*** cvs.head/src/include/commands/async.h 2010-01-05 12:39:35.000000000 +0100
--- cvs.build/src/include/commands/async.h 2010-02-14 16:48:57.000000000 +0100
***************
*** 13,28 ****
  #ifndef ASYNC_H
  #define ASYNC_H

  extern bool Trace_notify;

  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);

  /* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_Notify(void);
  extern void AtAbort_Notify(void);
  extern void AtSubStart_Notify(void);
  extern void AtSubCommit_Notify(void);
--- 13,41 ----
  #ifndef ASYNC_H
  #define ASYNC_H

+ /*
+  * Maximum size of the payload, including terminating NULL.
+  */
+ #define NOTIFY_PAYLOAD_MAX_LENGTH 8000
+
+ /*
+  * The number of page slots that we reserve.
+  */
+ #define NUM_ASYNC_BUFFERS 8
+
  extern bool Trace_notify;

+ extern void AsyncShmemInit(void);
+
  /* notify-related SQL statements */
! extern void Async_Notify(const char *relname, const char *payload);
  extern void Async_Listen(const char *relname);
  extern void Async_Unlisten(const char *relname);
  extern void Async_UnlistenAll(void);

  /* perform (or cancel) outbound notify processing at transaction commit */
! extern void AtCommit_NotifyBeforeCommit(void);
! extern void AtCommit_NotifyAfterCommit(void);
  extern void AtAbort_Notify(void);
  extern void AtSubStart_Notify(void);
  extern void AtSubCommit_Notify(void);
***************
*** 43,46 ****
--- 56,62 ----
  extern void notify_twophase_postcommit(TransactionId xid, uint16 info,
  void *recdata, uint32 len);

+ extern Datum pg_listening(PG_FUNCTION_ARGS);
+ extern Datum pg_notify(PG_FUNCTION_ARGS);
+
  #endif /* ASYNC_H */
diff -cr cvs.head/src/include/nodes/parsenodes.h cvs.build/src/include/nodes/parsenodes.h
*** cvs.head/src/include/nodes/parsenodes.h 2010-02-14 16:02:49.000000000 +0100
--- cvs.build/src/include/nodes/parsenodes.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 2097,2102 ****
--- 2097,2103 ----
  {
  NodeTag type;
  char *conditionname; /* condition name to notify */
+ char *payload; /* the payload string to be conveyed */
  } NotifyStmt;

  /* ----------------------
diff -cr cvs.head/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs.head/src/include/storage/lwlock.h 2010-02-10 20:33:18.000000000 +0100
--- cvs.build/src/include/storage/lwlock.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 68,73 ****
--- 68,75 ----
  AutovacuumScheduleLock,
  SyncScanLock,
  RelationMappingLock,
+ AsyncCtlLock,
+ AsyncQueueLock,
  /* Individual lock IDs end here */
  FirstBufMappingLock,
  FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -cr cvs.head/src/include/utils/errcodes.h cvs.build/src/include/utils/errcodes.h
*** cvs.head/src/include/utils/errcodes.h 2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/utils/errcodes.h 2010-02-14 16:04:25.000000000 +0100
***************
*** 318,323 ****
--- 318,324 ----
  #define ERRCODE_STATEMENT_TOO_COMPLEX MAKE_SQLSTATE('5','4', '0','0','1')
  #define ERRCODE_TOO_MANY_COLUMNS MAKE_SQLSTATE('5','4', '0','1','1')
  #define ERRCODE_TOO_MANY_ARGUMENTS MAKE_SQLSTATE('5','4', '0','2','3')
+ #define ERRCODE_TOO_MANY_ENTRIES MAKE_SQLSTATE('5','4', '0','3','1')

  /* Class 55 - Object Not In Prerequisite State (class borrowed from DB2) */
  #define ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE MAKE_SQLSTATE('5','5', '0','0','0')
diff -cr cvs.head/src/test/regress/expected/guc.out cvs.build/src/test/regress/expected/guc.out
*** cvs.head/src/test/regress/expected/guc.out 2009-11-22 06:20:41.000000000 +0100
--- cvs.build/src/test/regress/expected/guc.out 2010-02-14 16:04:25.000000000 +0100
***************
*** 532,540 ****
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT relname FROM pg_listener;
! relname
! -----------
  foo_event
  (1 row)

--- 532,540 ----
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT pg_listening();
! pg_listening
! --------------
  foo_event
  (1 row)

***************
*** 571,579 ****
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT relname FROM pg_listener;
! relname
! ---------
  (0 rows)

  SELECT name FROM pg_prepared_statements;
--- 571,579 ----
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT pg_listening();
! pg_listening
! --------------
  (0 rows)

  SELECT name FROM pg_prepared_statements;
diff -cr cvs.head/src/test/regress/expected/sanity_check.out cvs.build/src/test/regress/expected/sanity_check.out
*** cvs.head/src/test/regress/expected/sanity_check.out 2010-01-20 20:08:32.000000000 +0100
--- cvs.build/src/test/regress/expected/sanity_check.out 2010-02-14 16:04:25.000000000 +0100
***************
*** 107,113 ****
  pg_language | t
  pg_largeobject | t
  pg_largeobject_metadata | t
- pg_listener | f
  pg_namespace | t
  pg_opclass | t
  pg_operator | t
--- 107,112 ----
***************
*** 154,160 ****
  timetz_tbl | f
  tinterval_tbl | f
  varchar_tbl | f
! (143 rows)

  --
  -- another sanity check: every system catalog that has OIDs should have
--- 153,159 ----
  timetz_tbl | f
  tinterval_tbl | f
  varchar_tbl | f
! (142 rows)

  --
  -- another sanity check: every system catalog that has OIDs should have
diff -cr cvs.head/src/test/regress/sql/guc.sql cvs.build/src/test/regress/sql/guc.sql
*** cvs.head/src/test/regress/sql/guc.sql 2009-10-21 22:38:58.000000000 +0200
--- cvs.build/src/test/regress/sql/guc.sql 2010-02-14 16:04:25.000000000 +0100
***************
*** 165,171 ****
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT relname FROM pg_listener;
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
--- 165,171 ----
  CREATE ROLE temp_reset_user;
  SET SESSION AUTHORIZATION temp_reset_user;
  -- look changes
! SELECT pg_listening();
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
***************
*** 174,180 ****
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT relname FROM pg_listener;
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;
--- 174,180 ----
  -- discard everything
  DISCARD ALL;
  -- look again
! SELECT pg_listening();
  SELECT name FROM pg_prepared_statements;
  SELECT name FROM pg_cursors;
  SHOW vacuum_cost_delay;