Re: pg_upgrade failing for 200+ million Large Objects

From: Tom Lane
Subject: Re: pg_upgrade failing for 200+ million Large Objects
Date:
Msg-id: 534746.1703116064@sss.pgh.pa.us
In reply to: Re: pg_upgrade failing for 200+ million Large Objects  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses: Re: pg_upgrade failing for 200+ million Large Objects  (Nathan Bossart <nathandbossart@gmail.com>)
Re: pg_upgrade failing for 200+ million Large Objects  (Robins Tharakan <tharakan@gmail.com>)
Re: pg_upgrade failing for 200+ million Large Objects  (Nathan Bossart <nathandbossart@gmail.com>)
List: pgsql-hackers
I have spent some more effort in this area and developed a patch
series that I think addresses all of the performance issues that
we've discussed in this thread, both for pg_upgrade and more
general use of pg_dump/pg_restore.  Concretely, it absorbs
the pg_restore --transaction-size switch that I proposed before
to cut the number of transactions needed during restore,
rearranges the representation of BLOB-related TOC entries to
reduce the client-side memory requirements, and fixes some
ancient mistakes that prevent both selective restore and
parallel restore of BLOBs.
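
To give a concrete picture of the TOC rearrangement (patch 0002 below):
a group of blobs now gets one "BLOB METADATA" entry whose defn is just a
newline-separated list of OIDs, which the restore side expands into
per-blob commands.  With made-up OIDs and owner, the emitted SQL looks
roughly like

    SELECT pg_catalog.lo_create('10001');
    SELECT pg_catalog.lo_create('10002');
    ALTER LARGE OBJECT 10001 OWNER TO someowner;
    ALTER LARGE OBJECT 10002 OWNER TO someowner;

instead of one TOC entry (with its own commands) per blob.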

As a demonstration, I made a database containing 100K empty blobs,
and measured the time needed to dump/restore that using -Fd
and -j 10.  HEAD doesn't get any useful parallelism on blobs,
but with this patch series we do:

              dump    restore
HEAD:         14sec   15sec
after 0002:    7sec   10sec
after 0003:    7sec    3sec
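
(If you want to reproduce this, one simple way to build an equivalent
test database is just

    SELECT lo_creat(-1) FROM generate_series(1, 100000);

and then dump with -Fd -j 10 and restore with -j 10 into a fresh
database.)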

There are a few loose ends:

* I did not invent a switch to control the batching of blobs; it's
just hard-wired at 1000 blobs per group here.  Probably we need some
user knob for that, but I'm unsure if we want to expose a count or
just a boolean for one vs more than one blob per batch.  The point of
forcing one blob per batch would be to allow exact control during
selective restore, and I'm not sure if there's any value in random
other settings.  On the other hand, selective restore of blobs has
been completely broken for the last dozen years and I can't recall any
user complaints about that; so maybe nobody cares and we could just
leave this as an internal choice.

* Likewise, there's no user-accessible knob to control what
transaction size pg_upgrade uses.  Do we need one?  In any case, it's
likely that the default needs a bit more thought than I've given it.
I used 1000, but if pg_upgrade is launching parallel restore jobs we
likely need to divide that by the number of restore jobs.

* As the patch stands, we still build a separate TOC entry for each
comment or seclabel or ACL attached to a blob.  If you have a lot of
blobs with non-default properties then the TOC bloat problem comes
back again.  We could do something about that, but it would take a bit
of tedious refactoring, and the most obvious way to handle it probably
re-introduces too-many-locks problems.  Is this a scenario that's
worth spending a lot of time on?
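
(Some arithmetic relevant to the second loose end above: the receiving
server's shared lock table has room for roughly
max_locks_per_transaction * (max_connections + max_prepared_transactions)
locks, or about 6400 with default settings, which is why a
--transaction-size around 1000 stays comfortably inside it.  Anyone
wanting a much larger batch size would first need to enlarge the lock
table, along the lines of

    ALTER SYSTEM SET max_locks_per_transaction = 256;  -- pick a suitable value

followed by a server restart.)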

More details appear in the commit messages below.  Patch 0004
is nearly the same as the v8 patch I posted before, although
it adds some logic to ensure that a large blob metadata batch
doesn't create too many locks.

Comments?

            regards, tom lane

PS: I don't see any active CF entry for this thread, so
I'm going to go make one.

From eecef8f312967ff7cc0f47899c6db2c3e654371d Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 20 Dec 2023 13:52:28 -0500
Subject: [PATCH v9 1/4] Some small preliminaries for pg_dump changes.

Centralize management of the lo_buf used to hold data while restoring
blobs.  The code previously had each format handler create lo_buf,
which seems rather pointless given that the format handlers all make
it the same way.  Moreover, the format handlers never use lo_buf
directly, making this setup a failure from a separation-of-concerns
standpoint.  Let's move the responsibility into pg_backup_archiver.c,
which is the only module concerned with lo_buf.  The main reason to do
this now is that it allows a centralized fix for the soon-to-be-false
assumption that we never restore blobs in parallel.

Also, get rid of dead code in DropLOIfExists: it's been a long time
since we had any need to be able to restore to a pre-9.0 server.
---
 src/bin/pg_dump/pg_backup_archiver.c  |  9 +++++++++
 src/bin/pg_dump/pg_backup_custom.c    |  7 -------
 src/bin/pg_dump/pg_backup_db.c        | 27 +++++----------------------
 src/bin/pg_dump/pg_backup_directory.c |  6 ------
 src/bin/pg_dump/pg_backup_null.c      |  4 ----
 src/bin/pg_dump/pg_backup_tar.c       |  4 ----
 6 files changed, 14 insertions(+), 43 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 256d1e35a4..26c2c684c8 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -1343,6 +1343,12 @@ StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
     AH->loCount++;

     /* Initialize the LO Buffer */
+    if (AH->lo_buf == NULL)
+    {
+        /* First time through (in this process) so allocate the buffer */
+        AH->lo_buf_size = LOBBUFSIZE;
+        AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
+    }
     AH->lo_buf_used = 0;

     pg_log_info("restoring large object with OID %u", oid);
@@ -4748,6 +4754,9 @@ CloneArchive(ArchiveHandle *AH)
     /* clone has its own error count, too */
     clone->public.n_errors = 0;

+    /* clones should not share lo_buf */
+    clone->lo_buf = NULL;
+
     /*
      * Connect our new clone object to the database, using the same connection
      * parameters used for the original connection.
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index b576b29924..7c6ac89dd4 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -140,10 +140,6 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
     ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
     AH->formatData = (void *) ctx;

-    /* Initialize LO buffering */
-    AH->lo_buf_size = LOBBUFSIZE;
-    AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
     /*
      * Now open the file
      */
@@ -902,9 +898,6 @@ _Clone(ArchiveHandle *AH)
      * share knowledge about where the data blocks are across threads.
      * _PrintTocData has to be careful about the order of operations on that
      * state, though.
-     *
-     * Note: we do not make a local lo_buf because we expect at most one BLOBS
-     * entry per archive, so no parallelism is possible.
      */
 }

diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index f766b65059..b297ca049d 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -544,26 +544,9 @@ CommitTransaction(Archive *AHX)
 void
 DropLOIfExists(ArchiveHandle *AH, Oid oid)
 {
-    /*
-     * If we are not restoring to a direct database connection, we have to
-     * guess about how to detect whether the LO exists.  Assume new-style.
-     */
-    if (AH->connection == NULL ||
-        PQserverVersion(AH->connection) >= 90000)
-    {
-        ahprintf(AH,
-                 "SELECT pg_catalog.lo_unlink(oid) "
-                 "FROM pg_catalog.pg_largeobject_metadata "
-                 "WHERE oid = '%u';\n",
-                 oid);
-    }
-    else
-    {
-        /* Restoring to pre-9.0 server, so do it the old way */
-        ahprintf(AH,
-                 "SELECT CASE WHEN EXISTS("
-                 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
-                 ") THEN pg_catalog.lo_unlink('%u') END;\n",
-                 oid, oid);
-    }
+    ahprintf(AH,
+             "SELECT pg_catalog.lo_unlink(oid) "
+             "FROM pg_catalog.pg_largeobject_metadata "
+             "WHERE oid = '%u';\n",
+             oid);
 }
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 679c60420b..16491d6a95 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -143,10 +143,6 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
     ctx->dataFH = NULL;
     ctx->LOsTocFH = NULL;

-    /* Initialize LO buffering */
-    AH->lo_buf_size = LOBBUFSIZE;
-    AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
     /*
      * Now open the TOC file
      */
@@ -823,8 +819,6 @@ _Clone(ArchiveHandle *AH)
     ctx = (lclContext *) AH->formatData;

     /*
-     * Note: we do not make a local lo_buf because we expect at most one BLOBS
-     * entry per archive, so no parallelism is possible.  Likewise,
      * TOC-entry-local state isn't an issue because any one TOC entry is
      * touched by just one worker child.
      */
diff --git a/src/bin/pg_dump/pg_backup_null.c b/src/bin/pg_dump/pg_backup_null.c
index 08f096251b..776f057770 100644
--- a/src/bin/pg_dump/pg_backup_null.c
+++ b/src/bin/pg_dump/pg_backup_null.c
@@ -63,10 +63,6 @@ InitArchiveFmt_Null(ArchiveHandle *AH)
     AH->ClonePtr = NULL;
     AH->DeClonePtr = NULL;

-    /* Initialize LO buffering */
-    AH->lo_buf_size = LOBBUFSIZE;
-    AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
     /*
      * Now prevent reading...
      */
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index aad88ad559..4cb9707e63 100644
--- a/src/bin/pg_dump/pg_backup_tar.c
+++ b/src/bin/pg_dump/pg_backup_tar.c
@@ -156,10 +156,6 @@ InitArchiveFmt_Tar(ArchiveHandle *AH)
     ctx->filePos = 0;
     ctx->isSpecialScript = 0;

-    /* Initialize LO buffering */
-    AH->lo_buf_size = LOBBUFSIZE;
-    AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
-
     /*
      * Now open the tar file, and load the TOC if we're in read mode.
      */
--
2.39.3

From b3239164371648ccb0053f045ddc14a762e88d49 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 20 Dec 2023 15:34:19 -0500
Subject: [PATCH v9 2/4] In dumps, group large objects into matching metadata
 and data entries.

Commit c0d5be5d6 caused pg_dump to create a separate BLOB metadata TOC
entry for each large object (blob), but it did not touch the ancient
decision to put all the blobs' data into a single BLOBS TOC entry.
This is bad for a few reasons: for databases with millions of blobs,
the TOC becomes unreasonably large, causing performance issues;
selective restore of just some blobs is quite impossible; and we
cannot parallelize either dump or restore of the blob data, since our
architecture for that relies on farming out whole TOC entries to
worker processes.

To improve matters, let's group multiple blobs into each blob metadata
TOC entry, and then make corresponding per-group blob data TOC entries.
Selective restore using pg_restore's -l/-L switches is then possible,
though only at the group level.  (We should provide a switch to allow
forcing one-blob-per-group for users who need precise selective
restore and don't have huge numbers of blobs.  This patch doesn't yet
do that, instead just hard-wiring the maximum number of blobs per
entry at 1000.)

The blobs in a group must all have the same owner, since the TOC entry
format only allows one owner to be named.  In this implementation
we also require them to all share the same ACL (grants); the archive
format wouldn't require that, but pg_dump's representation of
DumpableObjects does.  It seems unlikely that either restriction
will be problematic for databases with huge numbers of blobs.

The metadata TOC entries now have a "desc" string of "BLOB METADATA",
and their "defn" string is just a newline-separated list of blob OIDs.
The restore code has to generate creation commands, ALTER OWNER
commands, and drop commands (for --clean mode) from that.  We would
need special-case code for ALTER OWNER and drop in any case, so the
alternative of keeping the "defn" as directly executable SQL code
for creation wouldn't buy much, and it seems like it'd bloat the
archive to little purpose.

The data TOC entries ("BLOBS") can be exactly the same as before,
except that now there can be more than one, so we'd better give them
identifying tag strings.

We have to bump the archive file format version number, since existing
versions of pg_restore wouldn't know they need to do something special
for BLOB METADATA, plus they aren't going to work correctly with
multiple BLOBS entries.

Also, the directory and tar-file format handlers need some work
for multiple BLOBS entries: they used to hard-wire the file name
as "blobs.toc", which is replaced here with "blobs_<dumpid>.toc".

The 002_pg_dump.pl test script also knows about that and requires
minor updates.  (I had to drop the test for manually-compressed
blobs.toc files with LZ4, because lz4's obtuse command line
design requires explicit specification of the output file name
which seems impractical here.  I don't think we're losing any
useful test coverage thereby; that test stanza seems completely
duplicative with the gzip and zstd cases anyway.)

As this stands, we still generate a separate TOC entry for any
comment, security label, or ACL attached to a blob.  I feel
comfortable in believing that comments and security labels on
blobs are rare; but we might have to do something about aggregating
blob ACLs into grouped TOC entries to avoid blowing up the TOC
size, if there are use cases with large numbers of non-default
blob ACLs.  That can be done later though, as it would not create
any compatibility issues.
---
 src/bin/pg_dump/common.c              |  26 +++
 src/bin/pg_dump/pg_backup_archiver.c  |  76 +++++--
 src/bin/pg_dump/pg_backup_archiver.h  |   6 +-
 src/bin/pg_dump/pg_backup_custom.c    |   4 +-
 src/bin/pg_dump/pg_backup_db.c        |  27 +++
 src/bin/pg_dump/pg_backup_directory.c |  38 ++--
 src/bin/pg_dump/pg_backup_null.c      |   4 +-
 src/bin/pg_dump/pg_backup_tar.c       |  39 +++-
 src/bin/pg_dump/pg_dump.c             | 280 +++++++++++++++-----------
 src/bin/pg_dump/pg_dump.h             |  11 +
 src/bin/pg_dump/t/002_pg_dump.pl      |  30 ++-
 11 files changed, 354 insertions(+), 187 deletions(-)

diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index 8b0c1e7b53..c38700c21e 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -46,6 +46,8 @@ static DumpId lastDumpId = 0;    /* Note: 0 is InvalidDumpId */
  * expects that it can move them around when resizing the table.  So we
  * cannot make the DumpableObjects be elements of the hash table directly;
  * instead, the hash table elements contain pointers to DumpableObjects.
+ * This does have the advantage of letting us map multiple CatalogIds
+ * to one DumpableObject, which is useful for blobs.
  *
  * It turns out to be convenient to also use this data structure to map
  * CatalogIds to owning extensions, if any.  Since extension membership
@@ -696,6 +698,30 @@ AssignDumpId(DumpableObject *dobj)
     }
 }

+/*
+ * recordAdditionalCatalogID
+ *      Record an additional catalog ID for the given DumpableObject
+ */
+void
+recordAdditionalCatalogID(CatalogId catId, DumpableObject *dobj)
+{
+    CatalogIdMapEntry *entry;
+    bool        found;
+
+    /* CatalogId hash table must exist, if we have a DumpableObject */
+    Assert(catalogIdHash != NULL);
+
+    /* Add reference to CatalogId hash */
+    entry = catalogid_insert(catalogIdHash, catId, &found);
+    if (!found)
+    {
+        entry->dobj = NULL;
+        entry->ext = NULL;
+    }
+    Assert(entry->dobj == NULL);
+    entry->dobj = dobj;
+}
+
 /*
  * Assign a DumpId that's not tied to a DumpableObject.
  *
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 26c2c684c8..73b9972da4 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -512,7 +512,20 @@ RestoreArchive(Archive *AHX)
                  * don't necessarily emit it verbatim; at this point we add an
                  * appropriate IF EXISTS clause, if the user requested it.
                  */
-                if (*te->dropStmt != '\0')
+                if (strcmp(te->desc, "BLOB METADATA") == 0)
+                {
+                    /* We must generate the per-blob commands */
+                    if (ropt->if_exists)
+                        IssueCommandPerBlob(AH, te,
+                                            "SELECT pg_catalog.lo_unlink(oid) "
+                                            "FROM pg_catalog.pg_largeobject_metadata "
+                                            "WHERE oid = '", "'");
+                    else
+                        IssueCommandPerBlob(AH, te,
+                                            "SELECT pg_catalog.lo_unlink('",
+                                            "')");
+                }
+                else if (*te->dropStmt != '\0')
                 {
                     if (!ropt->if_exists ||
                         strncmp(te->dropStmt, "--", 2) == 0)
@@ -528,12 +541,12 @@ RestoreArchive(Archive *AHX)
                     {
                         /*
                          * Inject an appropriate spelling of "if exists".  For
-                         * large objects, we have a separate routine that
+                         * old-style large objects, we have a routine that
                          * knows how to do it, without depending on
                          * te->dropStmt; use that.  For other objects we need
                          * to parse the command.
                          */
-                        if (strncmp(te->desc, "BLOB", 4) == 0)
+                        if (strcmp(te->desc, "BLOB") == 0)
                         {
                             DropLOIfExists(AH, te->catalogId.oid);
                         }
@@ -1290,7 +1303,7 @@ EndLO(Archive *AHX, Oid oid)
  **********/

 /*
- * Called by a format handler before any LOs are restored
+ * Called by a format handler before a group of LOs is restored
  */
 void
 StartRestoreLOs(ArchiveHandle *AH)
@@ -1309,7 +1322,7 @@ StartRestoreLOs(ArchiveHandle *AH)
 }

 /*
- * Called by a format handler after all LOs are restored
+ * Called by a format handler after a group of LOs is restored
  */
 void
 EndRestoreLOs(ArchiveHandle *AH)
@@ -2994,13 +3007,14 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
     {
         /*
          * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
-         * is considered a data entry.  We don't need to check for the BLOBS
-         * entry or old-style BLOB COMMENTS, because they will have hadDumper
-         * = true ... but we do need to check new-style BLOB ACLs, comments,
+         * is considered a data entry.  We don't need to check for BLOBS or
+         * old-style BLOB COMMENTS entries, because they will have hadDumper =
+         * true ... but we do need to check new-style BLOB ACLs, comments,
          * etc.
          */
         if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
             strcmp(te->desc, "BLOB") == 0 ||
+            strcmp(te->desc, "BLOB METADATA") == 0 ||
             (strcmp(te->desc, "ACL") == 0 &&
              strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
             (strcmp(te->desc, "COMMENT") == 0 &&
@@ -3041,6 +3055,7 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
         if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
             !(ropt->binary_upgrade &&
               (strcmp(te->desc, "BLOB") == 0 ||
+               strcmp(te->desc, "BLOB METADATA") == 0 ||
                (strcmp(te->desc, "ACL") == 0 &&
                 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
                (strcmp(te->desc, "COMMENT") == 0 &&
@@ -3612,18 +3627,26 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
     }

     /*
-     * Actually print the definition.
+     * Actually print the definition.  Normally we can just print the defn
+     * string if any, but we have two special cases:
      *
-     * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
+     * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
      * versions put into CREATE SCHEMA.  Don't mutate the variant for schema
      * "public" that is a comment.  We have to do this when --no-owner mode is
      * selected.  This is ugly, but I see no other good way ...
+     *
+     * 2. BLOB METADATA entries need special processing since their defn
+     * strings are just lists of OIDs, not complete SQL commands.
      */
     if (ropt->noOwner &&
         strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
     {
         ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
     }
+    else if (strcmp(te->desc, "BLOB METADATA") == 0)
+    {
+        IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
+    }
     else
     {
         if (te->defn && strlen(te->defn) > 0)
@@ -3644,18 +3667,31 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
         te->owner && strlen(te->owner) > 0 &&
         te->dropStmt && strlen(te->dropStmt) > 0)
     {
-        PQExpBufferData temp;
+        if (strcmp(te->desc, "BLOB METADATA") == 0)
+        {
+            /* BLOB METADATA needs special code to handle multiple LOs */
+            char       *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
+
+            IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
+            pg_free(cmdEnd);
+        }
+        else
+        {
+            /* For all other cases, we can use _getObjectDescription */
+            PQExpBufferData temp;

-        initPQExpBuffer(&temp);
-        _getObjectDescription(&temp, te);
+            initPQExpBuffer(&temp);
+            _getObjectDescription(&temp, te);

-        /*
-         * If _getObjectDescription() didn't fill the buffer, then there is no
-         * owner.
-         */
-        if (temp.data[0])
-            ahprintf(AH, "ALTER %s OWNER TO %s;\n\n", temp.data, fmtId(te->owner));
-        termPQExpBuffer(&temp);
+            /*
+             * If _getObjectDescription() didn't fill the buffer, then there
+             * is no owner.
+             */
+            if (temp.data[0])
+                ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
+                         temp.data, fmtId(te->owner));
+            termPQExpBuffer(&temp);
+        }
     }

     /*
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 917283fd34..e4dd395582 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -68,10 +68,12 @@
 #define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0)    /* add
                                                      * compression_algorithm
                                                      * in header */
+#define K_VERS_1_16 MAKE_ARCHIVE_VERSION(1, 16, 0)    /* BLOB METADATA entries
+                                                     * and multiple BLOBS */

 /* Current archive version number (the format we can output) */
 #define K_VERS_MAJOR 1
-#define K_VERS_MINOR 15
+#define K_VERS_MINOR 16
 #define K_VERS_REV 0
 #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV)

@@ -448,6 +450,8 @@ extern void InitArchiveFmt_Tar(ArchiveHandle *AH);
 extern bool isValidTarHeader(char *header);

 extern void ReconnectToServer(ArchiveHandle *AH, const char *dbname);
+extern void IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
+                                const char *cmdBegin, const char *cmdEnd);
 extern void DropLOIfExists(ArchiveHandle *AH, Oid oid);

 void        ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH);
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 7c6ac89dd4..55107b2005 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -338,7 +338,7 @@ _EndData(ArchiveHandle *AH, TocEntry *te)
 }

 /*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
  * This routine should save whatever format-specific information is needed
  * to read the LOs back into memory.
  *
@@ -398,7 +398,7 @@ _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 }

 /*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
  *
  * Optional.
  */
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index b297ca049d..c14d813b21 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -541,6 +541,33 @@ CommitTransaction(Archive *AHX)
     ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
 }

+/*
+ * Issue per-blob commands for the large object(s) listed in the TocEntry
+ *
+ * The TocEntry's defn string is assumed to consist of large object OIDs,
+ * one per line.  Wrap these in the given SQL command fragments and issue
+ * the commands.  (cmdEnd need not include a semicolon.)
+ */
+void
+IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
+                    const char *cmdBegin, const char *cmdEnd)
+{
+    /* Make a writable copy of the command string */
+    char       *buf = pg_strdup(te->defn);
+    char       *st;
+    char       *en;
+
+    st = buf;
+    while ((en = strchr(st, '\n')) != NULL)
+    {
+        *en++ = '\0';
+        ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd);
+        st = en;
+    }
+    ahprintf(AH, "\n");
+    pg_free(buf);
+}
+
 void
 DropLOIfExists(ArchiveHandle *AH, Oid oid)
 {
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 16491d6a95..829832586f 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -5,8 +5,10 @@
  *    A directory format dump is a directory, which contains a "toc.dat" file
  *    for the TOC, and a separate file for each data entry, named "<oid>.dat".
  *    Large objects are stored in separate files named "blob_<oid>.dat",
- *    and there's a plain-text TOC file for them called "blobs.toc". If
- *    compression is used, each data file is individually compressed and the
+ *    and there's a plain-text TOC file for each BLOBS TOC entry named
+ *    "blobs_<dumpID>.toc" (or just "blobs.toc" in archive versions before 16).
+ *
+ *    If compression is used, each data file is individually compressed and the
  *    ".gz" suffix is added to the filenames. The TOC files are never
  *    compressed by pg_dump, however they are accepted with the .gz suffix too,
  *    in case the user has manually compressed them with 'gzip'.
@@ -51,7 +53,7 @@ typedef struct
     char       *directory;

     CompressFileHandle *dataFH; /* currently open data file */
-    CompressFileHandle *LOsTocFH;    /* file handle for blobs.toc */
+    CompressFileHandle *LOsTocFH;    /* file handle for blobs_NNN.toc */
     ParallelState *pstate;        /* for parallel backup / restore */
 } lclContext;

@@ -81,7 +83,7 @@ static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
 static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
 static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
 static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
-static void _LoadLOs(ArchiveHandle *AH);
+static void _LoadLOs(ArchiveHandle *AH, TocEntry *te);

 static void _PrepParallelRestore(ArchiveHandle *AH);
 static void _Clone(ArchiveHandle *AH);
@@ -232,7 +234,10 @@ _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)

     tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     if (strcmp(te->desc, "BLOBS") == 0)
-        tctx->filename = pg_strdup("blobs.toc");
+    {
+        snprintf(fn, MAXPGPATH, "blobs_%d.toc", te->dumpId);
+        tctx->filename = pg_strdup(fn);
+    }
     else if (te->dataDumper)
     {
         snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
@@ -415,7 +420,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
         return;

     if (strcmp(te->desc, "BLOBS") == 0)
-        _LoadLOs(AH);
+        _LoadLOs(AH, te);
     else
     {
         char        fname[MAXPGPATH];
@@ -426,17 +431,23 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
 }

 static void
-_LoadLOs(ArchiveHandle *AH)
+_LoadLOs(ArchiveHandle *AH, TocEntry *te)
 {
     Oid            oid;
     lclContext *ctx = (lclContext *) AH->formatData;
+    lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     CompressFileHandle *CFH;
     char        tocfname[MAXPGPATH];
     char        line[MAXPGPATH];

     StartRestoreLOs(AH);

-    setFilePath(AH, tocfname, "blobs.toc");
+    /*
+     * Note: before archive v16, there was always only one BLOBS TOC entry,
+     * now there can be multiple.  We don't need to worry what version we are
+     * reading though, because tctx->filename should be correct either way.
+     */
+    setFilePath(AH, tocfname, tctx->filename);

     CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);

@@ -632,7 +643,7 @@ _ReopenArchive(ArchiveHandle *AH)
  */

 /*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
  * It is called just prior to the dumper's DataDumper routine.
  *
  * We open the large object TOC file here, so that we can append a line to
@@ -642,10 +653,11 @@ static void
 _StartLOs(ArchiveHandle *AH, TocEntry *te)
 {
     lclContext *ctx = (lclContext *) AH->formatData;
+    lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     pg_compress_specification compression_spec = {0};
     char        fname[MAXPGPATH];

-    setFilePath(AH, fname, "blobs.toc");
+    setFilePath(AH, fname, tctx->filename);

     /* The LO TOC file is never compressed */
     compression_spec.algorithm = PG_COMPRESSION_NONE;
@@ -690,7 +702,7 @@ _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
         pg_fatal("could not close LO data file: %m");
     ctx->dataFH = NULL;

-    /* register the LO in blobs.toc */
+    /* register the LO in blobs_NNN.toc */
     len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid);
     if (!CFH->write_func(buf, len, CFH))
     {
@@ -703,7 +715,7 @@ _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 }

 /*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
  *
  * We close the LOs TOC file.
  */
@@ -795,7 +807,7 @@ _PrepParallelRestore(ArchiveHandle *AH)
         }

         /*
-         * If this is the BLOBS entry, what we stat'd was blobs.toc, which
+         * If this is a BLOBS entry, what we stat'd was blobs_NNN.toc, which
          * most likely is a lot smaller than the actual blob data.  We don't
          * have a cheap way to estimate how much smaller, but fortunately it
          * doesn't matter too much as long as we get the LOs processed
diff --git a/src/bin/pg_dump/pg_backup_null.c b/src/bin/pg_dump/pg_backup_null.c
index 776f057770..a3257f4fc8 100644
--- a/src/bin/pg_dump/pg_backup_null.c
+++ b/src/bin/pg_dump/pg_backup_null.c
@@ -113,7 +113,7 @@ _EndData(ArchiveHandle *AH, TocEntry *te)
 }

 /*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
  * This routine should save whatever format-specific information is needed
  * to read the LOs back into memory.
  *
@@ -170,7 +170,7 @@ _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 }

 /*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
  *
  * Optional.
  */
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index 4cb9707e63..41ee52b1d6 100644
--- a/src/bin/pg_dump/pg_backup_tar.c
+++ b/src/bin/pg_dump/pg_backup_tar.c
@@ -94,7 +94,7 @@ typedef struct
     char       *filename;
 } lclTocEntry;

-static void _LoadLOs(ArchiveHandle *AH);
+static void _LoadLOs(ArchiveHandle *AH, TocEntry *te);

 static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
 static void tarClose(ArchiveHandle *AH, TAR_MEMBER *th);
@@ -634,13 +634,13 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
     }

     if (strcmp(te->desc, "BLOBS") == 0)
-        _LoadLOs(AH);
+        _LoadLOs(AH, te);
     else
         _PrintFileData(AH, tctx->filename);
 }

 static void
-_LoadLOs(ArchiveHandle *AH)
+_LoadLOs(ArchiveHandle *AH, TocEntry *te)
 {
     Oid            oid;
     lclContext *ctx = (lclContext *) AH->formatData;
@@ -651,7 +651,26 @@ _LoadLOs(ArchiveHandle *AH)

     StartRestoreLOs(AH);

-    th = tarOpen(AH, NULL, 'r');    /* Open next file */
+    /*
+     * The blobs_NNN.toc or blobs.toc file is fairly useless to us because it
+     * will appear only after the associated blob_NNN.dat files.  For archive
+     * versions >= 16 we can look at the BLOBS entry's te->tag to discover the
+     * OID of the first blob we want to restore, and then search forward to
+     * find the appropriate blob_<oid>.dat file.  For older versions we rely
+     * on the knowledge that there was only one BLOBS entry and just search
+     * for the first blob_<oid>.dat file.  Once we find the first blob file to
+     * restore, restore all blobs until we reach the blobs[_NNN].toc file.
+     */
+    if (AH->version >= K_VERS_1_16)
+    {
+        /* We rely on atooid to not complain about nnnn..nnnn tags */
+        oid = atooid(te->tag);
+        snprintf(buf, sizeof(buf), "blob_%u.dat", oid);
+        th = tarOpen(AH, buf, 'r'); /* Advance to first desired file */
+    }
+    else
+        th = tarOpen(AH, NULL, 'r');    /* Open next file */
+
     while (th != NULL)
     {
         ctx->FH = th;
@@ -681,9 +700,9 @@ _LoadLOs(ArchiveHandle *AH)

             /*
              * Once we have found the first LO, stop at the first non-LO entry
-             * (which will be 'blobs.toc').  This coding would eat all the
-             * rest of the archive if there are no LOs ... but this function
-             * shouldn't be called at all in that case.
+             * (which will be 'blobs[_NNN].toc').  This coding would eat all
+             * the rest of the archive if there are no LOs ... but this
+             * function shouldn't be called at all in that case.
              */
             if (foundLO)
                 break;
@@ -847,7 +866,7 @@ _scriptOut(ArchiveHandle *AH, const void *buf, size_t len)
  */

 /*
- * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * Called by the archiver when starting to save BLOB DATA (not schema).
  * This routine should save whatever format-specific information is needed
  * to read the LOs back into memory.
  *
@@ -862,7 +881,7 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
     lclContext *ctx = (lclContext *) AH->formatData;
     char        fname[K_STD_BUF_SIZE];

-    sprintf(fname, "blobs.toc");
+    sprintf(fname, "blobs_%d.toc", te->dumpId);
     ctx->loToc = tarOpen(AH, fname, 'w');
 }

@@ -908,7 +927,7 @@ _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 }

 /*
- * Called by the archiver when finishing saving all BLOB DATA.
+ * Called by the archiver when finishing saving BLOB DATA.
  *
  * Optional.
  *
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8c0b5486b9..ecb1156f5e 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3560,11 +3560,10 @@ getLOs(Archive *fout)
 {
     DumpOptions *dopt = fout->dopt;
     PQExpBuffer loQry = createPQExpBuffer();
-    LoInfo       *loinfo;
-    DumpableObject *lodata;
     PGresult   *res;
     int            ntups;
     int            i;
+    int            n;
     int            i_oid;
     int            i_lomowner;
     int            i_lomacl;
@@ -3572,11 +3571,15 @@ getLOs(Archive *fout)

     pg_log_info("reading large objects");

-    /* Fetch LO OIDs, and owner/ACL data */
+    /*
+     * Fetch LO OIDs and owner/ACL data.  Order the data so that all the blobs
+     * with the same owner/ACL appear together.
+     */
     appendPQExpBufferStr(loQry,
                          "SELECT oid, lomowner, lomacl, "
                          "acldefault('L', lomowner) AS acldefault "
-                         "FROM pg_largeobject_metadata");
+                         "FROM pg_largeobject_metadata "
+                         "ORDER BY lomowner, lomacl::pg_catalog.text, oid");

     res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);

@@ -3588,30 +3591,72 @@ getLOs(Archive *fout)
     ntups = PQntuples(res);

     /*
-     * Each large object has its own "BLOB" archive entry.
+     * Group the blobs into suitably-sized groups that have the same owner and
+     * ACL setting, and build a metadata and a data DumpableObject for each
+     * group.  (If we supported initprivs for blobs, we'd have to insist that
+     * groups also share initprivs settings, since the DumpableObject only has
+     * room for one.)  i is the index of the first tuple in the current group,
+     * and n is the number of tuples we include in the group.
      */
-    loinfo = (LoInfo *) pg_malloc(ntups * sizeof(LoInfo));
+    for (i = 0; i < ntups; i += n)
+    {
+        Oid            thisoid = atooid(PQgetvalue(res, i, i_oid));
+        char       *thisowner = PQgetvalue(res, i, i_lomowner);
+        char       *thisacl = PQgetvalue(res, i, i_lomacl);
+        LoInfo       *loinfo;
+        DumpableObject *lodata;
+        char        namebuf[64];
+
+        /* Scan to find first tuple not to be included in group */
+        n = 1;
+        while (n < 1000 && i + n < ntups)
+        {
+            if (strcmp(thisowner, PQgetvalue(res, i + n, i_lomowner)) != 0 ||
+                strcmp(thisacl, PQgetvalue(res, i + n, i_lomacl)) != 0)
+                break;
+            n++;
+        }

-    for (i = 0; i < ntups; i++)
-    {
-        loinfo[i].dobj.objType = DO_LARGE_OBJECT;
-        loinfo[i].dobj.catId.tableoid = LargeObjectRelationId;
-        loinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
-        AssignDumpId(&loinfo[i].dobj);
+        /* Build the metadata DumpableObject */
+        loinfo = (LoInfo *) pg_malloc(offsetof(LoInfo, looids) + n * sizeof(Oid));

-        loinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
-        loinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_lomacl));
-        loinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
-        loinfo[i].dacl.privtype = 0;
-        loinfo[i].dacl.initprivs = NULL;
-        loinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_lomowner));
+        loinfo->dobj.objType = DO_LARGE_OBJECT;
+        loinfo->dobj.catId.tableoid = LargeObjectRelationId;
+        loinfo->dobj.catId.oid = thisoid;
+        AssignDumpId(&loinfo->dobj);
+
+        if (n > 1)
+            snprintf(namebuf, sizeof(namebuf), "%u..%u", thisoid,
+                     atooid(PQgetvalue(res, i + n - 1, i_oid)));
+        else
+            snprintf(namebuf, sizeof(namebuf), "%u", thisoid);
+        loinfo->dobj.name = pg_strdup(namebuf);
+        loinfo->dacl.acl = pg_strdup(thisacl);
+        loinfo->dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
+        loinfo->dacl.privtype = 0;
+        loinfo->dacl.initprivs = NULL;
+        loinfo->rolname = getRoleName(thisowner);
+        loinfo->numlos = n;
+        loinfo->looids[0] = thisoid;
+        /* Collect OIDs of the remaining blobs in this group */
+        for (int k = 1; k < n; k++)
+        {
+            CatalogId    extraID;
+
+            loinfo->looids[k] = atooid(PQgetvalue(res, i + k, i_oid));
+
+            /* Make sure we can look up loinfo by any of the blobs' OIDs */
+            extraID.tableoid = LargeObjectRelationId;
+            extraID.oid = loinfo->looids[k];
+            recordAdditionalCatalogID(extraID, &loinfo->dobj);
+        }

         /* LOs have data */
-        loinfo[i].dobj.components |= DUMP_COMPONENT_DATA;
+        loinfo->dobj.components |= DUMP_COMPONENT_DATA;

-        /* Mark whether LO has an ACL */
+        /* Mark whether LO group has a non-empty ACL */
         if (!PQgetisnull(res, i, i_lomacl))
-            loinfo[i].dobj.components |= DUMP_COMPONENT_ACL;
+            loinfo->dobj.components |= DUMP_COMPONENT_ACL;

         /*
          * In binary-upgrade mode for LOs, we do *not* dump out the LO data,
@@ -3621,21 +3666,22 @@ getLOs(Archive *fout)
          * pg_largeobject_metadata, after the dump is restored.
          */
         if (dopt->binary_upgrade)
-            loinfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
-    }
+            loinfo->dobj.dump &= ~DUMP_COMPONENT_DATA;

-    /*
-     * If we have any large objects, a "BLOBS" archive entry is needed. This
-     * is just a placeholder for sorting; it carries no data now.
-     */
-    if (ntups > 0)
-    {
+        /*
+         * Create a "BLOBS" data item for the group, too. This is just a
+         * placeholder for sorting; it carries no data now.
+         */
         lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
         lodata->objType = DO_LARGE_OBJECT_DATA;
         lodata->catId = nilCatalogId;
         AssignDumpId(lodata);
-        lodata->name = pg_strdup("BLOBS");
+        lodata->name = pg_strdup(namebuf);
         lodata->components |= DUMP_COMPONENT_DATA;
+        /* Set up explicit dependency from data to metadata */
+        lodata->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
+        lodata->dependencies[0] = loinfo->dobj.dumpId;
+        lodata->nDeps = lodata->allocDeps = 1;
     }

     PQclear(res);
@@ -3645,123 +3691,109 @@ getLOs(Archive *fout)
 /*
  * dumpLO
  *
- * dump the definition (metadata) of the given large object
+ * dump the definition (metadata) of the given large object group
  */
 static void
 dumpLO(Archive *fout, const LoInfo *loinfo)
 {
     PQExpBuffer cquery = createPQExpBuffer();
-    PQExpBuffer dquery = createPQExpBuffer();
-
-    appendPQExpBuffer(cquery,
-                      "SELECT pg_catalog.lo_create('%s');\n",
-                      loinfo->dobj.name);

-    appendPQExpBuffer(dquery,
-                      "SELECT pg_catalog.lo_unlink('%s');\n",
-                      loinfo->dobj.name);
+    /*
+     * The "definition" is just a newline-separated list of OIDs.  We need to
+     * put something into the dropStmt too, but it can just be a comment.
+     */
+    for (int i = 0; i < loinfo->numlos; i++)
+        appendPQExpBuffer(cquery, "%u\n", loinfo->looids[i]);

     if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
         ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
                      ARCHIVE_OPTS(.tag = loinfo->dobj.name,
                                   .owner = loinfo->rolname,
-                                  .description = "BLOB",
+                                  .description = "BLOB METADATA",
                                   .section = SECTION_PRE_DATA,
                                   .createStmt = cquery->data,
-                                  .dropStmt = dquery->data));
-
-    /* Dump comment if any */
-    if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
-        dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
-                    NULL, loinfo->rolname,
-                    loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
-
-    /* Dump security label if any */
-    if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
-        dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
-                     NULL, loinfo->rolname,
-                     loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
-
-    /* Dump ACL if any */
-    if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
-        dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
-                loinfo->dobj.name, NULL,
-                NULL, loinfo->rolname, &loinfo->dacl);
+                                  .dropStmt = "-- dummy"));
+
+    /*
+     * Dump per-blob comments, seclabels, and ACLs if any.  We assume these
+     * are rare enough that it's okay to generate retail TOC entries for them.
+     */
+    if (loinfo->dobj.dump & (DUMP_COMPONENT_COMMENT |
+                             DUMP_COMPONENT_SECLABEL |
+                             DUMP_COMPONENT_ACL))
+    {
+        for (int i = 0; i < loinfo->numlos; i++)
+        {
+            CatalogId    catId;
+            char        namebuf[32];
+
+            /* Build identifying info for this blob */
+            catId.tableoid = loinfo->dobj.catId.tableoid;
+            catId.oid = loinfo->looids[i];
+            snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[i]);
+
+            if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+                dumpComment(fout, "LARGE OBJECT", namebuf,
+                            NULL, loinfo->rolname,
+                            catId, 0, loinfo->dobj.dumpId);
+
+            if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+                dumpSecLabel(fout, "LARGE OBJECT", namebuf,
+                             NULL, loinfo->rolname,
+                             catId, 0, loinfo->dobj.dumpId);
+
+            if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
+                dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
+                        "LARGE OBJECT", namebuf, NULL,
+                        NULL, loinfo->rolname, &loinfo->dacl);
+        }
+    }

     destroyPQExpBuffer(cquery);
-    destroyPQExpBuffer(dquery);
 }

 /*
  * dumpLOs:
- *    dump the data contents of all large objects
+ *    dump the data contents of the large objects in the given group
  */
 static int
 dumpLOs(Archive *fout, const void *arg)
 {
-    const char *loQry;
-    const char *loFetchQry;
+    const LoInfo *loinfo = (const LoInfo *) arg;
     PGconn       *conn = GetConnection(fout);
-    PGresult   *res;
     char        buf[LOBBUFSIZE];
-    int            ntups;
-    int            i;
-    int            cnt;
-
-    pg_log_info("saving large objects");

-    /*
-     * Currently, we re-fetch all LO OIDs using a cursor.  Consider scanning
-     * the already-in-memory dumpable objects instead...
-     */
-    loQry =
-        "DECLARE looid CURSOR FOR "
-        "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
+    pg_log_info("saving large objects \"%s\"", loinfo->dobj.name);

-    ExecuteSqlStatement(fout, loQry);
+    for (int i = 0; i < loinfo->numlos; i++)
+    {
+        Oid            loOid = loinfo->looids[i];
+        int            loFd;
+        int            cnt;

-    /* Command to fetch from cursor */
-    loFetchQry = "FETCH 1000 IN looid";
+        /* Open the LO */
+        loFd = lo_open(conn, loOid, INV_READ);
+        if (loFd == -1)
+            pg_fatal("could not open large object %u: %s",
+                     loOid, PQerrorMessage(conn));

-    do
-    {
-        /* Do a fetch */
-        res = ExecuteSqlQuery(fout, loFetchQry, PGRES_TUPLES_OK);
+        StartLO(fout, loOid);

-        /* Process the tuples, if any */
-        ntups = PQntuples(res);
-        for (i = 0; i < ntups; i++)
+        /* Now read it in chunks, sending data to archive */
+        do
         {
-            Oid            loOid;
-            int            loFd;
-
-            loOid = atooid(PQgetvalue(res, i, 0));
-            /* Open the LO */
-            loFd = lo_open(conn, loOid, INV_READ);
-            if (loFd == -1)
-                pg_fatal("could not open large object %u: %s",
+            cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
+            if (cnt < 0)
+                pg_fatal("error reading large object %u: %s",
                          loOid, PQerrorMessage(conn));

-            StartLO(fout, loOid);
-
-            /* Now read it in chunks, sending data to archive */
-            do
-            {
-                cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
-                if (cnt < 0)
-                    pg_fatal("error reading large object %u: %s",
-                             loOid, PQerrorMessage(conn));
-
-                WriteData(fout, buf, cnt);
-            } while (cnt > 0);
-
-            lo_close(conn, loFd);
+            WriteData(fout, buf, cnt);
+        } while (cnt > 0);

-            EndLO(fout, loOid);
-        }
+        lo_close(conn, loFd);

-        PQclear(res);
-    } while (ntups > 0);
+        EndLO(fout, loOid);
+    }

     return 1;
 }
@@ -10413,28 +10445,34 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
         case DO_LARGE_OBJECT_DATA:
             if (dobj->dump & DUMP_COMPONENT_DATA)
             {
+                LoInfo       *loinfo;
                 TocEntry   *te;

+                loinfo = (LoInfo *) findObjectByDumpId(dobj->dependencies[0]);
+                if (loinfo == NULL)
+                    pg_fatal("missing metadata for large objects \"%s\"",
+                             dobj->name);
+
                 te = ArchiveEntry(fout, dobj->catId, dobj->dumpId,
                                   ARCHIVE_OPTS(.tag = dobj->name,
+                                               .owner = loinfo->rolname,
                                                .description = "BLOBS",
                                                .section = SECTION_DATA,
-                                               .dumpFn = dumpLOs));
+                                               .deps = dobj->dependencies,
+                                               .nDeps = dobj->nDeps,
+                                               .dumpFn = dumpLOs,
+                                               .dumpArg = loinfo));

                 /*
                  * Set the TocEntry's dataLength in case we are doing a
                  * parallel dump and want to order dump jobs by table size.
                  * (We need some size estimate for every TocEntry with a
                  * DataDumper function.)  We don't currently have any cheap
-                 * way to estimate the size of LOs, but it doesn't matter;
-                 * let's just set the size to a large value so parallel dumps
-                 * will launch this job first.  If there's lots of LOs, we
-                 * win, and if there aren't, we don't lose much.  (If you want
-                 * to improve on this, really what you should be thinking
-                 * about is allowing LO dumping to be parallelized, not just
-                 * getting a smarter estimate for the single TOC entry.)
+                 * way to estimate the size of LOs, but fortunately it doesn't
+                 * matter too much as long as we get large batches of LOs
+                 * processed reasonably early.  Assume 8K per blob.
                  */
-                te->dataLength = INT_MAX;
+                te->dataLength = loinfo->numlos * (pgoff_t) 8192;
             }
             break;
         case DO_POLICY:
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 2fe3cbed9a..9105210693 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -589,11 +589,21 @@ typedef struct _defaultACLInfo
     char        defaclobjtype;
 } DefaultACLInfo;

+/*
+ * LoInfo represents a group of large objects (blobs) that share the same
+ * owner and ACL setting.  dobj.components has the DUMP_COMPONENT_COMMENT bit
+ * set if any blob in the group has a comment; similarly for sec labels.
+ * If there are many blobs with the same owner/ACL, we can divide them into
+ * multiple LoInfo groups, which will each spawn a BLOB METADATA and a BLOBS
+ * (data) TOC entry.  This allows more parallelism during restore.
+ */
 typedef struct _loInfo
 {
     DumpableObject dobj;
     DumpableAcl dacl;
     const char *rolname;
+    int            numlos;
+    Oid            looids[FLEXIBLE_ARRAY_MEMBER];
 } LoInfo;

 /*
@@ -680,6 +690,7 @@ typedef struct _SubscriptionInfo
 extern TableInfo *getSchemaData(Archive *fout, int *numTablesPtr);

 extern void AssignDumpId(DumpableObject *dobj);
+extern void recordAdditionalCatalogID(CatalogId catId, DumpableObject *dobj);
 extern DumpId createDumpId(void);
 extern DumpId getMaxDumpId(void);
 extern DumpableObject *findObjectByDumpId(DumpId dumpId);
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index eb3ec534b4..76548561c8 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -109,11 +109,11 @@ my %pgdump_runs = (
             '--format=directory', '--compress=gzip:1',
             "--file=$tempdir/compression_gzip_dir", 'postgres',
         ],
-        # Give coverage for manually compressed blob.toc files during
+        # Give coverage for manually compressed blobs.toc files during
         # restore.
         compress_cmd => {
             program => $ENV{'GZIP_PROGRAM'},
-            args => [ '-f', "$tempdir/compression_gzip_dir/blobs.toc", ],
+            args => [ '-f', "$tempdir/compression_gzip_dir/blobs_*.toc", ],
         },
         # Verify that only data files were compressed
         glob_patterns => [
@@ -172,16 +172,6 @@ my %pgdump_runs = (
             '--format=directory', '--compress=lz4:1',
             "--file=$tempdir/compression_lz4_dir", 'postgres',
         ],
-        # Give coverage for manually compressed blob.toc files during
-        # restore.
-        compress_cmd => {
-            program => $ENV{'LZ4'},
-            args => [
-                '-z', '-f', '--rm',
-                "$tempdir/compression_lz4_dir/blobs.toc",
-                "$tempdir/compression_lz4_dir/blobs.toc.lz4",
-            ],
-        },
         # Verify that data files were compressed
         glob_patterns => [
             "$tempdir/compression_lz4_dir/toc.dat",
@@ -242,14 +232,13 @@ my %pgdump_runs = (
             '--format=directory', '--compress=zstd:1',
             "--file=$tempdir/compression_zstd_dir", 'postgres',
         ],
-        # Give coverage for manually compressed blob.toc files during
+        # Give coverage for manually compressed blobs.toc files during
         # restore.
         compress_cmd => {
             program => $ENV{'ZSTD'},
             args => [
                 '-z', '-f',
-                '--rm', "$tempdir/compression_zstd_dir/blobs.toc",
-                "-o", "$tempdir/compression_zstd_dir/blobs.toc.zst",
+                '--rm', "$tempdir/compression_zstd_dir/blobs_*.toc",
             ],
         },
         # Verify that data files were compressed
@@ -413,7 +402,7 @@ my %pgdump_runs = (
         },
         glob_patterns => [
             "$tempdir/defaults_dir_format/toc.dat",
-            "$tempdir/defaults_dir_format/blobs.toc",
+            "$tempdir/defaults_dir_format/blobs_*.toc",
             $supports_gzip ? "$tempdir/defaults_dir_format/*.dat.gz"
             : "$tempdir/defaults_dir_format/*.dat",
         ],
@@ -4821,8 +4810,13 @@ foreach my $run (sort keys %pgdump_runs)
         # not defined.
         next if (!defined($compress_program) || $compress_program eq '');

-        my @full_compress_cmd =
-          ($compress_cmd->{program}, @{ $compress_cmd->{args} });
+        # Arguments may require globbing.
+        my @full_compress_cmd = ($compress_program);
+        foreach my $arg (@{ $compress_cmd->{args} })
+        {
+            push @full_compress_cmd, glob($arg);
+        }
+
         command_ok(\@full_compress_cmd, "$run: compression commands");
     }

--
2.39.3

From 17ace22d028b24a89561e76f94f9defd92da9e8d Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 20 Dec 2023 16:56:54 -0500
Subject: [PATCH v9 3/4] Move BLOB METADATA TOC entries into SECTION_DATA.

Commit c0d5be5d6 put the new BLOB metadata TOC entries into
SECTION_PRE_DATA, which perhaps is defensible in some ways,
but it's a rather odd choice considering that we go out of our
way to treat blobs as data.  Moreover, because parallel restore
handles the PRE_DATA section serially, this means we're only
getting part of the parallelism speedup we could hope for.
Moving these entries into SECTION_DATA means that we can
parallelize the lo_create calls, not only the data loading,
when there are many blobs.  The dependencies established by
the previous patch ensure that we won't try to load data for
a blob we've not yet created.
---
 src/bin/pg_dump/pg_dump.c        | 4 ++--
 src/bin/pg_dump/t/002_pg_dump.pl | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index ecb1156f5e..4b34638cb1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3710,7 +3710,7 @@ dumpLO(Archive *fout, const LoInfo *loinfo)
                      ARCHIVE_OPTS(.tag = loinfo->dobj.name,
                                   .owner = loinfo->rolname,
                                   .description = "BLOB METADATA",
-                                  .section = SECTION_PRE_DATA,
+                                  .section = SECTION_DATA,
                                   .createStmt = cquery->data,
                                   .dropStmt = "-- dummy"));

@@ -18534,12 +18534,12 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
             case DO_FDW:
             case DO_FOREIGN_SERVER:
             case DO_TRANSFORM:
-            case DO_LARGE_OBJECT:
                 /* Pre-data objects: must come before the pre-data boundary */
                 addObjectDependency(preDataBound, dobj->dumpId);
                 break;
             case DO_TABLE_DATA:
             case DO_SEQUENCE_SET:
+            case DO_LARGE_OBJECT:
             case DO_LARGE_OBJECT_DATA:
                 /* Data objects: must come between the boundaries */
                 addObjectDependency(dobj, preDataBound->dumpId);
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 76548561c8..f0ea6e3dd8 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -912,7 +912,7 @@ my %tests = (
             column_inserts => 1,
             data_only => 1,
             inserts => 1,
-            section_pre_data => 1,
+            section_data => 1,
             test_schema_plus_large_objects => 1,
         },
         unlike => {
@@ -1289,7 +1289,7 @@ my %tests = (
             column_inserts => 1,
             data_only => 1,
             inserts => 1,
-            section_pre_data => 1,
+            section_data => 1,
             test_schema_plus_large_objects => 1,
         },
         unlike => {
@@ -1497,7 +1497,7 @@ my %tests = (
             column_inserts => 1,
             data_only => 1,
             inserts => 1,
-            section_pre_data => 1,
+            section_data => 1,
             test_schema_plus_large_objects => 1,
         },
         unlike => {
@@ -4241,7 +4241,7 @@ my %tests = (
             column_inserts => 1,
             data_only => 1,
             inserts => 1,
-            section_pre_data => 1,
+            section_data => 1,
             test_schema_plus_large_objects => 1,
             binary_upgrade => 1,
         },
--
2.39.3
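
As an aside to 0003's commit message: below is a schematic sketch,
not pg_dump internals, of the scheduling consequence.  The struct,
enum values, and dispatch loop are simplified stand-ins; the point is
only that once BLOB METADATA entries live in SECTION_DATA, both they
and the BLOBS data entries are eligible for parallel dispatch, while
the per-entry dependency still forces the metadata entry (which does
the lo_create calls) to finish before the matching data entry runs.

/*
 * Schematic sketch only (not pg_dump code): data-section entries may be
 * handed to any worker once their dependencies are satisfied.
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum
{
    SECTION_PRE_DATA,
    SECTION_DATA,
    SECTION_POST_DATA
} teSection;

typedef struct
{
    const char *desc;           /* e.g. "BLOB METADATA" or "BLOBS" */
    teSection   section;
    int         depends_on;     /* index of prerequisite entry, or -1 */
    bool        done;
} TocStub;

int
main(void)
{
    TocStub     toc[] = {
        {"BLOB METADATA (batch 1)", SECTION_DATA, -1, false},
        {"BLOBS (batch 1 data)", SECTION_DATA, 0, false},
        {"BLOB METADATA (batch 2)", SECTION_DATA, -1, false},
        {"BLOBS (batch 2 data)", SECTION_DATA, 2, false},
    };
    int         ntoc = (int) (sizeof(toc) / sizeof(toc[0]));

    /* Sweep until everything is dispatched, honoring dependencies. */
    for (int pass = 0; pass < ntoc; pass++)
    {
        for (int i = 0; i < ntoc; i++)
        {
            TocStub    *te = &toc[i];

            if (te->done || te->section != SECTION_DATA)
                continue;
            if (te->depends_on >= 0 && !toc[te->depends_on].done)
                continue;       /* its metadata hasn't been created yet */
            printf("dispatch to a worker: %s\n", te->desc);
            te->done = true;
        }
    }
    return 0;
}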

From 3ab3558a236e6ad17fe48087aac3cabb4b02aa3e Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 20 Dec 2023 17:42:39 -0500
Subject: [PATCH v9 4/4] Invent --transaction-size option for pg_restore.

This patch allows pg_restore to wrap its commands into transaction
blocks, somewhat like --single-transaction, except that we commit
and start a new block after every N objects.  Using this mode
with a size limit of 1000 or so objects greatly reduces the number
of transactions consumed by the restore, while preventing any
one transaction from taking enough locks to overrun the receiving
server's shared lock table.

(A value of 1000 works well with the default lock table size of
around 6400 locks, i.e., the default max_locks_per_transaction of 64
times the default max_connections of 100.  Higher --transaction-size
values can be used if one has increased the receiving server's lock
table size.)

In this patch I have just hard-wired pg_upgrade to use
--transaction-size 1000.  Perhaps there would be value in adding
another pg_upgrade option to allow user control of that, but I'm
unsure that it's worth the trouble; I think few users would use it,
and any who did would see little benefit.  However, we might need
to adjust the logic to make the size be 1000 divided by the number
of parallel restore jobs allowed.  (A minimal sketch of the
per-batch commit rule follows this patch.)
---
 doc/src/sgml/ref/pg_restore.sgml     |  24 +++++
 src/bin/pg_dump/pg_backup.h          |   4 +-
 src/bin/pg_dump/pg_backup_archiver.c | 139 +++++++++++++++++++++++++--
 src/bin/pg_dump/pg_backup_archiver.h |   3 +
 src/bin/pg_dump/pg_backup_db.c       |  18 ++++
 src/bin/pg_dump/pg_restore.c         |  15 ++-
 src/bin/pg_upgrade/pg_upgrade.c      |   2 +
 7 files changed, 197 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index 1a23874da6..2e3ba80258 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -786,6 +786,30 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>

+     <varlistentry>
+      <term><option>--transaction-size=<replaceable class="parameter">N</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the restore as a series of transactions, each processing
+        up to <replaceable class="parameter">N</replaceable> database
+        objects.  This option implies <option>--exit-on-error</option>.
+       </para>
+       <para>
+        <option>--transaction-size</option> offers an intermediate choice
+        between the default behavior (one transaction per SQL command)
+        and <option>-1</option>/<option>--single-transaction</option>
+        (one transaction for all restored objects).
+        While <option>--single-transaction</option> has the least
+        overhead, it may be impractical for large databases because the
+        transaction will take a lock on each restored object, possibly
+        exhausting the server's lock table space.
+        Using <option>--transaction-size</option> with a size of a few
+        thousand objects offers nearly the same performance benefits while
+        capping the amount of lock table space needed.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--use-set-session-authorization</option></term>
       <listitem>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 9ef2f2017e..fbf5f1c515 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -149,7 +149,9 @@ typedef struct _restoreOptions
                                                  * compression */
     int            suppressDumpWarnings;    /* Suppress output of WARNING entries
                                          * to stderr */
-    bool        single_txn;
+
+    bool        single_txn;        /* restore all TOCs in one transaction */
+    int            txn_size;        /* restore this many TOCs per txn, if > 0 */

     bool       *idWanted;        /* array showing which dump IDs to emit */
     int            enable_row_security;
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 73b9972da4..ec74846998 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX)
             /* Otherwise, drop anything that's selected and has a dropStmt */
             if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
             {
+                bool        not_allowed_in_txn = false;
+
                 pg_log_info("dropping %s %s", te->desc, te->tag);
+
+                /*
+                 * In --transaction-size mode, we have to temporarily exit our
+                 * transaction block to drop objects that can't be dropped
+                 * within a transaction.
+                 */
+                if (ropt->txn_size > 0)
+                {
+                    if (strcmp(te->desc, "DATABASE") == 0 ||
+                        strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+                    {
+                        not_allowed_in_txn = true;
+                        if (AH->connection)
+                            CommitTransaction(AHX);
+                        else
+                            ahprintf(AH, "COMMIT;\n");
+                    }
+                }
+
                 /* Select owner and schema as necessary */
                 _becomeOwner(AH, te);
                 _selectOutputSchema(AH, te->namespace);
@@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX)
                         }
                     }
                 }
+
+                /*
+                 * In --transaction-size mode, re-establish the transaction
+                 * block if needed; otherwise, commit after every N drops.
+                 */
+                if (ropt->txn_size > 0)
+                {
+                    if (not_allowed_in_txn)
+                    {
+                        if (AH->connection)
+                            StartTransaction(AHX);
+                        else
+                            ahprintf(AH, "BEGIN;\n");
+                        AH->txnCount = 0;
+                    }
+                    else if (++AH->txnCount >= ropt->txn_size)
+                    {
+                        if (AH->connection)
+                        {
+                            CommitTransaction(AHX);
+                            StartTransaction(AHX);
+                        }
+                        else
+                            ahprintf(AH, "COMMIT;\nBEGIN;\n");
+                        AH->txnCount = 0;
+                    }
+                }
             }
         }

@@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX)
         }
     }

-    if (ropt->single_txn)
+    /*
+     * Close out any persistent transaction we may have.  While these two
+     * cases are started in different places, we can end both cases here.
+     */
+    if (ropt->single_txn || ropt->txn_size > 0)
     {
         if (AH->connection)
             CommitTransaction(AHX);
@@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
      */
     if ((reqs & REQ_SCHEMA) != 0)
     {
+        bool        object_is_db = false;
+
+        /*
+         * In --transaction-size mode, must exit our transaction block to
+         * create a database or set its properties.
+         */
+        if (strcmp(te->desc, "DATABASE") == 0 ||
+            strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+        {
+            object_is_db = true;
+            if (ropt->txn_size > 0)
+            {
+                if (AH->connection)
+                    CommitTransaction(&AH->public);
+                else
+                    ahprintf(AH, "COMMIT;\n\n");
+            }
+        }
+
         /* Show namespace in log message if available */
         if (te->namespace)
             pg_log_info("creating %s \"%s.%s\"",
@@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
         /*
          * If we created a DB, connect to it.  Also, if we changed DB
          * properties, reconnect to ensure that relevant GUC settings are
-         * applied to our session.
+         * applied to our session.  (That also restarts the transaction block
+         * in --transaction-size mode.)
          */
-        if (strcmp(te->desc, "DATABASE") == 0 ||
-            strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+        if (object_is_db)
         {
             pg_log_info("connecting to new database \"%s\"", te->tag);
             _reconnectToDB(AH, te->tag);
@@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
         }
     }

+    /*
+     * If we emitted anything for this TOC entry, that counts as one action
+     * against the transaction-size limit.  Commit if it's time to.
+     */
+    if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
+    {
+        if (++AH->txnCount >= ropt->txn_size)
+        {
+            if (AH->connection)
+            {
+                CommitTransaction(&AH->public);
+                StartTransaction(&AH->public);
+            }
+            else
+                ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+            AH->txnCount = 0;
+        }
+    }
+
     if (AH->public.n_errors > 0 && status == WORKER_OK)
         status = WORKER_IGNORED_ERRORS;

@@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH)
 {
     RestoreOptions *ropt = AH->public.ropt;

-    if (!ropt->single_txn)
+    /*
+     * LOs must be restored within a transaction block, since we need the LO
+     * handle to stay open while we write it.  Establish a transaction unless
+     * there's one being used globally.
+     */
+    if (!(ropt->single_txn || ropt->txn_size > 0))
     {
         if (AH->connection)
             StartTransaction(&AH->public);
@@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH)
 {
     RestoreOptions *ropt = AH->public.ropt;

-    if (!ropt->single_txn)
+    if (!(ropt->single_txn || ropt->txn_size > 0))
     {
         if (AH->connection)
             CommitTransaction(&AH->public);
@@ -3170,6 +3265,19 @@ _doSetFixedOutputState(ArchiveHandle *AH)
     else
         ahprintf(AH, "SET row_security = off;\n");

+    /*
+     * In --transaction-size mode, we should always be in a transaction when
+     * we begin to restore objects.
+     */
+    if (ropt && ropt->txn_size > 0)
+    {
+        if (AH->connection)
+            StartTransaction(&AH->public);
+        else
+            ahprintf(AH, "\nBEGIN;\n");
+        AH->txnCount = 0;
+    }
+
     ahprintf(AH, "\n");
 }

@@ -4033,6 +4141,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
         }
     }

+    /*
+     * In --transaction-size mode, we must commit the open transaction before
+     * dropping the database connection.  This also ensures that child workers
+     * can see the objects we've created so far.
+     */
+    if (AH->public.ropt->txn_size > 0)
+        CommitTransaction(&AH->public);
+
     /*
      * Now close parent connection in prep for parallel steps.  We do this
      * mainly to ensure that we don't exceed the specified number of parallel
@@ -4772,6 +4888,10 @@ CloneArchive(ArchiveHandle *AH)
     clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
     memcpy(clone, AH, sizeof(ArchiveHandle));

+    /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
+    clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
+    memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
+
     /* Handle format-independent fields */
     memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));

@@ -4793,6 +4913,13 @@ CloneArchive(ArchiveHandle *AH)
     /* clones should not share lo_buf */
     clone->lo_buf = NULL;

+    /*
+     * Clone connections disregard --transaction-size; they must commit after
+     * each command so that the results are immediately visible to other
+     * workers.
+     */
+    clone->public.ropt->txn_size = 0;
+
     /*
      * Connect our new clone object to the database, using the same connection
      * parameters used for the original connection.
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index e4dd395582..1b9f142dea 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -324,6 +324,9 @@ struct _archiveHandle
     char       *currTablespace; /* current tablespace, or NULL */
     char       *currTableAm;    /* current table access method, or NULL */

+    /* in --transaction-size mode, this counts objects emitted in cur xact */
+    int            txnCount;
+
     void       *lo_buf;
     size_t        lo_buf_used;
     size_t        lo_buf_size;
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index c14d813b21..6b3bf174f2 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -554,6 +554,7 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
 {
     /* Make a writable copy of the command string */
     char       *buf = pg_strdup(te->defn);
+    RestoreOptions *ropt = AH->public.ropt;
     char       *st;
     char       *en;

@@ -562,6 +563,23 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
     {
         *en++ = '\0';
         ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd);
+
+        /* In --transaction-size mode, count each command as an action */
+        if (ropt && ropt->txn_size > 0)
+        {
+            if (++AH->txnCount >= ropt->txn_size)
+            {
+                if (AH->connection)
+                {
+                    CommitTransaction(&AH->public);
+                    StartTransaction(&AH->public);
+                }
+                else
+                    ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+                AH->txnCount = 0;
+            }
+        }
+
         st = en;
     }
     ahprintf(AH, "\n");
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index c3beacdec1..5ea78cf7cc 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -120,6 +120,7 @@ main(int argc, char **argv)
         {"role", required_argument, NULL, 2},
         {"section", required_argument, NULL, 3},
         {"strict-names", no_argument, &strict_names, 1},
+        {"transaction-size", required_argument, NULL, 5},
         {"use-set-session-authorization", no_argument, &use_setsessauth, 1},
         {"no-comments", no_argument, &no_comments, 1},
         {"no-publications", no_argument, &no_publications, 1},
@@ -289,10 +290,18 @@ main(int argc, char **argv)
                 set_dump_section(optarg, &(opts->dumpSections));
                 break;

-            case 4:
+            case 4:                /* filter */
                 read_restore_filters(optarg, opts);
                 break;

+            case 5:                /* transaction-size */
+                if (!option_parse_int(optarg, "--transaction-size",
+                                      1, INT_MAX,
+                                      &opts->txn_size))
+                    exit(1);
+                opts->exit_on_error = true;
+                break;
+
             default:
                 /* getopt_long already emitted a complaint */
                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -337,6 +346,9 @@ main(int argc, char **argv)
     if (opts->dataOnly && opts->dropSchema)
         pg_fatal("options -c/--clean and -a/--data-only cannot be used together");

+    if (opts->single_txn && opts->txn_size > 0)
+        pg_fatal("options -1/--single-transaction and --transaction-size cannot be used together");
+
     /*
      * -C is not compatible with -1, because we can't create a database inside
      * a transaction block.
@@ -484,6 +496,7 @@ usage(const char *progname)
     printf(_("  --section=SECTION            restore named section (pre-data, data, or post-data)\n"));
     printf(_("  --strict-names               require table and/or schema include patterns to\n"
              "                               match at least one entity each\n"));
+    printf(_("  --transaction-size=N         commit after every N objects\n"));
     printf(_("  --use-set-session-authorization\n"
              "                               use SET SESSION AUTHORIZATION commands instead of\n"
              "                               ALTER OWNER commands to set ownership\n"));
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 3960af4036..5cfd2282e1 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -548,6 +548,7 @@ create_new_objects(void)
                   true,
                   true,
                   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+                  "--transaction-size=1000 "
                   "--dbname postgres \"%s/%s\"",
                   new_cluster.bindir,
                   cluster_conn_opts(&new_cluster),
@@ -586,6 +587,7 @@ create_new_objects(void)
         parallel_exec_prog(log_file_name,
                            NULL,
                            "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+                           "--transaction-size=1000 "
                            "--dbname template1 \"%s/%s\"",
                            new_cluster.bindir,
                            cluster_conn_opts(&new_cluster),
--
2.39.3
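
To make 0004's batching rule concrete, here is a minimal standalone
sketch (the txn_size/txnCount names mirror the patch; the rest is
illustrative and not the archiver code).  It emits the same
COMMIT;/BEGIN; cadence that --transaction-size produces when writing
a script: with N = 1000 and 2500 objects you get three transactions,
so no single transaction ever holds locks on more than 1000 objects.

/*
 * Minimal sketch of the --transaction-size rule: wrap the run in a
 * transaction block and, after every txn_size restored objects, commit
 * and open a new block.
 */
#include <stdio.h>

static int  txn_size = 1000;    /* --transaction-size=N; 0 disables batching */
static int  txnCount = 0;       /* objects emitted in the current transaction */

static void
restore_one_object(int objno)
{
    printf("-- restore object %d\n", objno);

    /* One action against the transaction-size limit; commit if it's time. */
    if (txn_size > 0 && ++txnCount >= txn_size)
    {
        printf("COMMIT;\nBEGIN;\n");
        txnCount = 0;
    }
}

int
main(void)
{
    printf("BEGIN;\n");
    for (int objno = 1; objno <= 2500; objno++)
        restore_one_object(objno);
    printf("COMMIT;\n");        /* close out the final partial batch */
    return 0;
}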

