Re: Make COPY format extendable: Extract COPY TO format implementations

From Sutou Kouhei
Subject Re: Make COPY format extendable: Extract COPY TO format implementations
Msg-id 20231206.162834.1390394858102165899.kou@clear-code.com
In response to Re: Make COPY format extendable: Extract COPY TO format implementations  (Junwang Zhao <zhjwpku@gmail.com>)
Responses Re: Make COPY format extendable: Extract COPY TO format implementations  (Junwang Zhao <zhjwpku@gmail.com>)
List pgsql-hackers
Hi,

In <CAEG8a3K9dE2gt3+K+h=DwTqMenR84aeYuYS+cty3SR3LAeDBAQ@mail.gmail.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 15:11:34 +0800,
  Junwang Zhao <zhjwpku@gmail.com> wrote:

> For the extra curly braces, I mean the following code block in
> CopyToFormatBinaryStart:
> 
> + {        <-- I thought this is useless?
> + /* Generate header for a binary copy */
> + int32 tmp;
> +
> + /* Signature */
> + CopySendData(cstate, BinarySignature, 11);
> + /* Flags field */
> + tmp = 0;
> + CopySendInt32(cstate, tmp);
> + /* No header extension */
> + tmp = 0;
> + CopySendInt32(cstate, tmp);
> + }

Oh, I see. I've removed them and attached the v3 patch. In
general, I don't change variable names and so on in this
patch; I just move code. But I also removed the "tmp"
variable in this case because I think the name isn't
suitable for a larger scope. (I think "tmp" is acceptable
in a small scope like the code above.)

New code:

/* Generate header for a binary copy */
/* Signature */
CopySendData(cstate, BinarySignature, 11);
/* Flags field */
CopySendInt32(cstate, 0);
/* No header extension */
CopySendInt32(cstate, 0);
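
For reference, here is a minimal standalone sketch of the bytes these three calls produce. It assumes the documented binary COPY signature (the 11 bytes "PGCOPY\n\377\r\n\0") and that CopySendInt32() writes in network byte order. The names build_binary_header() and put_int32() are hypothetical helpers for this sketch only, not PostgreSQL functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* The 11-byte signature of the PostgreSQL binary COPY format. */
static const char binary_signature[11] = {
    'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};

/* Hypothetical helper: append a 32-bit integer in network byte order. */
static size_t
put_int32(unsigned char *buf, size_t off, int32_t val)
{
    uint32_t    u = (uint32_t) val;

    buf[off + 0] = (unsigned char) (u >> 24);
    buf[off + 1] = (unsigned char) (u >> 16);
    buf[off + 2] = (unsigned char) (u >> 8);
    buf[off + 3] = (unsigned char) u;
    return off + 4;
}

/*
 * Hypothetical helper: write the binary COPY header into buf and return
 * the number of bytes written (signature + flags + extension length).
 */
size_t
build_binary_header(unsigned char *buf, size_t buflen)
{
    size_t      off = 0;

    assert(buflen >= sizeof(binary_signature) + 8);

    /* Signature */
    memcpy(buf, binary_signature, sizeof(binary_signature));
    off += sizeof(binary_signature);
    /* Flags field */
    off = put_int32(buf, off, 0);
    /* No header extension: extension length is zero */
    off = put_int32(buf, off, 0);

    return off;
}
```

Passing the literal 0 directly, as in the new code, yields the same bytes as going through a temporary variable.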


Thanks,
-- 
kou
From 9fe0087d9a6a79a7d1a7d0af63eb16abadbf0d4a Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Dec 2023 12:32:54 +0900
Subject: [PATCH v3] Extract COPY TO format implementations

This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
  https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
  https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com

This doesn't change the current behavior. It only introduces
CopyToFormatOps, a struct of function pointers for a format
implementation (similar to TupleTableSlotOps), and uses it for the
existing "text", "csv" and "binary" format implementations.

Note that extensions can't use CopyToFormatOps yet because the
CopySend*() functions aren't exported, and extensions can't send
formatted data to a destination without them. Subsequent patches
will export them.
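
As an illustration only, the function-pointer-table pattern can be sketched in standalone C as below. DemoState, DemoFormatOps and all demo_* functions are hypothetical stand-ins invented for this sketch; they are not PostgreSQL APIs. The sketch does no quoting or escaping. Unlike the patch, which stores the ops struct by value in the options, it hands the ops around by pointer for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for CopyToState: only an output buffer. */
typedef struct DemoState
{
    char        buf[256];
    size_t      len;
} DemoState;

/* Hypothetical analogue of CopyToFormatOps: one callback per phase. */
typedef struct DemoFormatOps
{
    void        (*start) (DemoState *state);
    void        (*one_row) (DemoState *state, const char *const *cols, int ncols);
    void        (*end) (DemoState *state);
} DemoFormatOps;

static void
demo_append(DemoState *state, const char *s)
{
    size_t      n = strlen(s);

    assert(state->len + n < sizeof(state->buf));
    memcpy(state->buf + state->len, s, n + 1);  /* copies the NUL too */
    state->len += n;
}

/* Shared row writer; the two formats differ only in the delimiter here. */
static void
demo_row(DemoState *state, const char *const *cols, int ncols, const char *delim)
{
    for (int i = 0; i < ncols; i++)
    {
        if (i > 0)
            demo_append(state, delim);
        demo_append(state, cols[i]);
    }
    demo_append(state, "\n");
}

static void
demo_noop(DemoState *state)
{
    (void) state;               /* no header or trailer in this sketch */
}

static void
demo_text_one_row(DemoState *state, const char *const *cols, int ncols)
{
    demo_row(state, cols, ncols, "\t");
}

static void
demo_csv_one_row(DemoState *state, const char *const *cols, int ncols)
{
    demo_row(state, cols, ncols, ",");
}

const DemoFormatOps demo_ops_text = {demo_noop, demo_text_one_row, demo_noop};
const DemoFormatOps demo_ops_csv = {demo_noop, demo_csv_one_row, demo_noop};

/* Pick the ops table once, up front, from the format name. */
const DemoFormatOps *
demo_lookup_format(const char *fmt)
{
    if (strcmp(fmt, "text") == 0)
        return &demo_ops_text;
    if (strcmp(fmt, "csv") == 0)
        return &demo_ops_csv;
    return NULL;                /* unknown format */
}

/* The driver never branches on the format; it only calls the callbacks. */
void
demo_copy_to(DemoState *state, const DemoFormatOps *ops,
             const char *const rows[][2], int nrows)
{
    state->len = 0;
    state->buf[0] = '\0';
    ops->start(state);
    for (int i = 0; i < nrows; i++)
        ops->one_row(state, rows[i], 2);
    ops->end(state);
}
```

The point of the pattern is that the format is resolved once at option-parsing time, so the per-row path contains no format checks.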

Here are benchmark results with and without this change, because an
earlier discussion asked that we check for performance regressions:

https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us

> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.

You can see that there is no significant loss of performance:

Data: random 32-bit integers:

    CREATE TABLE data (int32 integer);
    INSERT INTO data
      SELECT random() * 10000
        FROM generate_series(1, ${n_records});

The number of records: 100K, 1M and 10M

100K without this change:

    format,elapsed time (ms)
    text,22.527
    csv,23.822
    binary,24.806

100K with this change:

    format,elapsed time (ms)
    text,22.919
    csv,24.643
    binary,24.705

1M without this change:

    format,elapsed time (ms)
    text,223.457
    csv,233.583
    binary,242.687

1M with this change:

    format,elapsed time (ms)
    text,224.591
    csv,233.964
    binary,247.164

10M without this change:

    format,elapsed time (ms)
    text,2330.383
    csv,2411.394
    binary,2590.817

10M with this change:

    format,elapsed time (ms)
    text,2231.307
    csv,2408.067
    binary,2473.617
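
To make the comparison easier to read, the timings above can be turned into relative changes. The following standalone sketch is not part of the patch: BenchRow, relative_change_pct() and print_relative_changes() are hypothetical names, and the numbers are copied from the tables above. By this calculation every format and size stays within roughly 5% in either direction.

```c
#include <assert.h>
#include <stdio.h>

/* One benchmark measurement pair, copied from the tables above. */
typedef struct BenchRow
{
    const char *label;
    double      without_ms;     /* elapsed time without this change */
    double      with_ms;        /* elapsed time with this change */
} BenchRow;

static const BenchRow bench_rows[] = {
    {"100K text", 22.527, 22.919},
    {"100K csv", 23.822, 24.643},
    {"100K binary", 24.806, 24.705},
    {"1M text", 223.457, 224.591},
    {"1M csv", 233.583, 233.964},
    {"1M binary", 242.687, 247.164},
    {"10M text", 2330.383, 2231.307},
    {"10M csv", 2411.394, 2408.067},
    {"10M binary", 2590.817, 2473.617},
};

/* Relative change in percent; positive means slower with the change. */
double
relative_change_pct(double without_ms, double with_ms)
{
    assert(without_ms > 0.0);
    return (with_ms - without_ms) / without_ms * 100.0;
}

/* Print one line per measurement pair. */
void
print_relative_changes(FILE *out)
{
    size_t      n = sizeof(bench_rows) / sizeof(bench_rows[0]);

    for (size_t i = 0; i < n; i++)
        fprintf(out, "%-12s %+6.2f%%\n", bench_rows[i].label,
                relative_change_pct(bench_rows[i].without_ms,
                                    bench_rows[i].with_ms));
}
```

Calling print_relative_changes(stdout) lists the per-case differences. These are single runs, so small differences may simply be noise.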
---
 src/backend/commands/copy.c   |   8 +
 src/backend/commands/copyto.c | 377 ++++++++++++++++++++--------------
 src/include/commands/copy.h   |  27 ++-
 3 files changed, 256 insertions(+), 156 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..27a1add456 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate,
 
     opts_out->file_encoding = -1;
 
+    /* Text is the default format. */
+    opts_out->to_ops = CopyToFormatOpsText;
     /* Extract options from the statement node tree */
     foreach(option, options)
     {
@@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate,
             if (strcmp(fmt, "text") == 0)
                  /* default format */ ;
             else if (strcmp(fmt, "csv") == 0)
+            {
                 opts_out->csv_mode = true;
+                opts_out->to_ops = CopyToFormatOpsCSV;
+            }
             else if (strcmp(fmt, "binary") == 0)
+            {
                 opts_out->binary = true;
+                opts_out->to_ops = CopyToFormatOpsBinary;
+            }
             else
                 ereport(ERROR,
                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..8f51090a03 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,228 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyToFormatOps implementations.
+ */
+
+/*
+ * CopyToFormatOps implementation for "text" and "csv". CopyToFormatText*()
+ * refer to cstate->opts.csv_mode and change their behavior. We can split this
+ * implementation and stop referring to cstate->opts.csv_mode later.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+    case COPY_FILE:
+        /* Default line termination depends on platform */
+#ifndef WIN32
+        CopySendChar(cstate, '\n');
+#else
+        CopySendString(cstate, "\r\n");
+#endif
+        break;
+    case COPY_FRONTEND:
+        /* The FE/BE protocol uses \n as newline for all platforms */
+        CopySendChar(cstate, '\n');
+        break;
+    default:
+        break;
+    }
+    CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    /*
+     * For non-binary copy, we need to convert null_print to file
+     * encoding, because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, colname, false,
+                                    list_length(cstate->attnumlist) == 1);
+            else
+                CopyAttributeOutText(cstate, colname);
+        }
+
+        CopyToFormatTextSendEndOfRow(cstate);
+    }
+}
+
+static void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+            CopySendString(cstate, cstate->opts.null_print_client);
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1], value);
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, string,
+                                    cstate->opts.force_quote_flags[attnum - 1],
+                                    list_length(cstate->attnumlist) == 1);
+            else
+                CopyAttributeOutText(cstate, string);
+        }
+    }
+
+    CopyToFormatTextSendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToFormatOps implementation for "binary".
+ */
+
+static void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    /* Generate header for a binary copy */
+    /* Signature */
+    CopySendData(cstate, BinarySignature, 11);
+    /* Flags field */
+    CopySendInt32(cstate, 0);
+    /* No header extension */
+    CopySendInt32(cstate, 0);
+}
+
+static void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+            CopySendInt32(cstate, -1);
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
+
+const CopyToFormatOps CopyToFormatOpsText = {
+    .start = CopyToFormatTextStart,
+    .one_row = CopyToFormatTextOneRow,
+    .end = CopyToFormatTextEnd,
+};
+
+/*
+ * We can use the same CopyToFormatOps for both "text" and "csv" because
+ * CopyToFormatText*() refer to cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring to
+ * cstate->opts.csv_mode later.
+ */
+const CopyToFormatOps CopyToFormatOpsCSV = CopyToFormatOpsText;
+
+const CopyToFormatOps CopyToFormatOpsBinary = {
+    .start = CopyToFormatBinaryStart,
+    .one_row = CopyToFormatBinaryOneRow,
+    .end = CopyToFormatBinaryEnd,
+};
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -198,16 +420,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -242,10 +454,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -748,8 +956,6 @@ DoCopyTo(CopyToState cstate)
     bool        pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
     bool        fe_copy = (pipe && whereToSendOutput == DestRemote);
     TupleDesc    tupDesc;
-    int            num_phys_attrs;
-    ListCell   *cur;
     uint64        processed;
 
     if (fe_copy)
@@ -759,32 +965,11 @@ DoCopyTo(CopyToState cstate)
         tupDesc = RelationGetDescr(cstate->rel);
     else
         tupDesc = cstate->queryDesc->tupDesc;
-    num_phys_attrs = tupDesc->natts;
     cstate->opts.null_print_client = cstate->opts.null_print;    /* default */
 
     /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
     cstate->fe_msgbuf = makeStringInfo();
 
-    /* Get info about the columns we need to process. */
-    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
-        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
-        if (cstate->opts.binary)
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-        else
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-    }
-
     /*
      * Create a temporary memory context that we can reset once per row to
      * recover palloc'd memory.  This avoids any problems with leaks inside
@@ -795,57 +980,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false,
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->opts.to_ops.start(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -884,13 +1019,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
+    cstate->opts.to_ops.end(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -906,71 +1035,15 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    bool        need_delim = false;
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
-    ListCell   *cur;
-    char       *string;
 
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Datum        value = slot->tts_values[attnum - 1];
-        bool        isnull = slot->tts_isnull[attnum - 1];
-
-        if (!cstate->opts.binary)
-        {
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-        }
-
-        if (isnull)
-        {
-            if (!cstate->opts.binary)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-                CopySendInt32(cstate, -1);
-        }
-        else
-        {
-            if (!cstate->opts.binary)
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1],
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-            else
-            {
-                bytea       *outputbytes;
-
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->opts.to_ops.one_row(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..6b5231b2f3 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,28 @@ typedef enum CopyHeaderChoice
     COPY_HEADER_MATCH,
 } CopyHeaderChoice;
 
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToFormatOps
+{
+    /* Called when COPY TO is started. This will send a header. */
+    void        (*start) (CopyToState cstate, TupleDesc tupDesc);
+
+    /* Copy one row for COPY TO. */
+    void        (*one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* Called when COPY TO is ended. This will send a trailer. */
+    void        (*end) (CopyToState cstate);
+} CopyToFormatOps;
+
+/* Predefined CopyToFormatOps for "text", "csv" and "binary". */
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsText;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsCSV;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsBinary;
+
 /*
  * A struct to hold COPY options, in a parsed form. All of these are related
  * to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +85,9 @@ typedef struct CopyFormatOptions
     bool       *force_null_flags;    /* per-column CSV FN flags */
     bool        convert_selectively;    /* do selective binary conversion? */
     List       *convert_select; /* list of column names (can be NIL) */
+    CopyToFormatOps to_ops;        /* COPY TO format callbacks */
 } CopyFormatOptions;
 
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
 
-- 
2.40.1

