patch: streaming of bytea parameter values

Поиск
Список
Период
Сортировка
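This patch allows bytea parameters set via setBytes() /
setBinaryStream() to be streamed to the backend instead of first being
converted to an escaped string. With the patch applied, the additional
memory overhead for setBinaryStream() parameters is small and independent
of how large the parameter data is; setBytes() still keeps one private
copy of the array, but no longer builds an escaped copy of it.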

Note that it doesn't touch the ResultSet path, so you'll still need a
pile of extra memory if you execute queries that return large bytea values.

It passes the driver's test cases against a 7.4 server, and doesn't show
any problems after brief testing with our bytea-using application. I've
had a report of strange things happening on the server->client path, but
don't have any details at the moment.

This needs testing under a real workload, and testing against a 7.3
server (the V2 protocol path is currently untested). Any volunteers?

-O
? org/postgresql/core/StreamableBytea.java
Index: org/postgresql/core/Encoding.java
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/core/Encoding.java,v
retrieving revision 1.14
diff -u -c -r1.14 Encoding.java
*** org/postgresql/core/Encoding.java    16 Feb 2004 11:35:20 -0000    1.14
--- org/postgresql/core/Encoding.java    21 Apr 2004 03:25:55 -0000
***************
*** 15,23 ****
--- 15,27 ----
  import java.io.InputStream;
  import java.io.InputStreamReader;
  import java.io.Reader;
+ import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
+ import java.io.Writer;
  import java.io.UnsupportedEncodingException;
  import java.sql.SQLException;
  import java.util.Hashtable;
+ import java.util.Vector;
  import org.postgresql.util.PSQLException;
  import org.postgresql.util.PSQLState;

***************
*** 31,36 ****
--- 35,45 ----
       */
      private static final Hashtable encodings = new Hashtable();

+     /*
+      * Encodings that are "7-bit safe"; see below.
+      */
+     private static final Vector sevenBitEncodings = new Vector();
+
      static {
          //Note: this list should match the set of supported server
          // encodings found in backend/util/mb/encnames.c
***************
*** 72,84 ****
--- 81,109 ----
          encodings.put("LATIN6", new String[0]);
          encodings.put("LATIN8", new String[0]);
          encodings.put("LATIN10", new String[0]);
+
+         //
+         // These are the Java encodings known to be 7-bit safe i.e. you can
+         // represent characters 32..127 as a single byte with the corresponding value.
+         // NB: this list needs extending by someone who knows about encodings,
+         // I've only populated it with those I'm familiar with.
+         //
+         // Beware of case sensitivity here.
+         //
+         sevenBitEncodings.addElement("ASCII");
+         sevenBitEncodings.addElement("us-ascii");
+         sevenBitEncodings.addElement("UTF8");
+         sevenBitEncodings.addElement("UTF-8");
+         sevenBitEncodings.addElement("ISO8859_1");
      }

      private final String encoding;
+     private final boolean sevenBit;

      private Encoding(String encoding)
      {
          this.encoding = encoding;
+         this.sevenBit = (encoding != null && sevenBitEncodings.contains(encoding));
      }

      /*
***************
*** 136,141 ****
--- 161,174 ----
          return encoding;
      }

+     /** True if characters 32..127 each encode as the single byte of the same value. Decided by
+      *  an exact, case-sensitive match of the Java encoding name against sevenBitEncodings.
+      *  @return true if this encoding is "7 bit safe" (always false for a null encoding name) */
+     public boolean isSevenBitSafe()
+     {
+         return sevenBit;
+     }
+
      /*
       * Encode a string to an array of bytes.
       */
***************
*** 212,217 ****
--- 245,272 ----
              else
              {
                  return new InputStreamReader(in, encoding);
+             }
+         }
+         catch (UnsupportedEncodingException e)
+         {
+             throw new PSQLException("postgresql.res.encoding", PSQLState.DATA_ERROR, e);
+         }
+     }
+
+     /*
+      * Get a Writer that encodes to the given OutputStream.
+      */
+     public Writer getEncodingWriter(OutputStream out) throws SQLException
+     {
+         try
+         {
+             if (encoding == null)
+             {
+                 return new OutputStreamWriter(out);
+             }
+             else
+             {
+                 return new OutputStreamWriter(out, encoding);
              }
          }
          catch (UnsupportedEncodingException e)
Index: org/postgresql/core/QueryExecutor.java
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/core/QueryExecutor.java,v
retrieving revision 1.34
diff -u -c -r1.34 QueryExecutor.java
*** org/postgresql/core/QueryExecutor.java    10 Apr 2004 13:53:10 -0000    1.34
--- org/postgresql/core/QueryExecutor.java    21 Apr 2004 03:25:55 -0000
***************
*** 14,19 ****
--- 14,21 ----

  import java.util.Vector;
  import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.Writer;
  import java.sql.*;
  import org.postgresql.Driver;
  import org.postgresql.util.PSQLException;
***************
*** 321,326 ****
--- 323,391 ----
          }
      }

+     /**
+      * Send a bytea parameter as a quoted literal made only of 7-bit bytes; bytes that need it
+      * (or every byte, if escapeEverything) are sent as \\ooo. Reads exactly param.getLength() bytes.
+      */
+     private void streamByteaSevenBit(StreamableBytea param, boolean escapeEverything) throws IOException {
+         try {
+             InputStream stream = param.getStream();
+             byte[] buffer = new byte[] { (byte)'\\', (byte)'\\', 0, 0, 0 };
+             pgStream.SendChar('\'');
+             for (int remaining = param.getLength(); remaining > 0; --remaining) {
+                 int nextByte = stream.read();
+                 if (nextByte < 0) // read() returns -1 at EOF, which would otherwise go out as \377.
+                     throw new IOException("bytea parameter stream ended " + remaining + " bytes early");
+                 if (escapeEverything || nextByte < 32 || nextByte > 127 || nextByte == '\\' || nextByte == '\'') {
+                     buffer[2] = (byte)('0' + ((nextByte >> 6) & 3));
+                     buffer[3] = (byte)('0' + ((nextByte >> 3) & 7));
+                     buffer[4] = (byte)('0' + (nextByte & 7));
+                     pgStream.Send(buffer, 5);
+                 } else {
+                     pgStream.SendChar(nextByte);
+                 }
+             }
+             pgStream.SendChar('\'');
+         } finally {
+             param.close();
+         }
+     }
+
+     /**
+      * Send a streamable bytea encoded as a text representation with an arbitrary encoding.
+      */
+     private void streamBytea(StreamableBytea param, Encoding encoding) throws IOException, SQLException {
+         if (encoding.isSevenBitSafe()) {
+             streamByteaSevenBit(param, false);
+             return;
+         }
+
+         // NB: we escape everything in this path as I don't like assuming
+         // that byte values 32..127 will make it through the encoding
+         // unscathed.
+
+         try {
+             Writer writer = encoding.getEncodingWriter(pgStream.pg_output);
+             InputStream stream = param.getStream();
+             char[] buffer = new char[] { '\\', '\\', 0, 0, 0 };
+             writer.write('\'');
+             for (int remaining = param.getLength(); remaining > 0; --remaining) {
+                 int nextByte = stream.read();
+                 if (nextByte < 0) // read() returns -1 at EOF, which would otherwise go out as \377.
+                     throw new IOException("bytea parameter stream ended " + remaining + " bytes early");
+                 buffer[2] = (char)('0' + ((nextByte >> 6) & 3));
+                 buffer[3] = (char)('0' + ((nextByte >> 3) & 7));
+                 buffer[4] = (char)('0' + (nextByte & 7));
+                 writer.write(buffer, 0, 5);
+             }
+             writer.write('\'');
+             // Flush but never close(): closing an OutputStreamWriter also closes pg_output.
+             writer.flush();
+         } finally {
+             param.close();
+         }
+     }
+
      /*
       * Send a query to the backend.
       */
***************
*** 337,357 ****
              int j = 0;
              int l_msgSize = 4;
              Encoding l_encoding = connection.getEncoding();
              pgStream.SendChar('Q');
              for (int i = 0 ; i < m_binds.length ; ++i)
              {
                  l_parts[j] = l_encoding.encode(m_sqlFrags[i]);
                  l_msgSize += l_parts[j].length;
                  j++;
!                 l_parts[j] = l_encoding.encode(m_binds[i].toString());
!                 l_msgSize += l_parts[j].length;
                  j++;
              }
              l_parts[j] = l_encoding.encode(m_sqlFrags[m_binds.length]);
              l_msgSize += l_parts[j].length;
              pgStream.SendInteger(l_msgSize+1,4);
              for (int k = 0; k < l_parts.length; k++) {
!                 pgStream.Send(l_parts[k]);
              }
              pgStream.SendChar(0);
              pgStream.flush();
--- 402,440 ----
              int j = 0;
              int l_msgSize = 4;
              Encoding l_encoding = connection.getEncoding();
+
              pgStream.SendChar('Q');
+
+             // Compute and send message size.
              for (int i = 0 ; i < m_binds.length ; ++i)
              {
                  l_parts[j] = l_encoding.encode(m_sqlFrags[i]);
                  l_msgSize += l_parts[j].length;
                  j++;
!                 if (m_binds[i] instanceof StreamableBytea) {
!                     l_parts[j] = null;
!                     // Magic constants:
!                     //  2 bytes for the leading and trailing single quotes
!                     //  every input byte becomes '\\nnn' with one output byte per character => 5 bytes.
!                     l_msgSize += 2 + ((StreamableBytea)m_binds[i]).getLength() * 5;
!                 } else {
!                     l_parts[j] = l_encoding.encode(m_binds[i].toString());
!                     l_msgSize += l_parts[j].length;
!                 }
                  j++;
              }
              l_parts[j] = l_encoding.encode(m_sqlFrags[m_binds.length]);
              l_msgSize += l_parts[j].length;
              pgStream.SendInteger(l_msgSize+1,4);
+
+             // Now the message itself.
              for (int k = 0; k < l_parts.length; k++) {
!                 if (l_parts[k] == null) {
!                     StreamableBytea param = (StreamableBytea)m_binds[k/2];
!                     streamByteaSevenBit(param, true); // Escape everything to get a predictable output length.
!                 } else {
!                     pgStream.Send(l_parts[k]);
!                 }
              }
              pgStream.SendChar(0);
              pgStream.flush();
***************
*** 378,384 ****
              for (int i = 0 ; i < m_binds.length ; ++i)
              {
                  pgStream.Send(connection.getEncoding().encode(m_sqlFrags[i]));
!                 pgStream.Send(connection.getEncoding().encode(m_binds[i].toString()));
              }

              pgStream.Send(connection.getEncoding().encode(m_sqlFrags[m_binds.length]));
--- 461,470 ----
              for (int i = 0 ; i < m_binds.length ; ++i)
              {
                  pgStream.Send(connection.getEncoding().encode(m_sqlFrags[i]));
!                 if (m_binds[i] instanceof StreamableBytea)
!                     streamBytea((StreamableBytea)m_binds[i], connection.getEncoding());
!                 else
!                     pgStream.Send(connection.getEncoding().encode(m_binds[i].toString()));
              }

              pgStream.Send(connection.getEncoding().encode(m_sqlFrags[m_binds.length]));
Index: org/postgresql/jdbc1/AbstractJdbc1Statement.java
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java,v
retrieving revision 1.52
diff -u -c -r1.52 AbstractJdbc1Statement.java
*** org/postgresql/jdbc1/AbstractJdbc1Statement.java    29 Mar 2004 19:17:11 -0000    1.52
--- org/postgresql/jdbc1/AbstractJdbc1Statement.java    21 Apr 2004 03:25:56 -0000
***************
*** 5,10 ****
--- 5,11 ----
  import org.postgresql.core.BaseStatement;
  import org.postgresql.core.Field;
  import org.postgresql.core.QueryExecutor;
+ import org.postgresql.core.StreamableBytea;
  import org.postgresql.largeobject.LargeObject;
  import org.postgresql.largeobject.LargeObjectManager;
  import org.postgresql.util.PGbytea;
***************
*** 12,17 ****
--- 13,19 ----
  import org.postgresql.util.PSQLException;
  import org.postgresql.util.PSQLState;
  import java.io.IOException;
+ import java.io.ByteArrayInputStream;
  import java.io.InputStream;
  import java.io.InputStreamReader;
  import java.io.OutputStream;
***************
*** 1225,1231 ****
              }
              else
              {
!                 setString(parameterIndex, PGbytea.toPGString(x), PG_BYTEA);
              }
          }
          else
--- 1227,1241 ----
              }
              else
              {
!                 // Snapshot the caller's array, and hand out a fresh stream for every send so
!                 // that re-executing this statement resends the whole value, not an exhausted stream.
!                 final byte[] clonedParameter = (byte[])x.clone();
!                 StreamableBytea param = new StreamableBytea() {
!                         public int getLength()          { return clonedParameter.length; }
!                         public InputStream getStream()  { return new ByteArrayInputStream(clonedParameter); }
!                         public void close()             { /* nothing to release for an in-memory array */ }
!                     };
!                 bind(parameterIndex, param, PG_BYTEA);
              }
          }
          else
***************
*** 1514,1520 ****
       * @param x the parameter value
       * @exception SQLException if a database access error occurs
       */
!     public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException
      {
          checkClosed();
          if (connection.haveMinimumCompatibleVersion("7.2"))
--- 1524,1530 ----
       * @param x the parameter value
       * @exception SQLException if a database access error occurs
       */
!     public void setBinaryStream(int parameterIndex, final InputStream x, final int length) throws SQLException
      {
          checkClosed();
          if (connection.haveMinimumCompatibleVersion("7.2"))
***************
*** 1529,1568 ****
              //As the spec/javadoc for this method indicate this is to be used for
              //large binary values (i.e. LONGVARBINARY)    PG doesn't have a separate
              //long binary datatype, but with toast the bytea datatype is capable of
!             //handling very large values.  Thus the implementation ends up calling
!             //setBytes() since there is no current way to stream the value to the server
!             byte[] l_bytes = new byte[length];
!             int l_bytesRead = 0;
!             try
!             {
!                 while (true)
!                 {
!                     int n = x.read(l_bytes, l_bytesRead, length - l_bytesRead);
!                     if (n == -1)
!                         break;
!
!                     l_bytesRead += n;
!
!                     if (l_bytesRead == length)
!                         break;
!
!                 }
!             }
!             catch (IOException l_ioe)
!             {
!                 throw new PSQLException("postgresql.unusual", PSQLState.UNEXPECTED_ERROR, l_ioe);
!             }
!             if (l_bytesRead == length)
!             {
!                 setBytes(parameterIndex, l_bytes);
!             }
!             else
!             {
!                 //the stream contained less data than they said
!                 byte[] l_bytes2 = new byte[l_bytesRead];
!                 System.arraycopy(l_bytes, 0, l_bytes2, 0, l_bytesRead);
!                 setBytes(parameterIndex, l_bytes2);
!             }
          }
          else
          {
--- 1539,1553 ----
              //As the spec/javadoc for this method indicate this is to be used for
              //large binary values (i.e. LONGVARBINARY)    PG doesn't have a separate
              //long binary datatype, but with toast the bytea datatype is capable of
!             //handling very large values.  x is not read here; the executor reads `length`
!             //bytes from it when the statement is sent, so it must still be readable then.
!             StreamableBytea param = new StreamableBytea() {
!                     public int getLength()          { return length; }
!                     public InputStream getStream()  { return x; }
!                     public void close()             { /* x belongs to the caller: left open (TODO confirm) */ }
!                 };
!             // NOTE(review): x can only be consumed once; re-executing without a new setBinaryStream() presumably re-reads an exhausted stream.
!             bind(parameterIndex, param, PG_BYTEA);
          }
          else
          {
*** /dev/null    Mon Feb  2 17:15:26 2004
--- org/postgresql/core/StreamableBytea.java    Wed Apr 21 14:37:37 2004
***************
*** 0 ****
--- 1,25 ----
+ package org.postgresql.core;
+
+ import java.io.InputStream;
+
+ /**
+  * A bytea parameter value that the query executor streams to the backend
+  * on demand, instead of building an escaped string up front.
+  */
+ public interface StreamableBytea {
+     /**
+      * @return the number of bytes the executor reads from {@link #getStream()}.
+      */
+     int getLength();
+
+     /**
+      * @return a stream from which the raw (unescaped) bytea data is read.
+      */
+     InputStream getStream();
+
+     /**
+      * Free any underlying resources; the executor calls this after each send.
+      * NOTE(review): re-executing a statement presumably calls getStream()/getLength() again after close().
+      */
+     void close();
+ }

В списке pgsql-jdbc по дате отправления:

Предыдущее
От: Dave Cramer
Дата:
Сообщение: Re: slow seqscan
Следующее
От: Bruce Momjian
Дата:
Сообщение: Re: [PATCHES] EXECUTE command tag returns actual command