Re: Make COPY format extendable: Extract COPY TO format implementations
From | Sutou Kouhei |
---|---|
Subject | Re: Make COPY format extendable: Extract COPY TO format implementations |
Date | |
Msg-id | 20250618.125920.377159334539867546.kou@clear-code.com |
In response to | Re: Make COPY format extendable: Extract COPY TO format implementations (Michael Paquier <michael@paquier.xyz>) |
List | pgsql-hackers |
Hi,

In <aFC5HmZHU5NCPuTL@paquier.xyz>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Tue, 17 Jun 2025 09:38:54 +0900,
  Michael Paquier <michael@paquier.xyz> wrote:

> On Tue, Jun 17, 2025 at 08:50:37AM +0900, Sutou Kouhei wrote:
>> OK. I'll implement the initial version with this
>> design. (Allocating IDs local not shared.)
>
> Sounds good to me. Thanks Sutou-san!

I've attached the v41 patch set, which uses the C API approach with local (not shared) COPY routine management.

0001: This is the same as 0001 in the v40 patch set. It just cleans up the CopySource and CopyDest enums.
0002: This is the initial version of this approach.

Here are some discussion points:

1. This provides two registration APIs (RegisterCopy{From,To}Routine(name, routine)) instead of one (RegisterCopyFormat(name, from_routine, to_routine)). This keeps the implementation simple and makes it easy to extend in the future without breaking the APIs. (Also, some formats may provide only a FROM routine or only a TO routine.)

   Is this design acceptable?

   FYI: RegisterCopy{From,To}Routine() uses the same logic as RegisterExtensionExplainOption().

2. This allocates IDs internally but doesn't provide APIs to get them, because they aren't needed for now. We can provide an API like GetExplainExtensionId() when we need it.

   Is this design acceptable?

3. I want to register the built-in COPY {FROM,TO} routines in the PostgreSQL initialization phase. Where should we do it? In 0002, it's done in InitPostgres(), but I'm not sure whether that is a suitable location.

4. 0002 adds CopyFormatOptions::routine as a union:

@@ -87,9 +91,14 @@ typedef struct CopyFormatOptions
 	CopyLogVerbosityChoice log_verbosity;	/* verbosity of logged messages */
 	int64		reject_limit;	/* maximum tolerable number of errors */
 	List	   *convert_select; /* list of column names (can be NIL) */
+	union
+	{
+		const struct CopyFromRoutine *from; /* for COPY FROM */
+		const struct CopyToRoutine *to; /* for COPY TO */
+	}			routine;		/* routine to process the specified format */
 } CopyFormatOptions;

   This is because only one of Copy{From,To}Routine is needed at a time. Is this use of a union unusual in PostgreSQL?

5. 0002 adds InitializeCopy{From,To}Routines() and GetCopy{From,To}Routine() to copyapi.h, although they aren't used by COPY {FROM,TO} routine implementations. Should we move them to another header? If so, which one?

Thanks,
--
kou

From 78b0c3897e3c78988239dd149753ab55336d060c Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 25 Nov 2024 13:58:33 +0900
Subject: [PATCH v41 1/2] Export CopyDest as private data

This is a preparation to export CopyToStateData as private data.

CopyToStateData depends on CopyDest, so we need to export CopyDest too. But the CopyDest and CopySource enum values have the same names, so we can't export CopyDest as-is.

This uses the COPY_DEST_ prefix for CopyDest enum values. CopySource uses the COPY_SOURCE_ prefix for consistency.
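For readers following along, the effect of this rename can be sketched in isolation. The two enum definitions below use the value names that appear in this patch's diff; the helper functions (copy_source_uses_protocol, copy_dest_uses_protocol, demo_check_default) are hypothetical and only show that both enums can be visible in one translation unit once their values no longer share names.

```c
#include <assert.h>
#include <stdbool.h>

/* Value names as renamed by this patch in copyfrom_internal.h. */
typedef enum CopySource
{
    COPY_SOURCE_FILE,       /* from file (or a piped program) */
    COPY_SOURCE_FRONTEND,   /* from frontend */
    COPY_SOURCE_CALLBACK,   /* from callback function */
} CopySource;

/* Value names as introduced by this patch in copyto_internal.h. */
typedef enum CopyDest
{
    COPY_DEST_FILE,         /* to file (or a piped program) */
    COPY_DEST_FRONTEND,     /* to frontend */
    COPY_DEST_CALLBACK,     /* to callback function */
} CopyDest;

/* Hypothetical helper (not in the patch): does the source use the FE/BE protocol? */
static bool
copy_source_uses_protocol(CopySource src)
{
    return src == COPY_SOURCE_FRONTEND;
}

/* Hypothetical helper (not in the patch): does the destination use the FE/BE protocol? */
static bool
copy_dest_uses_protocol(CopyDest dest)
{
    return dest == COPY_DEST_FRONTEND;
}

/* Hypothetical check: the defaults set in the diff are the FILE variants. */
static void
demo_check_default(void)
{
    CopySource  src = COPY_SOURCE_FILE;
    CopyDest    dest = COPY_DEST_FILE;

    assert(!copy_source_uses_protocol(src));
    assert(!copy_dest_uses_protocol(dest));
}
```

With the old shared names (COPY_FILE, COPY_FRONTEND, COPY_CALLBACK) in both enums, a file that included both definitions would not compile, which is the problem the prefixes avoid.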
--- src/backend/commands/copyfrom.c | 4 ++-- src/backend/commands/copyfromparse.c | 10 ++++---- src/backend/commands/copyto.c | 30 ++++++++---------------- src/include/commands/copyfrom_internal.h | 8 +++---- src/include/commands/copyto_internal.h | 28 ++++++++++++++++++++++ 5 files changed, 49 insertions(+), 31 deletions(-) create mode 100644 src/include/commands/copyto_internal.h diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index fbbbc09a97b..b4dad744547 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1709,7 +1709,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1837,7 +1837,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index f5fc346e201..9f7171d1478 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -180,7 +180,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows it can send. 
*/ pq_flush(); @@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -257,7 +257,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -340,7 +340,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1172,7 +1172,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index ea6f18f2c80..99aec9c4c48 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -20,6 +20,7 @@ #include "access/tableam.h" #include "commands/copyapi.h" +#include "commands/copyto_internal.h" #include "commands/progress.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -36,17 +37,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - /* * This struct contains all the state variables used throughout a COPY TO * operation. 
@@ -69,7 +59,7 @@ typedef struct CopyToStateData /* low-level state data */ CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ + FILE *copy_file; /* used if copy_dest == COPY_DEST_FILE */ StringInfo fe_msgbuf; /* used for all dests during COPY TO */ int file_encoding; /* file or remote side's character encoding */ @@ -401,7 +391,7 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_FRONTEND; + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -448,7 +438,7 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -482,11 +472,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -507,7 +497,7 @@ CopySendTextLikeEndOfRow(CopyToState cstate) { switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: /* Default line termination depends on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); @@ -515,7 +505,7 @@ CopySendTextLikeEndOfRow(CopyToState cstate) CopySendString(cstate, "\r\n"); #endif break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ CopySendChar(cstate, '\n'); break; @@ -902,12 +892,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest 
= COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index c8b22af22d8..24157e11a73 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -24,9 +24,9 @@ */ typedef enum CopySource { - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ } CopySource; /* @@ -64,7 +64,7 @@ typedef struct CopyFromStateData /* low-level state data */ CopySource copy_src; /* type of copy source */ FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_SOURCE_FRONTEND */ EolType eol_type; /* EOL type of input */ int file_encoding; /* file or remote side's character encoding */ diff --git a/src/include/commands/copyto_internal.h b/src/include/commands/copyto_internal.h new file mode 100644 index 00000000000..42ddb37a8a2 --- /dev/null +++ b/src/include/commands/copyto_internal.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * copyto_internal.h + * Internal definitions for COPY TO command. 
+ *
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyto_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYTO_INTERNAL_H
+#define COPYTO_INTERNAL_H
+
+/*
+ * Represents the different dest cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopyDest
+{
+	COPY_DEST_FILE,				/* to file (or a piped program) */
+	COPY_DEST_FRONTEND,			/* to frontend */
+	COPY_DEST_CALLBACK,			/* to callback function */
+} CopyDest;
+
+#endif							/* COPYTO_INTERNAL_H */
--
2.49.0

From bdd45f68d7026fae757bcd7d6aec8f2b6644a846 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Wed, 18 Jun 2025 11:43:09 +0900
Subject: [PATCH v41 2/2] Add support for registering COPY {FROM,TO} routines

This uses the C API approach, like the custom EXPLAIN option. Some of this is based on the custom EXPLAIN option implementation.

This approach provides a C API to register COPY {FROM,TO} routines:

    void RegisterCopyFromRoutine(const char *name, const CopyFromRoutine *routine);
    void RegisterCopyToRoutine(const char *name, const CopyToRoutine *routine);

(They are based on RegisterExtensionExplainOption().)

They assign an ID to each name internally, but the current API set doesn't expose it to users because it's not needed for now. If it's needed, we can provide APIs for it, like GetExplainExtensionId() for the custom EXPLAIN option.

This manages registered COPY {FROM,TO} routines per process. Registered COPY {FROM,TO} routines aren't shared among multiple processes because that's not needed for now. We may revisit this when we need it.
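As a rough, stand-alone model of the registration logic described in this commit message, the sketch below keeps a per-process array of (name, routine) pairs, replaces the routine when a name is registered twice, and returns NULL for unknown names. It is not the patch's code: DemoRoutine, DemoRoutineEntry, demo_register_routine and demo_get_routine are hypothetical names, plain realloc() stands in for MemoryContextAlloc()/repalloc() in TopMemoryContext, and simple doubling stands in for pg_nextpower2_32().

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for CopyToRoutine; the real struct holds callbacks. */
typedef struct DemoRoutine
{
    const char *label;
} DemoRoutine;

typedef struct DemoRoutineEntry
{
    const char *name;           /* assumed to outlive the registry */
    const DemoRoutine *routine; /* assumed to outlive the registry */
} DemoRoutineEntry;

/* Process-local registry; nothing here is shared between processes. */
static DemoRoutineEntry *demo_entries = NULL;
static int  demo_entries_assigned = 0;
static int  demo_entries_allocated = 0;

static void
demo_register_routine(const char *name, const DemoRoutine *routine)
{
    assert(name != NULL && routine != NULL);

    /* If the name is already registered, replace its routine. */
    for (int i = 0; i < demo_entries_assigned; i++)
    {
        if (strcmp(demo_entries[i].name, name) == 0)
        {
            demo_entries[i].routine = routine;
            return;
        }
    }

    /* Create the array on first use, or double it when it is full. */
    if (demo_entries_assigned >= demo_entries_allocated)
    {
        int         newsize = (demo_entries_allocated == 0) ? 16 : demo_entries_allocated * 2;
        DemoRoutineEntry *grown = realloc(demo_entries, sizeof(DemoRoutineEntry) * newsize);

        if (grown == NULL)
            abort();
        demo_entries = grown;
        demo_entries_allocated = newsize;
    }

    /* The array index plays the role of the internally assigned ID. */
    demo_entries[demo_entries_assigned].name = name;
    demo_entries[demo_entries_assigned].routine = routine;
    demo_entries_assigned++;
}

/* Look up a routine by format name; NULL means "not registered". */
static const DemoRoutine *
demo_get_routine(const char *name)
{
    for (int i = 0; i < demo_entries_assigned; i++)
    {
        if (strcmp(demo_entries[i].name, name) == 0)
            return demo_entries[i].routine;
    }
    return NULL;
}
```

The linear scan is a deliberate simplification that mirrors the array-based approach in the diff; with only a handful of formats registered per backend, a hash table would probably not pay for itself.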
--- src/backend/commands/copy.c | 16 +- src/backend/commands/copyfrom.c | 108 ++++++++++-- src/backend/commands/copyto.c | 161 +++++++++++------- src/backend/utils/init/postinit.c | 5 + src/include/commands/copy.h | 11 +- src/include/commands/copyapi.h | 13 ++ src/include/commands/copyto_internal.h | 55 ++++++ src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_copy_format/.gitignore | 4 + src/test/modules/test_copy_format/Makefile | 21 +++ .../expected/test_copy_format.out | 19 +++ src/test/modules/test_copy_format/meson.build | 29 ++++ .../test_copy_format/sql/test_copy_format.sql | 8 + .../test_copy_format/test_copy_format.c | 91 ++++++++++ .../test_copy_format/test_copy_format.conf | 1 + 16 files changed, 466 insertions(+), 78 deletions(-) create mode 100644 src/test/modules/test_copy_format/.gitignore create mode 100644 src/test/modules/test_copy_format/Makefile create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out create mode 100644 src/test/modules/test_copy_format/meson.build create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format.c create mode 100644 src/test/modules/test_copy_format/test_copy_format.conf diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 74ae42b19a7..787a3bdf8a4 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -22,7 +22,7 @@ #include "access/table.h" #include "access/xact.h" #include "catalog/pg_authid.h" -#include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/defrem.h" #include "executor/executor.h" #include "mb/pg_wchar.h" @@ -524,13 +524,21 @@ ProcessCopyOptions(ParseState *pstate, if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) + if (strcmp(fmt, "csv") == 0) 
opts_out->csv_mode = true; else if (strcmp(fmt, "binary") == 0) opts_out->binary = true; + + if (is_from) + opts_out->routine.from = GetCopyFromRoutine(fmt); else + opts_out->routine.to = GetCopyToRoutine(fmt); + + /* + * We can use either opts_out->routine.from or .to here to check + * the nonexistent routine case. + */ + if (!opts_out->routine.from) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY format \"%s\" not recognized", fmt), diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index b4dad744547..72c96fc6ff6 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -103,6 +103,22 @@ typedef struct CopyMultiInsertInfo } CopyMultiInsertInfo; +/* + * Manage registered COPY FROM routines in a process. They aren't shared in + * multiple processes for now. We may do it later when it's needed. + */ + +typedef struct +{ + const char *name; + const CopyFromRoutine *routine; +} CopyFromRoutineEntry; + +static CopyFromRoutineEntry * CopyFromRoutineEntries = NULL; +static int CopyFromRoutineEntriesAssigned = 0; +static int CopyFromRoutineEntriesAllocated = 0; + + /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); @@ -151,17 +167,87 @@ static const CopyFromRoutine CopyFromRoutineBinary = { .CopyFromEnd = CopyFromBinaryEnd, }; -/* Return a COPY FROM routine for the given options */ -static const CopyFromRoutine * -CopyFromGetRoutine(const CopyFormatOptions *opts) +/* + * Register a new COPY FROM routine. + * + * When name is used as a COPY FROM format name, routine will be used to + * process the COPY FROM request. See CopyFromRoutine how to implement a COPY + * FROM routine. + * + * If name is already registered, registered routine is replaced with the + * given routine. + * + * name is assumed to be a constant string or allocated in storage that will + * never be freed. + * + * routine is assumed to be allocated in storage that will never be freed. 
+ */ +void +RegisterCopyFromRoutine(const char *name, const CopyFromRoutine *routine) { - if (opts->csv_mode) - return &CopyFromRoutineCSV; - else if (opts->binary) - return &CopyFromRoutineBinary; + CopyFromRoutineEntry *entry; - /* default is text */ - return &CopyFromRoutineText; + /* Search for an existing routine by this name; if found, update handler. */ + for (int i = 0; i < CopyFromRoutineEntriesAssigned; ++i) + { + if (strcmp(CopyFromRoutineEntries[i].name, name) == 0) + { + CopyFromRoutineEntries[i].routine = routine; + return; + } + } + + /* If there is no array yet, create one. */ + if (!CopyFromRoutineEntries) + { + CopyFromRoutineEntriesAllocated = 16; + CopyFromRoutineEntries = + MemoryContextAlloc(TopMemoryContext, + sizeof(CopyFromRoutineEntry) * CopyFromRoutineEntriesAllocated); + } + + /* If there's an array but it's currently full, expand it. */ + if (CopyFromRoutineEntriesAssigned >= CopyFromRoutineEntriesAllocated) + { + int i = pg_nextpower2_32(CopyFromRoutineEntriesAssigned + 1); + + CopyFromRoutineEntries = + repalloc(CopyFromRoutineEntries, sizeof(CopyFromRoutineEntry) * i); + CopyFromRoutineEntriesAllocated = i; + } + + /* Assign new ID. */ + entry = &CopyFromRoutineEntries[CopyFromRoutineEntriesAssigned++]; + entry->name = name; + entry->routine = routine; +} + +/* + * Register built-in COPY FROM routines. + * + * This must be called only once in the initialization process. 
+ */ +void +InitializeCopyFromRoutines(void) +{ + RegisterCopyFromRoutine("text", &CopyFromRoutineText); + RegisterCopyFromRoutine("csv", &CopyFromRoutineCSV); + RegisterCopyFromRoutine("binary", &CopyFromRoutineBinary); +} + +/* Return a COPY FROM routine for the given options */ +const CopyFromRoutine * +GetCopyFromRoutine(const char *name) +{ + for (int i = 0; i < CopyFromRoutineEntriesAssigned; ++i) + { + if (strcmp(CopyFromRoutineEntries[i].name, name) == 0) + { + return CopyFromRoutineEntries[i].routine; + } + } + + return NULL; } /* Implementation of the start callback for text and CSV formats */ @@ -1574,7 +1660,9 @@ BeginCopyFrom(ParseState *pstate, ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); /* Set the format routine */ - cstate->routine = CopyFromGetRoutine(&cstate->opts); + cstate->routine = cstate->opts.routine.from; + if (!cstate->routine) + cstate->routine = &CopyFromRoutineText; /* default is text */ /* Process the target relation */ cstate->rel = rel; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 99aec9c4c48..7ef690981cb 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -19,12 +19,9 @@ #include <sys/stat.h> #include "access/tableam.h" -#include "commands/copyapi.h" #include "commands/copyto_internal.h" #include "commands/progress.h" -#include "executor/execdesc.h" #include "executor/executor.h" -#include "executor/tuptable.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" @@ -37,56 +34,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. 
When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. - */ -typedef struct CopyToStateData -{ - /* format-specific routines */ - const CopyToRoutine *routine; - - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_DEST_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? 
*/ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -99,6 +46,22 @@ typedef struct static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; +/* + * Manage registered COPY TO routines in a process. They aren't shared in + * multiple processes for now. We may do it later when it's needed. + */ + +typedef struct +{ + const char *name; + const CopyToRoutine *routine; +} CopyToRoutineEntry; + +static CopyToRoutineEntry * CopyToRoutineEntries = NULL; +static int CopyToRoutineEntriesAssigned = 0; +static int CopyToRoutineEntriesAllocated = 0; + + /* non-export function prototypes */ static void EndCopy(CopyToState cstate); static void ClosePipeToProgram(CopyToState cstate); @@ -131,6 +94,7 @@ static void CopySendTextLikeEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); + /* * COPY TO routines for built-in formats. * @@ -162,17 +126,86 @@ static const CopyToRoutine CopyToRoutineBinary = { .CopyToEnd = CopyToBinaryEnd, }; -/* Return a COPY TO routine for the given options */ -static const CopyToRoutine * -CopyToGetRoutine(const CopyFormatOptions *opts) +/* + * Register a new COPY TO routine. + * + * When name is used as a COPY TO format name, routine will be used to process + * the COPY TO request. See CopyToRoutine how to implement a COPY TO routine. + * + * If name is already registered, registered routine is replaced with the + * given routine. 
+ * + * name is assumed to be a constant string or allocated in storage that will + * never be freed. + * + * routine is assumed to be allocated in storage that will never be freed. + */ +void +RegisterCopyToRoutine(const char *name, const CopyToRoutine *routine) { - if (opts->csv_mode) - return &CopyToRoutineCSV; - else if (opts->binary) - return &CopyToRoutineBinary; + CopyToRoutineEntry *entry; - /* default is text */ - return &CopyToRoutineText; + /* Search for an existing routine by this name; if found, update handler. */ + for (int i = 0; i < CopyToRoutineEntriesAssigned; ++i) + { + if (strcmp(CopyToRoutineEntries[i].name, name) == 0) + { + CopyToRoutineEntries[i].routine = routine; + return; + } + } + + /* If there is no array yet, create one. */ + if (!CopyToRoutineEntries) + { + CopyToRoutineEntriesAllocated = 16; + CopyToRoutineEntries = + MemoryContextAlloc(TopMemoryContext, + sizeof(CopyToRoutineEntry) * CopyToRoutineEntriesAllocated); + } + + /* If there's an array but it's currently full, expand it. */ + if (CopyToRoutineEntriesAssigned >= CopyToRoutineEntriesAllocated) + { + int i = pg_nextpower2_32(CopyToRoutineEntriesAssigned + 1); + + CopyToRoutineEntries = + repalloc(CopyToRoutineEntries, sizeof(CopyToRoutineEntry) * i); + CopyToRoutineEntriesAllocated = i; + } + + /* Assign new ID. */ + entry = &CopyToRoutineEntries[CopyToRoutineEntriesAssigned++]; + entry->name = name; + entry->routine = routine; +} + +/* + * Register built-in COPY TO routines. + * + * This must be called only once in the initialization process. 
+ */ +void +InitializeCopyToRoutines(void) +{ + RegisterCopyToRoutine("text", &CopyToRoutineText); + RegisterCopyToRoutine("csv", &CopyToRoutineCSV); + RegisterCopyToRoutine("binary", &CopyToRoutineBinary); +} + +/* Return a COPY TO routine for the given options */ +const CopyToRoutine * +GetCopyToRoutine(const char *name) +{ + for (int i = 0; i < CopyToRoutineEntriesAssigned; ++i) + { + if (strcmp(CopyToRoutineEntries[i].name, name) == 0) + { + return CopyToRoutineEntries[i].routine; + } + } + + return NULL; } /* Implementation of the start callback for text and CSV formats */ @@ -693,7 +726,9 @@ BeginCopyTo(ParseState *pstate, ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); /* Set format routine */ - cstate->routine = CopyToGetRoutine(&cstate->opts); + cstate->routine = cstate->opts.routine.to; + if (!cstate->routine) + cstate->routine = &CopyToRoutineText; /* default is text */ /* Process the source/target relation or query */ if (rel) diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index c86ceefda94..d566f542d62 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -33,6 +33,7 @@ #include "catalog/pg_database.h" #include "catalog/pg_db_role_setting.h" #include "catalog/pg_tablespace.h" +#include "commands/copyapi.h" #include "libpq/auth.h" #include "libpq/libpq-be.h" #include "mb/pg_wchar.h" @@ -1217,6 +1218,10 @@ InitPostgres(const char *in_dbname, Oid dboid, /* Initialize this backend's session state. */ InitializeSession(); + /* Initialize COPY routines. */ + InitializeCopyFromRoutines(); + InitializeCopyToRoutines(); + /* * If this is an interactive session, load any libraries that should be * preloaded at backend start. 
Since those are determined by GUCs, this diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 06dfdfef721..88fa0703d0a 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -51,6 +51,10 @@ typedef enum CopyLogVerbosityChoice COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ } CopyLogVerbosityChoice; +/* These are in commands/copyapi.h */ +struct CopyFromRoutine; +struct CopyToRoutine; + /* * A struct to hold COPY options, in a parsed form. All of these are related * to formatting, except for 'freeze', which doesn't really belong here, but @@ -87,9 +91,14 @@ typedef struct CopyFormatOptions CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ int64 reject_limit; /* maximum tolerable number of errors */ List *convert_select; /* list of column names (can be NIL) */ + union + { + const struct CopyFromRoutine *from; /* for COPY FROM */ + const struct CopyToRoutine *to; /* for COPY TO */ + } routine; /* routine to process the specified format */ } CopyFormatOptions; -/* These are private in commands/copy[from|to].c */ +/* These are private in commands/copy[from|to]_internal.h */ typedef struct CopyFromStateData *CopyFromState; typedef struct CopyToStateData *CopyToState; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 2a2d2f9876b..39253d616d7 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -54,6 +54,13 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +extern void RegisterCopyToRoutine(const char *name, const CopyToRoutine *routine); + +/* TODO: Should we move the followings to other .h because they are not for + * custom COPY TO format extensions? */ +extern void InitializeCopyToRoutines(void); +extern const CopyToRoutine *GetCopyToRoutine(const char *name); + /* * API structure for a COPY FROM format implementation. 
Note this must be * allocated in a server-lifetime manner, typically as a static const struct. @@ -102,4 +109,10 @@ typedef struct CopyFromRoutine void (*CopyFromEnd) (CopyFromState cstate); } CopyFromRoutine; +extern void RegisterCopyFromRoutine(const char *name, const CopyFromRoutine *routine); +/* TODO: Should we move the followings to other .h because they are not for + * custom COPY FROM format extensions? */ +extern void InitializeCopyFromRoutines(void); +extern const CopyFromRoutine *GetCopyFromRoutine(const char *name); + #endif /* COPYAPI_H */ diff --git a/src/include/commands/copyto_internal.h b/src/include/commands/copyto_internal.h index 42ddb37a8a2..9dbbbc592b7 100644 --- a/src/include/commands/copyto_internal.h +++ b/src/include/commands/copyto_internal.h @@ -14,6 +14,11 @@ #ifndef COPYTO_INTERNAL_H #define COPYTO_INTERNAL_H +#include "commands/copyapi.h" +#include "executor/execdesc.h" +#include "executor/tuptable.h" +#include "nodes/execnodes.h" + /* * Represents the different dest cases we need to worry about at * the bottom level @@ -25,4 +30,54 @@ typedef enum CopyDest COPY_DEST_CALLBACK, /* to callback function */ } CopyDest; +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. 
+ */ +typedef struct CopyToStateData +{ + /* format-specific routines */ + const CopyToRoutine *routine; + + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_DEST_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyToStateData; + #endif /* COPYTO_INTERNAL_H */ diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index aa1d27bbed3..9bf5d58cdae 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -17,6 +17,7 @@ SUBDIRS = \ test_aio \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 9de0057bd1d..5fd06de2737 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -16,6 +16,7 @@ subdir('ssl_passphrase_callback') subdir('test_aio') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') 
 subdir('test_custom_rmgrs')
 subdir('test_ddl_deparse')
 subdir('test_dsa')
diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_copy_format/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile
new file mode 100644
index 00000000000..85dce14ebb3
--- /dev/null
+++ b/src/test/modules/test_copy_format/Makefile
@@ -0,0 +1,21 @@
+# src/test/modules/test_copy_format/Makefile
+
+MODULE_big = test_copy_format
+OBJS = \
+	$(WIN32RES) \
+	test_copy_format.o
+PGFILEDESC = "test_copy_format - test custom COPY FORMAT"
+
+REGRESS = test_copy_format
+REGRESS_OPTS = --temp-config $(top_srcdir)/src/test/modules/test_copy_format/test_copy_format.conf
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_format
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out
new file mode 100644
index 00000000000..163ff94fa41
--- /dev/null
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -0,0 +1,19 @@
+CREATE TABLE copy_data (a smallint, b integer, c bigint);
+INSERT INTO copy_data VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY copy_data FROM stdin WITH (FORMAT 'test_copy_format');
+NOTICE:  CopyFromInFunc: attribute: smallint
+NOTICE:  CopyFromInFunc: attribute: integer
+NOTICE:  CopyFromInFunc: attribute: bigint
+NOTICE:  CopyFromStart: the number of attributes: 3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromEnd
+COPY copy_data TO stdout WITH (FORMAT 'test_copy_format');
+NOTICE:  CopyToOutFunc: attribute: smallint
+NOTICE:  CopyToOutFunc: attribute: integer
+NOTICE:  CopyToOutFunc: attribute: bigint
+NOTICE:  CopyToStart: the number of attributes: 3
+NOTICE:  CopyToOneRow: the number of valid values: 3
+NOTICE:  CopyToOneRow: the number of valid values: 3
+NOTICE:  CopyToOneRow: the number of valid values: 3
+NOTICE:  CopyToEnd
+DROP TABLE copy_data;
diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build
new file mode 100644
index 00000000000..723c51d3f45
--- /dev/null
+++ b/src/test/modules/test_copy_format/meson.build
@@ -0,0 +1,29 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+test_copy_format_sources = files(
+  'test_copy_format.c',
+)
+
+if host_system == 'windows'
+  test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_format',
+    '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',])
+endif
+
+test_copy_format = shared_module('test_copy_format',
+  test_copy_format_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_copy_format
+
+tests += {
+  'name': 'test_copy_format',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_format',
+    ],
+    'regress_args': ['--temp-config', files('test_copy_format.conf')],
+  },
+}
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql
new file mode 100644
index 00000000000..6d60a493e0e
--- /dev/null
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -0,0 +1,8 @@
+CREATE TABLE copy_data (a smallint, b integer, c bigint);
+INSERT INTO copy_data VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+
+COPY copy_data FROM stdin WITH (FORMAT 'test_copy_format');
+\.
+COPY copy_data TO stdout WITH (FORMAT 'test_copy_format');
+
+DROP TABLE copy_data;
diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c
new file mode 100644
index 00000000000..70b7a308d8a
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -0,0 +1,91 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_format.c
+ *		Code for testing custom COPY format.
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_copy_format/test_copy_format.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "commands/copyapi.h"
+#include "commands/defrem.h"
+#include "utils/builtins.h"
+
+PG_MODULE_MAGIC;
+
+static void
+TestCopyFromInFunc(CopyFromState cstate, Oid atttypid,
+				   FmgrInfo *finfo, Oid *typioparam)
+{
+	ereport(NOTICE, (errmsg("CopyFromInFunc: attribute: %s", format_type_be(atttypid))));
+}
+
+static void
+TestCopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	ereport(NOTICE, (errmsg("CopyFromStart: the number of attributes: %d", tupDesc->natts)));
+}
+
+static bool
+TestCopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+	ereport(NOTICE, (errmsg("CopyFromOneRow")));
+	return false;
+}
+
+static void
+TestCopyFromEnd(CopyFromState cstate)
+{
+	ereport(NOTICE, (errmsg("CopyFromEnd")));
+}
+
+static const CopyFromRoutine CopyFromRoutineTestCopyFormat = {
+	.CopyFromInFunc = TestCopyFromInFunc,
+	.CopyFromStart = TestCopyFromStart,
+	.CopyFromOneRow = TestCopyFromOneRow,
+	.CopyFromEnd = TestCopyFromEnd,
+};
+
+static void
+TestCopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+	ereport(NOTICE, (errmsg("CopyToOutFunc: attribute: %s", format_type_be(atttypid))));
+}
+
+static void
+TestCopyToStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	ereport(NOTICE, (errmsg("CopyToStart: the number of attributes: %d", tupDesc->natts)));
+}
+
+static void
+TestCopyToOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	ereport(NOTICE, (errmsg("CopyToOneRow: the number of valid values: %u", slot->tts_nvalid)));
+}
+
+static void
+TestCopyToEnd(CopyToState cstate)
+{
+	ereport(NOTICE, (errmsg("CopyToEnd")));
+}
+
+static const CopyToRoutine CopyToRoutineTestCopyFormat = {
+	.CopyToOutFunc = TestCopyToOutFunc,
+	.CopyToStart = TestCopyToStart,
+	.CopyToOneRow = TestCopyToOneRow,
+	.CopyToEnd = TestCopyToEnd,
+};
+
+void
+_PG_init(void)
+{
+	RegisterCopyFromRoutine("test_copy_format", &CopyFromRoutineTestCopyFormat);
+	RegisterCopyToRoutine("test_copy_format", &CopyToRoutineTestCopyFormat);
+}
diff --git a/src/test/modules/test_copy_format/test_copy_format.conf b/src/test/modules/test_copy_format/test_copy_format.conf
new file mode 100644
index 00000000000..743a02ac92a
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.conf
@@ -0,0 +1 @@
+shared_preload_libraries = 'test_copy_format'
--
2.49.0