All this does is put all the trigger events given to it after a certain
number out to disk in DeferredTriggerAddEvent; then read them as
necessary in DeferredTriggerInvokeExec.
The certain number after which it moves to disk is currently, &
arbiterily, 100000 defined by this line in trigger.h:
#define max_deferred_trigger_mem_queue 100000
It has passed the regression checks on Linux (RH9ish) and cygwin.
Also include is a quick test script I used
This is my first patch of any size so please review carefully.
regards,
- Stuart
*** trigger.c.orig Wed Jun 25 19:39:26 2003
--- trigger.c Fri Jun 27 20:50:20 2003
***************
*** 38,43 ****
--- 38,53 ----
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+ #ifndef PG_TEMP_FILE_PREFIX
+ #define PG_TEMP_FILE_PREFIX "pgsql_tmp"
+ #endif
+
+ static void deferredTriggerClean();
+ static DeferredTriggerEvent deferredTriggerDiskImmReadNextEvent();
+ static void deferredTriggerDiskImmWriteDoneEvent(DeferredTriggerEvent event);
+ static void deferredTriggerDiskImmClean();
+ static void deferredTriggerDiskImmReset();
+ static void deferredTriggerWriteEvent(DeferredTriggerEvent event);
static void InsertTrigger(TriggerDesc *trigdesc, Trigger *trigger, int indx);
static HeapTuple GetTupleForTrigger(EState *estate,
***************
*** 1601,1606 ****
--- 1611,1620 ----
* ----------
*/
+ /*
+ * Note count is not a correct count>=max_deferred_trigger_mem_queue
+ */
+
typedef struct DeferredTriggersData {
/* Internal data is held in a per-transaction memory context */
MemoryContext deftrig_cxt;
***************
*** 1613,1618 ****
--- 1627,1642 ----
DeferredTriggerEvent deftrig_events;
DeferredTriggerEvent deftrig_events_imm;
DeferredTriggerEvent deftrig_event_tail;
+ int num_files;
+ long *bytes_written;
+ FILE *file_handle;
+ long count;
+ char *filename;
+ FILE *imm_file_handle;
+ long imm_filepos;
+ int imm_filenum;
+ char *imm_filename;
+ int imm_szrd;
} DeferredTriggersData;
/* ----------
***************
*** 1638,1643 ****
--- 1662,2009 ----
static DeferredTriggers deferredTriggers;
+ /*
+ * Records an event done back into the disk queue
+ * requires event passed to it for speed rather than
+ * rereading it, then marking as done
+ * also writes the entire event to make sure nothing is missed
+ * assumes event size is the same as original
+ */
+ static void deferredTriggerDiskImmWriteDoneEvent(DeferredTriggerEvent event)
+ {
+ int bw,rt,eln,c;
+ FILE *fp;
+ void *buf,*cbp;
+ eln=(sizeof(int32)<<1)+sizeof(Oid)+(sizeof(ItemPointerData)<<1)+
+ (event->dte_n_items*sizeof(DeferredTriggerEventItem));
+ buf=palloc(eln);
+ if (!(buf))
+ elog(ERROR, "Could not allocate buffer for writing deferred event done");
+ fp=deferredTriggers->imm_file_handle;
+ if (!(fp))
+ elog(ERROR, "NULL file handle for done deferred trigger write");
+ rt=fseek(fp,deferredTriggers->imm_filepos,SEEK_SET);
+ if (rt!=0)
+ elog(ERROR,"Unable to seek for write done deferred trigger. Err=%i",errno);
+ *((int32 *) buf)=event->dte_event;
+ cbp=buf+sizeof(int32);
+ *((Oid *) cbp)=event->dte_relid;
+ cbp+=sizeof(Oid);
+ *((ItemPointerData *) cbp)=event->dte_oldctid;
+ cbp+=sizeof(ItemPointerData);
+ *((ItemPointerData *) cbp)=event->dte_newctid;
+ cbp+=sizeof(ItemPointerData);
+ *((int32 *) cbp)=event->dte_n_items;
+ cbp+=sizeof(int32);
+ /*
+ * copying all the items to the buffer
+ */
+ for(c=0;c<event->dte_n_items;c++)
+ {
+ *((DeferredTriggerEventItem *) cbp)=event->dte_item[c];
+ cbp+=sizeof(DeferredTriggerEventItem);
+ }
+ bw=fwrite(buf,1,eln,fp);
+ if (bw!=eln)
+ elog(ERROR,"deferred trigger write done should have returned %i, actual %i",
+ bw,eln);
+ rt=fseek(fp,deferredTriggers->imm_filepos +
+ deferredTriggers->imm_szrd,SEEK_SET);
+ if (rt!=0)
+ elog(ERROR,"Unable to seek after done deferred trigger. Err=%i",errno);
+ }
+
+ /*
+ * DeferredTriggerEvent deferredTriggerDiskImmReadNextEvent()
+ *
+ * Returns the next deferred trigger event from the disk queue.
+ * must be initialised with deferredTriggerDiskImmReset()
+ */
+ static DeferredTriggerEvent deferredTriggerDiskImmReadNextEvent()
+ {
+ void *buf,*cbp;
+ int rdln,bufln,c,rt;
+ DeferredTriggerEvent event,oevent;
+ if ((deferredTriggers->imm_filenum<1) || (!(deferredTriggers->imm_file_handle)))
+ elog(ERROR,
+ "Attempt to read from disk deferred trigger queue before initialisation.");
+ /*
+ * Check to see if we need to advance to the next file
+ */
+ deferredTriggers->imm_filepos+=deferredTriggers->imm_szrd;
+ deferredTriggers->imm_szrd=0;
+
+ if (deferredTriggers->imm_filepos>=
+ *(deferredTriggers->bytes_written+deferredTriggers->imm_filenum-1))
+ {
+ /*
+ * Check to see if no more files. Return null (no more events)
+ */
+ if (deferredTriggers->imm_file_handle!=deferredTriggers->file_handle)
+ {
+ fclose(deferredTriggers->imm_file_handle);
+ deferredTriggers->imm_file_handle=NULL;
+ }
+ else
+ deferredTriggers->imm_file_handle=NULL;
+ if (deferredTriggers->imm_filenum==deferredTriggers->num_files)
+ return NULL;
+ /*
+ * otherwise advance to next file
+ */
+ deferredTriggers->imm_filenum++;
+ sprintf(deferredTriggers->imm_filename+
+ strlen(deferredTriggers->imm_filename)-9,"%09d",
+ deferredTriggers->imm_filenum);
+ if (strcmp(deferredTriggers->imm_filename,deferredTriggers->filename)==0)
+ deferredTriggers->imm_file_handle = deferredTriggers->file_handle;
+ else
+ deferredTriggers->imm_file_handle=
+ fopen(deferredTriggers->imm_filename, "rb+");
+ if (!(deferredTriggers->imm_file_handle))
+ elog(ERROR, "Can not open for reading first disk event file handle %s"
+ ,deferredTriggers->imm_filename);
+ deferredTriggers->imm_filepos=0;
+ }
+ event=buf=NULL;
+ event=palloc(sizeof(DeferredTriggerEventData));
+ if (!(event))
+ elog(ERROR,"Can not allocate event for deferred trigger queue disk read.");
+ bufln=(sizeof(int32)<<1)+sizeof(Oid)+(sizeof(ItemPointerData)<<1);
+ buf=palloc(bufln);
+ if (!(event))
+ elog(ERROR,"Can not allocate buf for deferred trigger queue disk read.");
+ rt=fseek(deferredTriggers->imm_file_handle, deferredTriggers->imm_filepos,
+ SEEK_SET);
+ if (rt!=0)
+ elog(ERROR,"Unable to seek for trigger at %i. Err=%i",
+ deferredTriggers->imm_filepos,errno);
+ rdln=fread(buf,1,bufln,deferredTriggers->imm_file_handle);
+ if (rdln!=bufln)
+ elog(ERROR,"deferred trigger fread should have returned %i, actual %i. Err=%i. imm_filepos=%i, bytes_written=%i,
ftell=%i",
+ bufln,rdln,errno,(int) deferredTriggers->imm_filepos
+ ,(int) *(deferredTriggers->bytes_written+deferredTriggers->imm_filenum-1)
+ ,(int) ftell(deferredTriggers->imm_file_handle));
+ deferredTriggers->imm_szrd=bufln;
+ event->dte_event=*((int32 *) buf);
+ cbp=buf+sizeof(int32);
+ event->dte_relid=*((Oid *) cbp);
+ cbp+=sizeof(Oid);
+ event->dte_oldctid=*((ItemPointerData *) cbp);
+ cbp+=sizeof(ItemPointerData);
+ event->dte_newctid=*((ItemPointerData *) cbp);
+ cbp+=sizeof(ItemPointerData);
+ event->dte_n_items=*((int32 *) cbp);
+ pfree(buf);
+ bufln=event->dte_n_items*sizeof(DeferredTriggerEventItem);
+ buf=palloc(bufln);
+ if (!(event))
+ elog(ERROR,"Can not allocate buf for deferred trigger queue disk read.");
+ rdln=fread(buf,1,bufln,deferredTriggers->imm_file_handle);
+ if (rdln!=bufln)
+ elog(ERROR,"deferred trigger fread should have returned %i, actual %i",
+ bufln,rdln);
+ deferredTriggers->imm_szrd+=bufln;
+ if (event->dte_n_items>1)
+ {
+ oevent=event;
+ event=palloc(sizeof(DeferredTriggerEventData)+
+ sizeof(DeferredTriggerEventItem));
+ if (!(event->dte_item))
+ elog(ERROR,
+ "Could not allocate event->dte_item for deferred trigger disk read");
+ *event=*oevent;
+ pfree(oevent);
+ }
+ for(c=0,cbp=buf;c<event->dte_n_items;c++,cbp+=sizeof(DeferredTriggerEventItem))
+ event->dte_item[c]=*((DeferredTriggerEventItem *) cbp);
+ event->dte_next=NULL;
+ pfree(buf);
+ return event;
+ }
+
+
+ /*
+ * Cleans up after DiskImm Stuff
+ */
+ static void deferredTriggerDiskImmClean()
+ {
+ deferredTriggers->imm_szrd=0;
+ deferredTriggers->imm_filenum=0;
+ deferredTriggers->imm_filepos=0;
+ if (deferredTriggers->imm_file_handle)
+ {
+ if (deferredTriggers->imm_file_handle==deferredTriggers->file_handle)
+ deferredTriggers->imm_file_handle=NULL;
+ else
+ {
+ fclose(deferredTriggers->imm_file_handle);
+ deferredTriggers->imm_file_handle=NULL;
+ }
+ }
+ }
+
+ /*
+ * Cleans up everything that disk deferred trigger queue might have added
+ * (inc free memory & rm files).
+ */
+ static void deferredTriggerClean()
+ {
+ int c,rt;
+ deferredTriggers->count=0;
+ if (deferredTriggers->num_files<1)
+ return;
+
+ deferredTriggerDiskImmClean();
+
+ if (deferredTriggers->bytes_written)
+ {
+ pfree(deferredTriggers->bytes_written);
+ deferredTriggers->bytes_written=NULL;
+ }
+ fclose(deferredTriggers->file_handle);
+ deferredTriggers->file_handle=NULL;
+ for(c=1;c<=deferredTriggers->num_files;c++)
+ {
+ sprintf(deferredTriggers->filename+strlen(deferredTriggers->filename)-9,
+ "%09d", c);
+ rt=remove(deferredTriggers->filename);
+ if (rt!=0)
+ elog(DEBUG1,"Failed to remove %s. Err=%i"
+ ,deferredTriggers->filename,errno);
+ else
+ elog(DEBUG1,"Succesfully removed %s",deferredTriggers->filename);
+ }
+ if (deferredTriggers->imm_filename)
+ {
+ pfree(deferredTriggers->imm_filename);
+ deferredTriggers->imm_filename=NULL;
+ }
+ if (deferredTriggers->filename)
+ {
+ pfree(deferredTriggers->filename);
+ deferredTriggers->filename=NULL;
+ }
+
+ deferredTriggers->num_files=0;
+ }
+
+ /*
+ * Initialises the stuff for reading disk deferred triggers.
+ */
+ static void deferredTriggerDiskImmReset()
+ {
+ deferredTriggers->imm_szrd=0;
+ deferredTriggers->imm_filepos=0;
+ deferredTriggers->imm_filenum=1;
+
+ if (deferredTriggers->imm_file_handle)
+ {
+ if (deferredTriggers->imm_file_handle==deferredTriggers->file_handle)
+ deferredTriggers->imm_file_handle=NULL;
+ else
+ {
+ fclose(deferredTriggers->imm_file_handle);
+ deferredTriggers->imm_file_handle=NULL;
+ }
+ }
+ sprintf(deferredTriggers->imm_filename,"%s/pgsql_tmp/%sdeftrig_%i-000000001",
+ DatabasePath,PG_TEMP_FILE_PREFIX,(int) GetCurrentTransactionId());
+ if (strcmp(deferredTriggers->imm_filename,deferredTriggers->filename)==0)
+ deferredTriggers->imm_file_handle = deferredTriggers->file_handle;
+ else
+ deferredTriggers->imm_file_handle
+ = fopen(deferredTriggers->imm_filename, "rb+");
+ if (!(deferredTriggers->imm_file_handle))
+ elog(ERROR, "Can not open for reading first disk event file handle %s"
+ ,deferredTriggers->imm_filename);
+ }
+
+ /*
+ * deferredTriggerWriteEvent(DeferredTriggerEvent event)
+ *
+ * Writes to disk. Will automatically go to the next file
+ * but needs to have the first one already made
+ */
+ static void deferredTriggerWriteEvent(DeferredTriggerEvent event)
+ {
+ int eln,c,bw;
+ long *n;
+ void *buf,*cbp;
+ eln=(sizeof(int32)<<1)+sizeof(Oid)+(sizeof(ItemPointerData)<<1)+
+ (event->dte_n_items*sizeof(DeferredTriggerEventItem));
+ buf=palloc(eln);
+ if (!(buf))
+ elog(ERROR, "Could not allocated deferred trigger write buffer");
+ if (eln+*(deferredTriggers->bytes_written+
+ deferredTriggers->num_files-1)>1073741823)
+ {
+ deferredTriggers->num_files++;
+ n=(long *) MemoryContextAlloc(TopTransactionContext,
+ sizeof(long)*(deferredTriggers->num_files));
+ if (!n)
+ elog(ERROR, "Could not expand deferred trigger bytes written array");
+ for(c=0;c<deferredTriggers->num_files-1;c++)
+ *(n+c)=*(deferredTriggers->bytes_written+c);
+ pfree(deferredTriggers->bytes_written);
+ deferredTriggers->bytes_written=n;
+ *(deferredTriggers->bytes_written + deferredTriggers->num_files-1)=0;
+ if (deferredTriggers->file_handle!=deferredTriggers->imm_file_handle)
+ {
+ fclose(deferredTriggers->file_handle);
+ deferredTriggers->file_handle=NULL;
+ }
+ else
+ deferredTriggers->file_handle=NULL;
+ sprintf(deferredTriggers->filename+strlen(deferredTriggers->filename)-9,
+ "%09d", c);
+ deferredTriggers->file_handle=fopen(deferredTriggers->filename, "wb");
+ if (!(deferredTriggers->file_handle))
+ elog(ERROR, "Can not open deferred trigger event file %s"
+ ,deferredTriggers->filename);
+ }
+ /*
+ * paranoid check to see if there is a file handle (should always be)
+ */
+ if (!(deferredTriggers->file_handle))
+ {
+ elog(DEBUG1, "No File handle for deferred trigger file - should be here");
+ deferredTriggers->file_handle=fopen(deferredTriggers->filename, "wb");
+ if (!(deferredTriggers->file_handle))
+ elog(ERROR, "Can not open deferred trigger event file %s"
+ ,deferredTriggers->filename);
+ }
+ /*
+ * copy the data to a buffer
+ */
+ *((int32 *) buf)=event->dte_event;
+ cbp=buf+sizeof(int32);
+ *((Oid *) cbp)=event->dte_relid;
+ cbp+=sizeof(Oid);
+ *((ItemPointerData *) cbp)=event->dte_oldctid;
+ cbp+=sizeof(ItemPointerData);
+ *((ItemPointerData *) cbp)=event->dte_newctid;
+ cbp+=sizeof(ItemPointerData);
+ *((int32 *) cbp)=event->dte_n_items;
+ cbp+=sizeof(int32);
+ /*
+ * copying all the items to the buffer
+ */
+ for(c=0;c<event->dte_n_items;c++)
+ {
+ *((DeferredTriggerEventItem *) cbp)=event->dte_item[c];
+ cbp+=sizeof(DeferredTriggerEventItem);
+ }
+ if (fseek(deferredTriggers->file_handle,0,SEEK_END)!=0)
+ elog(ERROR,"Could not seek end of file for write event");
+ bw=fwrite(buf,1,eln,deferredTriggers->file_handle);
+ if (bw!=eln)
+ elog(ERROR,"deferred trigger write should have returned %i, actual %i",
+ bw,eln);
+ *(deferredTriggers->bytes_written+deferredTriggers->num_files-1)+=bw;
+ // fflush(deferredTriggers->file_handle);
+ }
+
/* ----------
* deferredTriggerCheckState()
*
***************
*** 1705,1727 ****
static void
deferredTriggerAddEvent(DeferredTriggerEvent event)
{
! /*
! * Since the event list could grow quite long, we keep track of the
! * list tail and append there, rather than just doing a stupid
! * "lappend". This avoids O(N^2) behavior for large numbers of events.
! */
! event->dte_next = NULL;
! if (deferredTriggers->deftrig_event_tail == NULL)
! {
! /* first list entry */
! deferredTriggers->deftrig_events = event;
! deferredTriggers->deftrig_event_tail = event;
! }
else
! {
! deferredTriggers->deftrig_event_tail->dte_next = event;
! deferredTriggers->deftrig_event_tail = event;
! }
}
--- 2071,2159 ----
static void
deferredTriggerAddEvent(DeferredTriggerEvent event)
{
! char *dskpth;
! /*
! * Since the event list could grow quite long, we keep track of the
! * list tail and append there, rather than just doing a stupid
! * "lappend". This avoids O(N^2) behavior for large numbers of events.
! */
! event->dte_next = NULL;
! if (deferredTriggers->deftrig_event_tail == NULL)
! {
! /* first list entry */
! deferredTriggers->count=0;
! deferredTriggers->num_files=0;
! deferredTriggers->deftrig_events = event;
! deferredTriggers->deftrig_event_tail = event;
! }
! else
! {
! if (deferredTriggers->count<max_deferred_trigger_mem_queue)
! {
! deferredTriggers->deftrig_event_tail->dte_next = event;
! deferredTriggers->deftrig_event_tail = event;
! deferredTriggers->count++;
! }
! /*
! * Record the rest of the deferred triggers to disk
! */
else
! {
! if (deferredTriggers->count==max_deferred_trigger_mem_queue)
! {
! /*
! * Use a false trigger event to say this is the end of mem store
! */
! elog(DEBUG1, "Createing disk queue for xid %i",
! (int) GetCurrentTransactionId());
! deferredTriggers->count++;
! deferredTriggers->num_files=1;
! sprintf(deferredTriggers->filename,
! "%s/pgsql_tmp/%sdeftrig_%i-000000001",
! DatabasePath,PG_TEMP_FILE_PREFIX,(int) GetCurrentTransactionId());
! deferredTriggers->bytes_written=(long *)
! MemoryContextAlloc(TopTransactionContext, sizeof(long));
! if (!(deferredTriggers->bytes_written))
! elog(ERROR,
! "Could not create deferred trigger bytes written array");
! *deferredTriggers->bytes_written=0;
! deferredTriggers->file_handle
! =fopen(deferredTriggers->filename, "wb+");
! if (deferredTriggers->file_handle==NULL)
! {
! dskpth=palloc(strlen(DatabasePath)+11);
! if (!(dskpth))
! elog(ERROR, "Can not allocate disk deferred trigger path");
! sprintf(dskpth,"%s/pgsql_tmp",DatabasePath);
! if (mkdir(dskpth,S_IREAD | S_IWRITE)==0)
! elog(DEBUG1, "Created temp directory %s",dskpth);
! pfree(dskpth);
! dskpth=NULL;
! deferredTriggers->file_handle
! = fopen(deferredTriggers->filename, "wb");
! if (!(deferredTriggers->file_handle))
! elog(ERROR,
! "Can not open first disk event file handle for %s"
! ,deferredTriggers->filename);
! elog(DEBUG1, "Created disk queue %s"
! ,deferredTriggers->filename);
! }
! else
! elog(DEBUG1, "Created disk queue file %s"
! ,deferredTriggers->filename);
! deferredTriggerWriteEvent(event);
! event->dte_event=TRIGGER_DEFERRED_ON_DISK;
! event->dte_next=NULL;
! deferredTriggers->deftrig_event_tail->dte_next = event;
! deferredTriggers->deftrig_event_tail = event;
! }
! else
! {
! deferredTriggerWriteEvent(event);
! pfree(event);
! }
! }
! }
}
***************
*** 1869,1882 ****
* the new states. See DeferredTriggerSetState.)
*/
- /* Make a per-tuple memory context for trigger function calls */
- per_tuple_context =
- AllocSetContextCreate(CurrentMemoryContext,
- "DeferredTriggerTupleContext",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
/*
* If immediate_only is true, then the only events that could need firing
* are those since deftrig_events_imm. (But if deftrig_events_imm is
--- 2301,2306 ----
***************
*** 1885,1903 ****
if (immediate_only && deferredTriggers->deftrig_events_imm != NULL)
{
prev_event = deferredTriggers->deftrig_events_imm;
! event = prev_event->dte_next;
}
else
{
prev_event = NULL;
event = deferredTriggers->deftrig_events;
}
while (event != NULL)
{
bool still_deferred_ones = false;
DeferredTriggerEvent next_event;
int i;
/*
* Check if event is already completely done.
--- 2309,2359 ----
if (immediate_only && deferredTriggers->deftrig_events_imm != NULL)
{
prev_event = deferredTriggers->deftrig_events_imm;
! if (prev_event->dte_event!=TRIGGER_DEFERRED_ON_DISK)
! {
! if (event->dte_event==TRIGGER_DEFERRED_ON_DISK)
! {
! prev_event=event;
! deferredTriggerDiskImmReset();
! }
! else
! {
! event = prev_event->dte_next;
! if (deferredTriggers->num_files>0)
! deferredTriggerDiskImmReset();
! }
! }
}
else
{
prev_event = NULL;
event = deferredTriggers->deftrig_events;
+ if (event)
+ if (event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+ {
+ prev_event=event;
+ deferredTriggerDiskImmReset();
+ }
+ if (deferredTriggers->num_files>0)
+ deferredTriggerDiskImmReset();
}
+ /* Make a per-tuple memory context for trigger function calls */
+ per_tuple_context =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "DeferredTriggerTupleContext",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ if (prev_event)
+ if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+ event=deferredTriggerDiskImmReadNextEvent();
while (event != NULL)
{
bool still_deferred_ones = false;
DeferredTriggerEvent next_event;
int i;
+ int any_dn;
/*
* Check if event is already completely done.
***************
*** 1910,1915 ****
--- 2366,2372 ----
/*
* Check each trigger item in the event.
*/
+ any_dn=0;
for (i = 0; i < event->dte_n_items; i++)
{
if (event->dte_item[i].dti_state & TRIGGER_DEFERRED_DONE)
***************
*** 1966,1972 ****
--- 2423,2433 ----
per_tuple_context);
event->dte_item[i].dti_state |= TRIGGER_DEFERRED_DONE;
+ any_dn=1;
} /* end loop over items within event */
+ if ((prev_event)&&(any_dn))
+ if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+ deferredTriggerDiskImmWriteDoneEvent(event);
}
/*
***************
*** 1977,1987 ****
* we have to be careful about maintaining list validity here.
*/
next_event = event->dte_next;
if (still_deferred_ones)
{
/* Not done, keep in list */
! prev_event = event;
}
else
{
--- 2438,2455 ----
* we have to be careful about maintaining list validity here.
*/
next_event = event->dte_next;
+ if (prev_event)
+ if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+ next_event = deferredTriggerDiskImmReadNextEvent();
if (still_deferred_ones)
{
/* Not done, keep in list */
! if (prev_event==NULL)
! prev_event=event;
! else
! if (prev_event->dte_event!=TRIGGER_DEFERRED_ON_DISK)
! prev_event = event;
}
else
{
***************
*** 1989,1999 ****
if (immediate_only)
{
/* delink it from list and free it */
! if (prev_event)
prev_event->dte_next = next_event;
else
deferredTriggers->deftrig_events = next_event;
! pfree(event);
}
else
{
--- 2457,2468 ----
if (immediate_only)
{
/* delink it from list and free it */
! if (prev_event)
prev_event->dte_next = next_event;
else
deferredTriggers->deftrig_events = next_event;
! pfree(event);
! event=NULL;
}
else
{
***************
*** 2002,2011 ****
* mark the event done.
*/
event->dte_event |= TRIGGER_DEFERRED_DONE;
}
}
!
event = next_event;
}
/* Update list tail pointer in case we just deleted tail event */
--- 2471,2492 ----
* mark the event done.
*/
event->dte_event |= TRIGGER_DEFERRED_DONE;
+ if (prev_event)
+ if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+ deferredTriggerDiskImmWriteDoneEvent(event);
}
}
! if ((prev_event)&&(event))
! if (prev_event->dte_event==TRIGGER_DEFERRED_ON_DISK)
! pfree(event);
!
event = next_event;
+ if (event!=NULL)
+ if (event->dte_event==TRIGGER_DEFERRED_ON_DISK)
+ {
+ prev_event=event;
+ event = deferredTriggerDiskImmReadNextEvent();
+ }
}
/* Update list tail pointer in case we just deleted tail event */
***************
*** 2057,2062 ****
--- 2538,2547 ----
MemoryContext cxt = TopTransactionContext;
deferredTriggers = (DeferredTriggers) MemoryContextAlloc(TopTransactionContext,
sizeof(DeferredTriggersData));
+ deferredTriggers->filename = (char *)
+ MemoryContextAlloc(TopTransactionContext, strlen(DatabasePath)+128);
+ deferredTriggers->imm_filename = (char *)
+ MemoryContextAlloc(TopTransactionContext, strlen(DatabasePath)+128);
/*
* Create the per transaction memory context
***************
*** 2076,2081 ****
--- 2561,2574 ----
deferredTriggers->deftrig_events = NULL;
deferredTriggers->deftrig_events_imm = NULL;
deferredTriggers->deftrig_event_tail = NULL;
+ deferredTriggers->num_files=0;
+ deferredTriggers->bytes_written=NULL;
+ deferredTriggers->file_handle=NULL;
+ deferredTriggers->count=0;
+ deferredTriggers->imm_file_handle=NULL;
+ deferredTriggers->imm_filepos=0;
+ deferredTriggers->imm_filenum=0;
+ deferredTriggers->imm_szrd=0;
}
***************
*** 2094,2100 ****
*/
if (deferredTriggers == NULL)
return;
-
deferredTriggerInvokeEvents(true);
}
--- 2587,2592 ----
***************
*** 2115,2122 ****
if (deferredTriggers == NULL)
return;
! deferredTriggerInvokeEvents(false);
!
deferredTriggers = NULL;
}
--- 2607,2619 ----
if (deferredTriggers == NULL)
return;
!
! if (deferredTriggers->num_files>0)
! deferredTriggerDiskImmReset();
!
! deferredTriggerInvokeEvents(true);
! deferredTriggerDiskImmClean();
! deferredTriggerClean();
deferredTriggers = NULL;
}
***************
*** 2141,2146 ****
--- 2638,2645 ----
/*
* Forget everything we know about deferred triggers.
*/
+ deferredTriggerDiskImmClean();
+ deferredTriggerClean();
deferredTriggers = NULL;
}
*** trigger.h.orig Wed Jun 25 19:40:16 2003
--- trigger.h Thu Jun 26 10:06:56 2003
***************
*** 16,21 ****
--- 16,22 ----
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
+ #define max_deferred_trigger_mem_queue 100000
/*
* TriggerData is the node type that is passed as fmgr "context" info
* when a function is called by the trigger manager.
***************
*** 50,55 ****
--- 51,57 ----
#define TRIGGER_DEFERRED_DEFERRABLE 0x00000040
#define TRIGGER_DEFERRED_INITDEFERRED 0x00000080
#define TRIGGER_DEFERRED_HAS_BEFORE 0x00000100
+ #define TRIGGER_DEFERRED_ON_DISK 0x00000200
#define TRIGGER_FIRED_BY_INSERT(event) \
(((TriggerEvent) (event) & TRIGGER_EVENT_OPMASK) == \
DROP TABLE trig_tst1;
CREATE TABLE trig_tst1(
pk serial,
vli int4,
vlt text
);
DROP TABLE trig_tst2;
CREATE TABLE trig_tst2(
opr int4,
npk int4,
opk int4,
vli int4,
vlt text
);
CREATE OR REPLACE FUNCTION tf_ins() RETURNS TRIGGER AS '
BEGIN
INSERT INTO trig_tst2(opr,npk,vli,vlt) VALUES (1,NEW.pk,NEW.vli,NEW.vlt);
RETURN NEW;
END;
' LANGUAGE 'plpgsql' VOLATILE;
CREATE OR REPLACE FUNCTION tf_updt() RETURNS TRIGGER AS '
BEGIN
INSERT INTO trig_tst2(opr,npk,opk,vli,vlt) VALUES (2,NEW.pk,OLD.pk,NEW.vli,NEW.vlt);
RETURN NEW;
END;
' LANGUAGE 'plpgsql' VOLATILE;
CREATE OR REPLACE FUNCTION tf_DEL() RETURNS TRIGGER AS '
BEGIN
INSERT INTO trig_tst2(opr,opk) VALUES (3,OLD.pk);
RETURN NEW;
END;
' LANGUAGE 'plpgsql' VOLATILE;
INSERT INTO trig_tst1 (vli,vlt) VALUES (1,'tst');
INSERT INTO trig_tst1 (vli,vlt) VALUES (2,'abc');
INSERT INTO trig_tst1 (vli,vlt) VALUES (3,'xyz');
INSERT INTO trig_tst1 (vli,vlt) VALUES (0,'bob');
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+4,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+8,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+16,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+32,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+64,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+128,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+256,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+512,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+1024,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+2048,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+4096,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+8192,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+16384,vlt FROM trig_tst1;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+32768,vlt FROM trig_tst1;
CREATE TRIGGER tst_tf_ins AFTER INSERT ON trig_tst1 FOR EACH ROW EXECUTE PROCEDURE tf_ins();
CREATE TRIGGER tst_tf_updt AFTER UPDATE ON trig_tst1 FOR EACH ROW EXECUTE PROCEDURE tf_updt();
CREATE TRIGGER tst_tf_del AFTER DELETE ON trig_tst1 FOR EACH ROW EXECUTE PROCEDURE tf_del();
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+65536,vlt FROM trig_tst1;
SELECT count(*) FROM trig_tst2;
INSERT INTO trig_tst1 (vli,vlt) SELECT vli+131072,vlt FROM trig_tst1;
SELECT count(*) FROM trig_tst2;
TRUNCATE trig_tst2;
UPDATE trig_tst1 SET vli=vli*2;
TRUNCATE trig_tst2;