diff -rcN postgresql-jdbc-8.2-505.src.orig/build.xml postgresql-jdbc-8.2-505.src.copy/build.xml
*** postgresql-jdbc-8.2-505.src.orig/build.xml 2006-11-29 06:00:15.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/build.xml 2007-06-15 08:32:51.000000000 +0300
***************
*** 84,89 ****
--- 84,96 ----
+
+
+
+
+
+
+
***************
*** 116,121 ****
--- 123,129 ----
+
***************
*** 393,398 ****
--- 401,407 ----
+
***************
*** 428,433 ****
--- 437,443 ----
+
diff -rcN postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml
*** postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml 2007-02-19 08:04:48.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml 2007-06-15 14:25:52.000000000 +0300
***************
*** 2480,2485 ****
--- 2480,2529 ----
+
+
+ Copy
+
+ Bulk data transfer with INSERT or SELECT
+ can be quite slow for large amounts of data.
+ PostgreSQL provides a special SQL
+ statement COPY for this purpose.
+ It can be used either to exchange data between a file and a table, which requires
+ superuser privileges to access files on the database server, or between the client and
+ a table, which requires special copy subprotocol support from the database driver.
+ The latter is available as an extension of this JDBC driver for
+ protocol version 3 (PostgreSQL 7.4 and newer).
+
+
+
+ The copy API is available from PGConnection.getCopyAPI().
+ It provides a variety of one-shot import and export methods.
+ Streams and byte arrays are readily supported.
+ Other (say, object field based) exchangers can be based on available interfaces.
+
+
+
+ // get hold of the extension interface
+ import org.postgresql.copy.CopyManager;
+ CopyManager copyAPI = ((org.postgresql.PGConnection)conn).getCopyAPI();
+
+ // handy constants for default format
+ static final byte[] rowSep = "\n".getBytes();
+ static final byte[] fieldSep = "\t".getBytes();
+ static final byte[] nullSeq = "\\N".getBytes();
+
+ // write some data to a fictional (varchar,int) table
+ byte[][] mydata = {
+ "First row\t123\n".getBytes(),
+ "Second".getBytes(), fieldSep, nullSeq, rowSep
+ };
+ copyAPI.copyIntoDB( "COPY mytable FROM STDIN", mydata );
+
+ // show contents of a fictional table
+ copyAPI.copyFromDB("COPY mytable TO STDOUT", System.out);
+
+
+
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java 2005-04-20 03:10:58.000000000 +0300
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java 2007-06-14 19:38:29.000000000 +0300
***************
*** 11,16 ****
--- 11,17 ----
import java.sql.*;
import org.postgresql.core.Encoding;
+ import org.postgresql.copy.CopyManager;
import org.postgresql.fastpath.Fastpath;
import org.postgresql.largeobject.LargeObjectManager;
***************
*** 30,35 ****
--- 31,42 ----
public PGNotification[] getNotifications() throws SQLException;
/**
+ * This returns the COPY API for the current connection.
+ * @since 8.4
+ */
+ public CopyManager getCopyAPI() throws SQLException;
+
+ /**
* This returns the LargeObject API for the current connection.
* @since 7.3
*/
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java 2007-06-15 14:33:28.000000000 +0300
***************
*** 0 ****
--- 1,185 ----
+ /**
+ * Bulk data copy for PostgreSQL
+ */
+ package org.postgresql.copy;
+
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.io.InputStream;
+ import java.sql.SQLException;
+
+ import org.postgresql.core.*;
+ import org.postgresql.core.v3.QueryExecutorImpl; // COPY is only implemented for protocol v3
+
+ /**
+ * API for PostgreSQL protocol version 3 COPY block data transfer
+ * @author Kalle Hallivuori
+ * @since 8.2
+ */
+ public class CopyManager {
+ public static final int DEFAULT_BUFFER_SIZE = 10000;
+ private final QueryExecutorImpl queryExecutor;
+
+ public CopyManager(BaseConnection connection) {
+ this.queryExecutor = (QueryExecutorImpl)connection.getQueryExecutor();
+ }
+
+ /**
+ * Feeds contents of given stream into a database table according to given COPY FROM STDIN statement
+ * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one
+ * @param ins A "file" of comma separated values or whatever format you chose
+ * @return for PostgreSQL 8.2 and later, number of updated rows; for earlier versions, number of buffers written
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyIntoDB(String sql, InputStream ins) throws SQLException, IOException {
+ return copyIntoDB(sql, ins, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Feeds contents of given stream into a database table according to given COPY FROM STDIN statement
+ * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one
+ * @param ins A "file" of comma separated values or whatever format you chose
+ * @param bufferSize specify size of buffer used between ins and database
+ * @return for PostgreSQL 8.2 and later, number of updated rows; for earlier versions, number of buffers written
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyIntoDB(String sql, InputStream ins, int bufferSize) throws SQLException, IOException {
+ return copyIntoDB(sql, new CopydataFromInputStream(ins, bufferSize));
+ }
+
+ /**
+ * Feeds contents of given byte arrays into a database table according to given COPY FROM STDIN statement
+ * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one
+ * @param buf container for all bytes to feed for the COPY
+ * @return for PostgreSQL 8.2 and later, number of updated rows; for earlier versions, number of byte arrays written
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyIntoDB(String sql, byte[][] buf) throws SQLException, IOException {
+ return copyIntoDB(sql, buf, 0, buf.length, 0, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Feeds contents of given byte arrays into a database table according to given COPY FROM STDIN statement.
+ * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one
+ * @param buf arrays of bytes to write one after another, grouping does not matter
+ * @param from index of first array to start writing from
+ * @param upTo write up to this array; see lastLength
+ * @param lastLength if this is greater than zero, that many bytes from buf[upTo] will be written at end
+ * @param chunkSize preferred minimum size for data chunks to write to server
+ * @return prior to PostgreSQL 8.2: number of arrays written; beginning from 8.2: number of rows updated
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyIntoDB(String sql, byte[][] buf, int from, int upTo, int lastLength, int chunkSize) throws SQLException, IOException {
+ return copyIntoDB(sql, new CopydataFromByteaa(buf, from, upTo, lastLength, chunkSize));
+ }
+
+ /**
+ * Write contents of a database table into given stream according to given COPY TO STDOUT statement.
+ * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one
+ * @param outs results from database are written to this OutputStream
+ * @return for PostgreSQL 8.2 and later, number of rows reported by the server; for earlier versions, a count of the data messages received
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyFromDB(String sql, OutputStream outs) throws SQLException, IOException {
+ return copyFromDB(sql, outs, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Write contents of a database table into given stream according to given COPY TO STDOUT statement.
+ * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one
+ * @param outs results from database are written to this OutputStream
+ * @param bufferSize specify size of buffer used between database and outs
+ * @return prior to PostgreSQL 8.2: number of arrays written; beginning from 8.2: number of rows updated
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyFromDB(String sql, OutputStream outs, int bufferSize) throws SQLException, IOException {
+ return copyFromDB(sql, new CopydataToOutputStream(outs, bufferSize));
+ }
+
+ /**
+ * Read contents of a database table into given buffer according to given COPY TO STDOUT statement.
+ * Rest of input is skipped. A good idea for size of buffer is returned by SELECT COUNT(*) from same table.
+ * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one
+ * @param buf Buffer to hold the read data.
+ * @return index of next free slot in buf
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyFromDB(String sql, byte[][] buf) throws SQLException, IOException {
+ return copyFromDB(sql, buf, 0, buf.length);
+ }
+
+ /**
+ * Read contents of a database table into given buffer according to given COPY TO STDOUT statement.
+ * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one
+ * @param buf Buffer to hold the read data.
+ * @param from index to begin filling buf at (normally 0)
+ * @param upTo index where to stop filling buf at (normally buf.length)
+ * @return index of next free slot in buf
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyFromDB(String sql, byte[][] buf, int from, int upTo) throws SQLException, IOException {
+ return copyFromDB(sql, new CopydataToByteaa(buf, from, upTo));
+ }
+
+ /**
+ * Feeds data from given provider into a database table according to given COPY FROM STDIN statement
+ * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one
+ * @param provider party providing the data to write
+ * @return for PostgreSQL 8.2 and later, number of rows reported by the server; for earlier versions, a count of the byte arrays written
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyIntoDB(String sql, CopydataProvider provider) throws SQLException, IOException {
+ return queryExecutor.copy(sql, provider, null);
+ }
+
+ /**
+ * Read contents of a database table for given consumer according to given COPY TO STDOUT statement.
+ * @param sql COPY TO STDOUT statement
+ * @param consumer party responsible for passing on the received data
+ * @return for PostgreSQL 8.2 and later, number of rows reported by the server; for earlier versions, a count of the data messages received
+ * @throws SQLException if the copy operation fails
+ * @throws IOException if the database connection breaks
+ */
+ public int copyFromDB(String sql, CopydataConsumer consumer) throws SQLException, IOException {
+ return queryExecutor.copy(sql, null, consumer);
+ }
+
+ /**
+ * Construct a COPY statement
+ * @param tableName name of the table used
+ * @param fieldNames array of names of fields to access, in the order specified by the array
+ * @param writeToDB true: COPY FROM STDIN; false: COPY TO STDOUT
+ * @param with additional specifications such as separators, see COPY documentation
+ * @return specified SQL query
+ */
+ public String buildCopyQuery(String tableName, String[] fieldNames, boolean writeToDB, String[] with) {
+ StringBuilder sql = new StringBuilder("COPY ").append(tableName);
+ if(fieldNames != null) {
+ StringBuilder fields = new StringBuilder();
+ for(int i=0; i 0 )
+ fields.append(",");
+ fields.append(fieldNames[i]);
+ }
+ if(fields.length() > 0)
+ sql.append("(").append(fields).append(")");
+ }
+ sql.append(writeToDB ? " FROM STDIN" : " TO STDOUT");
+ if(with != null) {
+ for(int i=0; i
+ */
+ public class Copydata {
+     // the byte arrays making up the data; stays null unless assigned or allocated below
+     public byte[][] data;
+     // offsets are not per array to save some memory
+     public int from;
+     public int at;
+     public int upTo;
+     public int lastLength;
+     public boolean reuseArrays;
+
+     Copydata() {
+     }
+
+     Copydata(int aaSize, int arraySize, boolean reuse) {
+         reuseArrays = reuse;
+         upTo = aaSize;
+         if(aaSize <= 0)
+             return; // nothing to allocate
+         if(arraySize > 0)
+             data = new byte[aaSize][arraySize]; // every inner array preallocated
+         else
+             data = new byte[aaSize][]; // outer array only, inner arrays left null
+     }
+ }
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataConsumer.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataConsumer.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataConsumer.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataConsumer.java 2007-06-15 14:33:28.000000000 +0300
***************
*** 0 ****
--- 1,36 ----
+ /**
+ * Bulk data copy for PostgreSQL
+ */
+ package org.postgresql.copy;
+
+ /**
+ * CopydataConsumers are fed data from PostgreSQL COPY TO STDOUT queries
+ * @author Kalle Hallivuori
+ * @since 8.2
+ */
+ public interface CopydataConsumer {
+
+ /**
+ * Receive a format header at start of COPY TO STDOUT.
+ * Format code is 0 for text, 1 for binary (PostgreSQL internal).
+ * A consumer that only understands text should complain about any nonzero code.
+ * Number of columns is header.length-1
+ * @param header element 0 is the overall format, thereafter formats for each column follow
+ * @see <a href="http://developer.postgresql.org/pgdocs/postgres/protocol-message-formats.html">CopyOutResponse message format</a>
+ */
+ void gotHeader(int[] header);
+
+ /**
+ * Called when a buffer for new data from query is needed. Return null to stop input.
+ * @return buffer to fill, that will eventually be handed back to consume()
+ */
+ Copydata getContainer();
+
+ /**
+ * Called to collect data during receiving results from database.
+ * Either the container is full (copydata.at==copydata.upTo) or all results have been received.
+ * @param copydata the container originated from getContainer() filled with data
+ * @throws Exception to stop collecting results
+ */
+ void consume(Copydata copydata) throws Exception;
+ }
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromByteaa.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromByteaa.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromByteaa.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromByteaa.java 2007-06-15 14:33:28.000000000 +0300
***************
*** 0 ****
--- 1,36 ----
+ /**
+ * Bulk data copy for PostgreSQL
+ */
+ package org.postgresql.copy;
+
+ import java.io.IOException;
+
+ /**
+  * CopydataProvider offering a caller-supplied byte[][] to COPY FROM STDIN, acting as its own Copydata container
+  * @author Kalle Hallivuori
+  */
+ public class CopydataFromByteaa extends Copydata implements CopydataProvider {
+     int prefChunkSize; // preferred minimum size of the chunks sent to the server
+
+     /** Offers arrays data[from] .. data[upTo-1] and, if lastLength > 0, the first lastLength bytes of data[upTo]. */
+     public CopydataFromByteaa(byte[][] data, int from, int upTo, int lastLength, int chunkSize) {
+         this.data = data;
+         this.from = this.at = from; // sending starts at the cursor "at", so it has to begin at "from"
+         this.upTo = upTo;
+         this.lastLength = lastLength;
+         prefChunkSize = chunkSize;
+     }
+
+     public int getPreferredChunkSize() {
+         return prefChunkSize;
+     }
+
+     public void gotHeader(int[] header) {
+         // the format header is ignored: bytes are passed on as given
+     }
+
+     /** @return this container while unsent whole arrays remain (at < upTo), otherwise null to end the copy */
+     public Copydata provide() throws IOException {
+         return (this.at < this.upTo) ? this : null;
+     }
+ }
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromInputStream.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromInputStream.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromInputStream.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromInputStream.java 2007-06-15 14:33:28.000000000 +0300
***************
*** 0 ****
--- 1,57 ----
+ /**
+  * Bulk data copy for PostgreSQL
+  */
+ package org.postgresql.copy;
+
+ import java.io.IOException;
+ import java.io.InputStream;
+
+ /**
+  * Passes to COPY FROM STDIN a buffer of given size filled from given stream
+  * until end of stream.
+  * <p>
+  * Every chunk is offered as one whole array of exactly the size that was read,
+  * so nothing beyond the bytes actually read is handed over for sending.
+  * @author Kalle Hallivuori
+  *
+  */
+ public class CopydataFromInputStream implements CopydataProvider {
+
+     InputStream ins;
+     Copydata buf;
+     private final byte[] readBuffer; // reusable buffer that the stream is read into
+
+     CopydataFromInputStream(InputStream ins, int bufferSize) {
+         this.ins = ins;
+         buf = new Copydata(1,bufferSize,true);
+         readBuffer = buf.data[0];
+     }
+
+     public int getPreferredChunkSize() {
+         return readBuffer.length;
+     }
+
+     public void gotHeader(int[] header) {
+         // the format header is ignored: the stream content is passed on as is
+     }
+
+     /**
+      * Reads the next chunk from the stream.
+      * @return container holding the bytes read as its only array, or null at end of stream
+      * @throws IOException if reading the stream fails
+      */
+     public Copydata provide() throws IOException {
+         int i = ins.read(readBuffer, 0, readBuffer.length);
+         if( i <= 0 )
+             return null;
+         byte[] chunk = readBuffer;
+         if( i < readBuffer.length ) { // short read: hand over an exact-size copy
+             chunk = new byte[i];
+             System.arraycopy(readBuffer, 0, chunk, 0, i);
+         }
+         buf.data[0] = chunk;
+         buf.at = 0;         // rewind the cursor advanced while the previous chunk was sent
+         buf.lastLength = 0; // the whole array is the data; no partial array follows it
+         return buf;
+     }
+ }
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataProvider.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataProvider.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataProvider.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataProvider.java 2007-06-15 14:33:28.000000000 +0300
***************
*** 0 ****
--- 1,36 ----
+ /**
+ * Bulk data copy for PostgreSQL
+ */
+ package org.postgresql.copy;
+
+ import java.io.IOException;
+
+ /**
+ * CopydataProviders offer data for PostgreSQL COPY FROM STDIN bulk data imports
+ * @author Kalle Hallivuori
+ * @since 8.2
+ */
+ public interface CopydataProvider {
+
+ /**
+ * Receive a format header at start of COPY FROM STDIN.
+ * Format code is 0 for text, 1 for binary (PostgreSQL internal).
+ * A provider that only produces text should complain about any nonzero code.
+ * Number of columns is header.length-1
+ * @param header element 0 is the overall format, thereafter formats for each column follow
+ * @see <a href="http://developer.postgresql.org/pgdocs/postgres/protocol-message-formats.html">CopyInResponse message format</a>
+ */
+ void gotHeader(int[] header);
+
+ /**
+ * @return Preferred minimum size, in bytes, of the chunks written to the database connection. Try values from one to tens of thousands.
+ */
+ int getPreferredChunkSize();
+
+ /**
+ * Called during bulk data import operation to get more data.
+ * @return data to write to database, or null when there is no more data
+ * @throws IOException to cancel the copy operation; the server is then asked to fail the copy
+ */
+ Copydata provide() throws IOException;
+ }
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToByteaa.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToByteaa.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToByteaa.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToByteaa.java 2007-06-15 14:33:28.000000000 +0300
***************
*** 0 ****
--- 1,40 ----
+ /**
+ * Bulk data copy for PostgreSQL
+ */
+ package org.postgresql.copy;
+
+ /**
+ * @author Kalle Hallivuori
+ */
+ public class CopydataToByteaa extends Copydata implements CopydataConsumer {
+
+     /** Receives into buf, filling slots from index from up to, but not including, upTo. */
+     CopydataToByteaa(byte[][] buf, int from, int upTo) {
+         this.reuseArrays = false;
+         this.upTo = upTo;
+         this.from = from;
+         this.at = from;
+         this.data = buf;
+     }
+
+     /**
+      * Nothing left to do: the data was received straight into this object.
+      */
+     public void consume(Copydata copydata) throws Exception {
+     }
+
+     /**
+      * Offers this object for as long as free slots remain. The cursor "at" is
+      * deliberately not rewound, which protects the data already stored.
+      */
+     public Copydata getContainer() {
+         if (at >= upTo)
+             return null;
+         return this;
+     }
+
+     /** The format header is not used. */
+     public void gotHeader(int[] header) {
+     }
+
+ }
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToOutputStream.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToOutputStream.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToOutputStream.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToOutputStream.java 2007-06-15 14:33:28.000000000 +0300
***************
*** 0 ****
--- 1,49 ----
+ /**
+  * Bulk data copy for PostgreSQL
+  */
+ package org.postgresql.copy;
+
+ import java.io.OutputStream;
+
+ /**
+  * Writes data received from COPY TO STDOUT into the given stream, one container at a time.
+  * @author Kalle Hallivuori
+  */
+ public class CopydataToOutputStream implements CopydataConsumer {
+
+     OutputStream outs;
+     Copydata buf;
+     private final byte[] ownBuffer; // the reusable array allocated for buf.data[0]
+
+     CopydataToOutputStream(OutputStream outs, int bufferSize) {
+         this.outs = outs;
+         buf = new Copydata(1, bufferSize, true);
+         ownBuffer = buf.data[0];
+     }
+
+     /**
+      * Writes the received bytes to the stream and makes the container ready for reuse.
+      * @param copydata ignored; the data is taken from this consumer's own container
+      * @throws Exception if writing to the stream fails
+      */
+     public void consume(Copydata copydata) throws Exception {
+         byte[] received = buf.data[0];
+         if(received != ownBuffer) {
+             // The data did not fit ownBuffer, so the array was replaced with one sized
+             // exactly for it; lastLength may not describe it, so write all of it.
+             outs.write(received, 0, received.length);
+             buf.data[0] = ownBuffer;
+         } else if(buf.lastLength > 0) {
+             outs.write(received, 0, buf.lastLength);
+         }
+         buf.at = buf.lastLength = 0;
+     }
+
+     public Copydata getContainer() {
+         return buf;
+     }
+
+     public void gotHeader(int[] header) {
+         // the format header is ignored: received bytes are written out as they are
+     }
+ }
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java 2006-12-01 10:53:45.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java 2007-06-15 14:00:34.000000000 +0300
***************
*** 27,32 ****
--- 27,33 ----
import org.postgresql.util.PSQLState;
import org.postgresql.util.ServerErrorMessage;
import org.postgresql.util.GT;
+ import org.postgresql.copy.*;
/**
* QueryExecutor implementation for the V3 protocol.
***************
*** 630,635 ****
--- 631,867 ----
return returnValue;
}
+ //
+ // Copy subprotocol implementation
+ //
+
+     /**
+      * Runs a COPY FROM STDIN / COPY TO STDOUT statement over the copy subprotocol.
+      * The statement is sent as a simple query; backend messages are then handled until
+      * ReadyForQuery: data is pulled from the provider upon CopyInResponse and handed
+      * to the consumer upon CopyData / CopyDone.
+      * @author kato@iki.fi
+      * @since 8.4
+      * @param sql The COPY FROM STDIN / COPY TO STDOUT statement to issue
+      * @param provider data provider for COPY FROM STDIN
+      * @param consumer data consumer for COPY TO STDOUT
+      * @return row count from the command status if it carries one (server >= 8.2),
+      *         otherwise number of byte arrays sent or of CopyData messages received
+      * @throws SQLException on recovered communication failures
+      * @throws IOException passed from low level connection
+      */
+     public synchronized int copy(String sql, CopydataProvider provider, CopydataConsumer consumer) throws SQLException, IOException {
+
+         if(sql == null || (provider == null && consumer == null))
+             throw new PSQLException( GT.tr("You forgot to specify what to copy"), PSQLState.PROTOCOL_VIOLATION );
+
+         Copydata copydata = null;
+         boolean endReceiving = false, copyFailed = false;
+         int returnValue = -1;
+         SQLException error = null, errorChain = null;
+         int[] header = null;
+         int l_len;
+
+         // send starting query
+         byte buf[] = sql.getBytes(); // NOTE(review): platform default charset -- confirm it suits the connection
+         pgStream.SendChar('Q');
+         pgStream.SendInteger4(buf.length + 4 + 1);
+         pgStream.Send(buf);
+         pgStream.SendChar(0);
+         pgStream.flush();
+
+         while(!endReceiving) {
+             int c = pgStream.ReceiveChar();
+             switch(c) {
+
+             case 'A': // Asynchronous Notify
+                 receiveAsyncNotify();
+                 break;
+
+             case 'C': // Command Complete
+                 String status = receiveCommandStatus();
+                 if(status.startsWith("COPY ")) // server >= 8.2 sends number of updated rows
+                     returnValue = Integer.parseInt(status.substring(1 + status.lastIndexOf(' ')));
+                 break;
+
+             case 'N': // Notice Response
+                 receiveNoticeResponse(); // FIXME: Save this somewhere, handle it
+                 break;
+
+             case 'E': // ErrorMessage (expected response to CopyFail)
+                 error = receiveErrorResponse();
+                 if (logger.logDebug())
+                     logger.debug(" <=BE Error in copy: " + error);
+                 break;
+
+             case 'G': // CopyInResponse
+
+                 header = receiveCopyHeader();
+
+                 if (logger.logDebug())
+                     logger.debug(" <=BE CopyInResponse " + (header.length-1) + " columns of " + header[0]);
+                 if(provider == null)
+                     throw new PSQLException(GT.tr("COPY FROM STDIN missing CopydataProvider"), PSQLState.PROTOCOL_VIOLATION);
+
+                 provider.gotHeader(header);
+                 int prefChunkSize = provider.getPreferredChunkSize();
+                 returnValue = 0;
+
+                 // now send our data
+                 try {
+                     while( (copydata=provider.provide()) != null ) {
+                         // a container holding only a partial array (at == upTo, lastLength > 0) is sent too
+                         while(copydata.at < copydata.upTo || (copydata.at == copydata.upTo && copydata.lastLength > 0)) {
+                             int chunkUpTo = copydata.at;
+                             l_len = 0;
+
+                             // count how many bytearrays make up the preferred (minimum) send chunk size
+                             while(chunkUpTo < copydata.upTo && l_len <= prefChunkSize)
+                                 l_len += copydata.data[chunkUpTo++].length;
+                             returnValue += chunkUpTo - copydata.at; // keep total "row" count in case of server < 8.2
+                             if(chunkUpTo == copydata.upTo && copydata.lastLength > 0) { // include last, possibly partial row
+                                 l_len += copydata.lastLength;
+                                 returnValue++;
+                             }
+
+                             pgStream.SendChar('d');
+                             pgStream.SendInteger4(l_len + 4);
+                             while(copydata.at < chunkUpTo)
+                                 pgStream.Send(copydata.data[copydata.at++]);
+
+                             if(chunkUpTo == copydata.upTo && copydata.lastLength > 0) // write last, possibly partial row
+                                 pgStream.Send(copydata.data[copydata.at++], 0, copydata.lastLength);
+                         }
+                     }
+                 } catch(IOException ioe) { // presumably from provider: request server to fail copy
+                     copyFailed = true; // CopyFail ends the copy; CopyDone must not follow it
+                     pgStream.SendChar('f');
+                     buf = ioe.toString().getBytes();
+                     pgStream.SendInteger4(4+buf.length+1);
+                     if(buf.length > 0)
+                         pgStream.Send(buf);
+                     pgStream.SendChar('\0');
+                     pgStream.flush();
+                 }
+                 if(!copyFailed) {
+                     pgStream.SendChar('c'); // CopyDone
+                     pgStream.SendInteger4(4); // No message in specification
+                     pgStream.flush();
+                 }
+                 break;
+
+             case 'H': // CopyOutResponse
+
+                 header = receiveCopyHeader();
+
+                 if (logger.logDebug())
+                     logger.debug(" <=BE CopyOutResponse " + (header.length-1) + " columns of " + header[0]);
+                 if(consumer == null)
+                     throw new PSQLException(GT.tr("COPY TO STDOUT missing CopydataConsumer"), PSQLState.PROTOCOL_VIOLATION);
+
+                 consumer.gotHeader(header);
+                 copydata = consumer.getContainer();
+                 returnValue = 0; // rows are counted from zero, as for CopyInResponse
+                 break;
+
+             case 'd': // CopyData
+
+                 returnValue++; // count total rows
+
+                 if (logger.logDebug())
+                     logger.debug(" <=BE CopyData row " + returnValue);
+
+                 l_len = pgStream.ReceiveIntegerR(4) - 4;
+                 if(copydata != null) {
+                     boolean fits = copydata.reuseArrays && copydata.data[copydata.at] != null &&
+                         (copydata.data[copydata.at].length == l_len || (copydata.upTo == 1 && copydata.data[copydata.at].length >= l_len));
+                     if(!fits) // nothing suitable to reuse: allocate an array of exact size
+                         copydata.data[copydata.at] = new byte[l_len];
+                     copydata.lastLength = l_len; // let consumer know length of read data
+                     pgStream.Receive(copydata.data[copydata.at], 0, l_len);
+
+                     if( ++copydata.at == copydata.upTo) { // buffer full, flush to consumer
+                         try {
+                             consumer.consume(copydata);
+                             copydata = consumer.getContainer();
+                         } catch(Exception e) {
+                             error = new PSQLException(GT.tr("Copy consumer failed: {0}", e.toString()), PSQLState.DATA_ERROR, e);
+                             copydata = null;
+                         }
+                     }
+                 } else {
+                     pgStream.Receive(l_len); // not consuming: discard rest
+                 }
+                 break;
+
+             case 'c': // CopyDone (expected after all copydata received)
+                 l_len = pgStream.ReceiveIntegerR(4) - 4;
+                 if(l_len > 0)
+                     pgStream.Receive(l_len); // not in specification; should never appear
+                 if (logger.logDebug())
+                     logger.debug(" <=BE CopyDone");
+                 if(copydata != null) {
+                     try {
+                         consumer.consume(copydata);
+                     } catch(Exception e) {
+                         error = new PSQLException(GT.tr("Copy consumer failed on last call"), PSQLState.DATA_ERROR, e);
+                     }
+                 }
+                 break;
+
+             case 'Z': // ReadyForQuery: After FE:CopyDone => BE:CommandComplete
+                 receiveRFQ();
+                 endReceiving = true;
+                 break;
+
+             default:
+                 throw new IOException(GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c)));
+             }
+
+             // Collect errors into a neat chain for completeness
+             if(error != null) {
+                 if(errorChain == null)
+                     errorChain = error;
+                 else {
+                     error.setNextException(errorChain);
+                     errorChain = error;
+                 }
+                 error = null;
+             }
+         }
+
+         if(errorChain != null)
+             throw errorChain;
+
+         return returnValue;
+     }
+
+     /**
+      * Reads the remainder of a CopyInResponse / CopyOutResponse message from the stream.
+      * @return int[] holding the overall format code at index 0, then one format code per column
+      * @throws IOException if reading from stream fails
+      * @throws PSQLException if the announced length is too short or disagrees with the column count
+      */
+     private int[] receiveCopyHeader() throws IOException, PSQLException {
+         // announced length minus the length word itself (4), the format byte (1) and the column count (2)
+         final int columnBytes = pgStream.ReceiveIntegerR(4) - (4+1+2);
+         if(columnBytes < 0) {
+             throw new PSQLException(GT.tr("Too short copy header"), PSQLState.PROTOCOL_VIOLATION);
+         }
+
+         final int overallFormat = pgStream.ReceiveIntegerR(1);
+         final int columnCount = pgStream.ReceiveIntegerR(2);
+         if(columnBytes != columnCount*2) { // each column format code takes two bytes
+             throw new PSQLException(GT.tr("Inconsistent copy header length: {0}", Integer.toString(columnBytes) + " != " + Integer.toString(columnCount*2)), PSQLState.PROTOCOL_VIOLATION);
+         }
+
+         final int[] formats = new int[columnCount+1];
+         formats[0] = overallFormat;
+         for(int column=0; column<columnCount; column++)
+             formats[column+1] = pgStream.ReceiveIntegerR(2);
+         return formats;
+     }
+
/*
* Send a query to the backend.
*/
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-04-16 21:31:44.000000000 +0300
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-06-14 19:38:30.000000000 +0300
***************
*** 23,28 ****
--- 23,29 ----
import org.postgresql.util.PGobject;
import org.postgresql.util.PSQLException;
import org.postgresql.util.GT;
+ import org.postgresql.copy.CopyManager;
/**
* This class defines methods of the jdbc2 specification.
***************
*** 1090,1093 ****
--- 1091,1102 ----
{
return bindStringAsVarchar;
}
+
+     private CopyManager copyManager = null; // created on the first getCopyAPI() call
+     public CopyManager getCopyAPI()
+     {
+         if (copyManager != null)
+             return copyManager; // hand out the instance built by an earlier call
+         return copyManager = new CopyManager(this);
+     }
}
diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java 2007-06-15 08:32:20.000000000 +0300
***************
*** 0 ****
--- 1,133 ----
+ /**
+ *
+ */
+ package org.postgresql.test.copy;
+
+ import java.sql.Connection;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+
+ import junit.framework.TestCase;
+
+ import org.postgresql.PGConnection;
+ import org.postgresql.copy.CopyManager;
+ import org.postgresql.test.TestUtil;
+
+ /**
+ * @author kato@iki.fi
+ *
+ */
+ public class CopyTest extends TestCase {
+
+ private Connection con;
+ private Statement stmt;
+ private CopyManager copyAPI;
+ private byte[][] data = {
+ "First Row\t1\t1.10\n".getBytes(), // 0 required to match DB output for numeric(5,2)
+ "Second Row\t2\t2.20\n".getBytes() // 0 required to match DB output for numeric(5,2)
+ };
+
+ public CopyTest(String name) {
+ super(name);
+ }
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ con = TestUtil.openDB();
+ stmt = con.createStatement();
+
+ // Drop the test table if it already exists for some
+ // reason. It is not an error if it doesn't exist.
+ try {
+ stmt.executeUpdate("DROP TABLE copytest");
+ } catch (SQLException e) {
+ // Intentionally ignore. We cannot distinguish
+ // "table does not exist" from other errors, since
+ // PostgreSQL doesn't support error codes yet.
+ }
+
+ stmt.executeUpdate(
+ "CREATE TABLE copytest(stringvalue text, intvalue int, numvalue numeric(5,2))");
+
+ copyAPI = ((PGConnection)con).getCopyAPI();
+ }
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ con.setAutoCommit(true);
+ if (stmt != null) {
+ stmt.executeUpdate("DROP TABLE copytest");
+ stmt.close();
+ }
+ if (con != null) {
+ TestUtil.closeDB(con);
+ }
+ }
+
+ private int getCount() throws SQLException {
+ if(stmt==null)
+ stmt = con.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT count(*) FROM copytest");
+ rs.next();
+ int result = rs.getInt(1);
+ rs.close();
+ return result;
+ }
+
+ public void testCopyIntoDBfail() {
+ String sql = "COPY copytest FROM STDIN";
+ String at = "start";
+ int rowCount = -1;
+ try {
+ copyAPI.copyIntoDB(sql, data); // FIXME: use own provider to fail
+ at = "using connection after writing copy";
+ rowCount = getCount();
+ } catch(Exception e) {
+ fail("copyIntoDB at " + at + ": " + e.toString());
+ }
+ // enable after above fix -- assertEquals(0, rowCount);
+ }
+
+ public void testCopyIntoDBsucceed() {
+ String sql = "COPY copytest FROM STDIN";
+ String at = "start";
+ int rowCount = -1;
+ try {
+ copyAPI.copyIntoDB(sql, data);
+ at = "using connection after writing copy";
+ rowCount = getCount();
+ } catch(Exception e) {
+ fail("copyIntoDB at " + at + ": " + e.toString());
+ }
+ assertEquals(data.length, rowCount);
+ }
+
+ public void testCopyOutOfDB() {
+ String sql = "COPY copytest TO STDOUT";
+ byte[][] copydata = new byte[data.length][];
+ String at = "init";
+
+ try {
+ if(getCount()==0)
+ testCopyIntoDBsucceed(); // ensure we have some data.
+ at = "start";
+ copyAPI.copyFromDB(sql, copydata);
+ at = "using connection after reading copy";
+ getCount();
+ // deep comparison of data written and read
+ at = "comparison of written and read data";
+ for(int i=0; i