diff -rcN postgresql-jdbc-8.2-505.src.orig/build.xml postgresql-jdbc-8.2-505.src.copy/build.xml *** postgresql-jdbc-8.2-505.src.orig/build.xml 2006-11-29 06:00:15.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/build.xml 2007-06-15 08:32:51.000000000 +0300 *************** *** 84,89 **** --- 84,96 ---- + + + + + + + *************** *** 116,121 **** --- 123,129 ---- + *************** *** 393,398 **** --- 401,407 ---- + *************** *** 428,433 **** --- 437,443 ---- + diff -rcN postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml *** postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml 2007-02-19 08:04:48.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml 2007-06-15 14:25:52.000000000 +0300 *************** *** 2480,2485 **** --- 2480,2529 ---- + + + Copy + + Bulk data transfer with INSERT or SELECT + can be quite slow for large amounts of data. + PostgreSQL provides a special SQL + statement COPY for this purpose. + It can be used either to exchange data between a file and a table, which requires + superuser privileges to access files on the database server, or between the client and + a table, which requires special copy subprotocol support from the database driver. + The latter is available as an extension of this JDBC driver for + protocol version 3 (PostgreSQL 7.4 and newer). + + + + The Copy API is available from PGConnection.getCopyAPI(). + It provides a variety of one-shot import and export methods. + Streams and byte arrays are readily supported. + Other (say, object field based) exchangers can be based on available interfaces. 
+ + + + // get hold of the extension interface + import org.postgresql.copy.CopyManager; + CopyManager copyAPI = ((org.postgresql.PGConnection)conn).getCopyAPI(); + + // handy constants for default format + static final byte[] rowSep = "\n".getBytes(); + static final byte[] fieldSep = "\t".getBytes(); + static final byte[] nullSeq = "\\N".getBytes(); + + // write some data to a fictional (varchar,int) table + byte[][] mydata = { + "First row\t123\n".getBytes(), + "Second".getBytes(), fieldSep, nullSeq, rowSep + }; + copyAPI.copyIntoDB( "COPY mytable FROM STDIN", mydata ); + + // show contents of a fictional table + copyAPI.copyFromDB("COPY mytable TO STDOUT", System.out); + + + diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java 2005-04-20 03:10:58.000000000 +0300 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java 2007-06-14 19:38:29.000000000 +0300 *************** *** 11,16 **** --- 11,17 ---- import java.sql.*; import org.postgresql.core.Encoding; + import org.postgresql.copy.CopyManager; import org.postgresql.fastpath.Fastpath; import org.postgresql.largeobject.LargeObjectManager; *************** *** 30,35 **** --- 31,42 ---- public PGNotification[] getNotifications() throws SQLException; /** + * This returns the COPY API for the current connection. + * @since 8.4 + */ + public CopyManager getCopyAPI() throws SQLException; + + /** * This returns the LargeObject API for the current connection. 
* @since 7.3 */ diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java 2007-06-15 14:33:28.000000000 +0300 *************** *** 0 **** --- 1,185 ---- + /** + * Bulk data copy for PostgreSQL + */ + package org.postgresql.copy; + + import java.io.IOException; + import java.io.OutputStream; + import java.io.InputStream; + import java.sql.SQLException; + + import org.postgresql.core.*; + import org.postgresql.core.v3.QueryExecutorImpl; // COPY is only implemented for protocol v3 + + /** + * API for PostgreSQL protocol version 3 COPY block data transfer + * @author Kalle Hallivuori + * @since 8.2 + */ + public class CopyManager { + public static final int DEFAULT_BUFFER_SIZE = 10000; + private final QueryExecutorImpl queryExecutor; + + public CopyManager(BaseConnection connection) { + this.queryExecutor = (QueryExecutorImpl)connection.getQueryExecutor(); + } + + /** + * Feeds contents of given stream into a database table according to given COPY FROM STDIN statement + * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one + * @param ins A "file" of comma separated values or whatever format you chose + * @return for PostgreSQL 8.2 and later, number of updated rows; for earlier versions, number of buffers written + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyIntoDB(String sql, InputStream ins) throws SQLException, IOException { + return copyIntoDB(sql, ins, DEFAULT_BUFFER_SIZE); + } + + /** + * Feeds contents of given stream into a database table according to given COPY FROM STDIN statement + * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one + * @param 
ins A "file" of comma separated values or whatever format you chose + * @param bufferSize specify size of buffer used between ins and database + * @return for PostgreSQL 8.2 and later, number of updated rows; for earlier versions, number of buffers written + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyIntoDB(String sql, InputStream ins, int bufferSize) throws SQLException, IOException { + return copyIntoDB(sql, new CopydataFromInputStream(ins, bufferSize)); + } + + /** + * Feeds contents of given byte arrays into a database table according to given COPY FROM STDIN statement + * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one + * @param buf container for all bytes to feed for the COPY + * @return for PostgreSQL 8.2 and later, number of updated rows; for earlier versions, number of byte arrays written + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyIntoDB(String sql, byte[][] buf) throws SQLException, IOException { + return copyIntoDB(sql, buf, 0, buf.length, 0, DEFAULT_BUFFER_SIZE); + } + + /** + * Feeds contents of given byte arrays into a database table according to given COPY FROM STDIN statement. 
+ * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one + * @param buf arrays of bytes to write one after another, grouping does not matter + * @param from index of first array to start writing from + * @param upTo write up to this array; see lastLength + * @param lastLength if this is greater than zero, that many bytes from buf[upTo] will be written at end + * @param chunkSize preferred minimum size for data chunks to write to server + * @return prior to PostgreSQL 8.2: number of arrays written; beginning from 8.2: number of rows updated + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyIntoDB(String sql, byte[][] buf, int from, int upTo, int lastLength, int chunkSize) throws SQLException, IOException { + return copyIntoDB(sql, new CopydataFromByteaa(buf, from, upTo, lastLength, chunkSize)); + } + + /** + * Write contents of a database table into given stream according to given COPY TO STDOUT statement. + * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one + * @param outs results from database are written to this OutputStream + * @return 1 (number of last buffers written, using only one buffer) + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyFromDB(String sql, OutputStream outs) throws SQLException, IOException { + return copyFromDB(sql, outs, DEFAULT_BUFFER_SIZE); + } + + /** + * Write contents of a database table into given stream according to given COPY TO STDOUT statement. 
+ * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one + * @param outs results from database are written to this OutputStream + * @param bufferSize specify size of buffer used between database and outs + * @return prior to PostgreSQL 8.2: number of arrays written; beginning from 8.2: number of rows updated + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyFromDB(String sql, OutputStream outs, int bufferSize) throws SQLException, IOException { + return copyFromDB(sql, new CopydataToOutputStream(outs, bufferSize)); + } + + /** + * Read contents of a database table into given buffer according to given COPY TO STDOUT statement. + * Rest of input is skipped. A good idea for size of buffer is returned by SELECT COUNT(*) from same table. + * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one + * @param buf Buffer to hold the read data. + * @return index of next free slot in buf + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyFromDB(String sql, byte[][] buf) throws SQLException, IOException { + return copyFromDB(sql, buf, 0, buf.length); + } + + /** + * Read contents of a database table into given buffer according to given COPY TO STDOUT statement. + * @param sql COPY TO STDOUT statement; you can use buildCopyQuery to construct one + * @param buf Buffer to hold the read data. 
+ * @param from index to begin filling buf at (normally 0) + * @param upTo index where to stop filling buf at (normally buf.length) + * @return index of next free slot in buf + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyFromDB(String sql, byte[][] buf, int from, int upTo) throws SQLException, IOException { + return copyFromDB(sql, new CopydataToByteaa(buf, from, upTo)); + } + + /** + * Feeds data from given provider into a database table according to given COPY FROM STDIN statement + * @param sql COPY FROM STDIN statement; you can use buildCopyQuery to construct one + * @param provider party providing the data to write + * @return 1 (number of last buffers read, using only one buffer) + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyIntoDB(String sql, CopydataProvider provider) throws SQLException, IOException { + return queryExecutor.copy(sql, provider, null); + } + + /** + * Read contents of a database table for given consumer according to given COPY TO STDOUT statement. 
+ * @param sql COPY TO STDOUT statement + * @param consumer party responsible for passing on the received data + * @return 1 (number of last buffers read, using only one buffer) + * @throws SQLException if the copy operation fails + * @throws IOException if the database connection breaks + */ + public int copyFromDB(String sql, CopydataConsumer consumer) throws SQLException, IOException { + return queryExecutor.copy(sql, null, consumer); + } + + /** + * Construct a COPY statement + * @param tableName name of the table used + * @param fieldNames array of names of fields to access, in the order specified by the array + * @param writeToDB true: COPY FROM STDIN; false: COPY TO STDOUT + * @param with additional specifications such as separators, see COPY documentation + * @return specified SQL query + */ + public String buildCopyQuery(String tableName, String[] fieldNames, boolean writeToDB, String[] with) { + StringBuilder sql = new StringBuilder("COPY ").append(tableName); + if(fieldNames != null) { + StringBuilder fields = new StringBuilder(); + for(int i=0; i 0 ) + fields.append(","); + fields.append(fieldNames[i]); + } + if(fields.length() > 0) + sql.append("(").append(fields).append(")"); + } + sql.append(writeToDB ? " FROM STDIN" : " TO STDOUT"); + if(with != null) { + for(int i=0; i + */ + public class Copydata { + public byte[][] data; + public int from, at, upTo, lastLength; // offsets are not per array to save some memory + public boolean reuseArrays; + + Copydata() {} + + Copydata(int aaSize, int arraySize, boolean reuse) { + if(aaSize > 0) + data = (arraySize > 0) ? 
new byte[aaSize][arraySize] : new byte[aaSize][]; + upTo = aaSize; + reuseArrays = reuse; + } + } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataConsumer.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataConsumer.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataConsumer.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataConsumer.java 2007-06-15 14:33:28.000000000 +0300 *************** *** 0 **** --- 1,36 ---- + /** + * Bulk data copy for PostgreSQL + */ + package org.postgresql.copy; + + /** + * CopydataConsumers are fed data from PostgreSQL COPY TO STDOUT queries + * @author Kalle Hallivuori + * @since 8.2 + */ + public interface CopydataConsumer { + + /** + * Receive a format header at start of COPY TO STDOUT + * Format code is 0 for text (CSV), 1 for binary (PostgreSQL internal). + * If you don't get a straight row of zeros, just cry out loud. + * Number of columns is header.length-1 + * @param header element 0 is the overall format, thereafter formats for each column follow + * @see http://developer.postgresql.org/pgdocs/postgres/protocol-message-formats.html#CopyInResponse + */ + void gotHeader(int[] header); + + /** + * Called when a buffer for new data from query is needed. Return null to stop input. + * @return buffer to fill, that will eventually be handed back to consume() + */ + Copydata getContainer(); + + /** + * Called to collect data during receiving results from database. + * Either the container is full (copydata.at==copydata.upTo) or all results have been received. 
+ * @param copydata the container originated from getContainer() filled with data + * @throws Exception to stop collecting results + */ + void consume(Copydata copydata) throws Exception; + } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromByteaa.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromByteaa.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromByteaa.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromByteaa.java 2007-06-15 14:33:28.000000000 +0300 *************** *** 0 **** --- 1,36 ---- + /** + * Bulk data copy for PostgreSQL + */ + package org.postgresql.copy; + + import java.io.IOException; + + /** + * Abuses Copydata to provide overly flat interface for passing data to COPY + * @author Kalle Hallivuori + */ + public class CopydataFromByteaa extends Copydata implements CopydataProvider { + + int prefChunkSize; + + public CopydataFromByteaa(byte[][] data, int from, int upTo, int lastLength, int chunkSize) { + this.data = data; + this.from = from; + this.upTo = upTo; + this.lastLength = lastLength; + prefChunkSize = chunkSize; + } + + public int getPreferredChunkSize() { + return prefChunkSize; + } + + public void gotHeader(int[] header) { + // SEP + } + + public Copydata provide() throws IOException { + return (this.at < this.upTo) ? 
this : null; + } + + } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromInputStream.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromInputStream.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataFromInputStream.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataFromInputStream.java 2007-06-15 14:33:28.000000000 +0300 *************** *** 0 **** --- 1,41 ---- + /** + * Bulk data copy for PostgreSQL + */ + package org.postgresql.copy; + + import java.io.IOException; + import java.io.InputStream; + + /** + * Passes to COPY FROM STDIN a buffer of given size filled from given stream + * until end of stream. + * @author Kalle Hallivuori + * + */ + public class CopydataFromInputStream implements CopydataProvider { + + InputStream ins; + Copydata buf; + + CopydataFromInputStream(InputStream ins, int bufferSize) { + this.ins = ins; + buf = new Copydata(1,bufferSize,true); + } + + public int getPreferredChunkSize() { + return buf.data[0].length; + } + + public void gotHeader(int[] header) { + // SEP + } + + public Copydata provide() throws IOException { + int i = ins.read(buf.data[0], 0, buf.data[0].length); + if( i > 0 ) { + buf.lastLength = i; + return buf; + } + return null; + } + } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataProvider.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataProvider.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataProvider.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataProvider.java 2007-06-15 14:33:28.000000000 +0300 *************** *** 0 **** --- 1,36 ---- + /** + * Bulk data copy for PostgreSQL + */ + package org.postgresql.copy; + + import java.io.IOException; + + /** + * CopydataProviders offer data for PostgreSQL COPY FROM STDIN bulk data imports + * @author Kalle Hallivuori + * @since 8.2 + */ + 
public interface CopydataProvider { + + /** + * Receive a format header at start of COPY FROM STDIN + * Format code is 0 for text (CSV), 1 for binary (PostgreSQL internal). + * If you don't get a straight row of zeros, just cry out loud. + * Number of columns is header.length-1 + * @param header element 0 is the overall format, thereafter formats for each column follow + * @see http://developer.postgresql.org/pgdocs/postgres/protocol-message-formats.html#CopyInResponse + */ + void gotHeader(int[] header); + + /** + * @return Preferred copydata chunk size for database connection. Try values from one to tens of thousands. + */ + int getPreferredChunkSize(); + + /** + * Called during bulk data import operation to get more data. + * @return data to write to database + * @throws IOException cancel copy operation + */ + Copydata provide() throws IOException; + } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToByteaa.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToByteaa.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToByteaa.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToByteaa.java 2007-06-15 14:33:28.000000000 +0300 *************** *** 0 **** --- 1,40 ---- + /** + * Bulk data copy for PostgreSQL + */ + package org.postgresql.copy; + + /** + * @author Kalle Hallivuori + */ + public class CopydataToByteaa extends Copydata implements CopydataConsumer { + + CopydataToByteaa(byte[][] buf, int from, int upTo) { + this.data = buf; + this.from = this.at = from; + this.upTo = upTo; + this.reuseArrays = false; + } + + /* (non-Javadoc) + * @see org.postgresql.copy.CopydataConsumer#consume(org.postgresql.copy.Copydata) + */ + public void consume(Copydata copydata) throws Exception { + // we already got the data into self so do nothing + } + + /* (non-Javadoc) + * @see org.postgresql.copy.CopydataConsumer#getContainer() + */ + public Copydata 
getContainer() { + // by not resetting at we protect data we contain + return (at < upTo) ? this : null; + } + + /* (non-Javadoc) + * @see org.postgresql.copy.CopydataConsumer#gotHeader(int[]) + */ + public void gotHeader(int[] header) { + // SEP + } + + } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToOutputStream.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToOutputStream.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopydataToOutputStream.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopydataToOutputStream.java 2007-06-15 14:33:28.000000000 +0300 *************** *** 0 **** --- 1,33 ---- + /** + * Bulk data copy for PostgreSQL + */ + package org.postgresql.copy; + + import java.io.OutputStream; + + /** + * @author Kalle Hallivuori + */ + public class CopydataToOutputStream implements CopydataConsumer { + + OutputStream outs; + Copydata buf; + + CopydataToOutputStream(OutputStream outs, int bufferSize) { + this.outs = outs; + buf = new Copydata(1, bufferSize, true); + } + + public void consume(Copydata copydata) throws Exception { + outs.write(buf.data[0], 0, buf.lastLength); + buf.at = buf.lastLength = 0; + } + + public Copydata getContainer() { + return buf; + } + + public void gotHeader(int[] header) { + // SEP + } + } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java 2006-12-01 10:53:45.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java 2007-06-15 14:00:34.000000000 +0300 *************** *** 27,32 **** --- 27,33 ---- import org.postgresql.util.PSQLState; import org.postgresql.util.ServerErrorMessage; import org.postgresql.util.GT; + import org.postgresql.copy.*; /** * QueryExecutor implementation for 
the V3 protocol. *************** *** 630,635 **** --- 631,867 ---- return returnValue; } + // + // Copy subprotocol implementation + // + + /** + * COPY FROM STDIN / COPY TO STDOUT + * @author kato@iki.fi + * @since 8.4 + * @param sql The COPY FROM STDIN / COPY TO STDOUT statement to issue + * @param provider data provider for COPY FROM STDIN + * @param consumer data consumer for COPY TO STDOUT + * @throws SQLException on recovered communication failures + * @throws IOException passed from low level connection + */ + public synchronized int copy(String sql, CopydataProvider provider, CopydataConsumer consumer) throws SQLException, IOException { + + if(sql == null || (provider == null && consumer == null)) + throw new PSQLException( GT.tr("You forgot to specify what to copy"), PSQLState.PROTOCOL_VIOLATION ); + + Copydata copydata = null; + boolean endReceiving = false; + int returnValue = -1; + SQLException error = null, errorChain = null; + int[] header = null; + int l_len; + + // send starting query + byte buf[] = sql.getBytes(); + pgStream.SendChar('Q'); + pgStream.SendInteger4(buf.length + 4 + 1); + pgStream.Send(buf); + pgStream.SendChar(0); + pgStream.flush(); + + while(!endReceiving) { + int c = pgStream.ReceiveChar(); + switch(c) { + + case 'A': // Asynchronous Notify + + receiveAsyncNotify(); + break; + + case 'C': // Command Complete + + String status = receiveCommandStatus(); + if(status.startsWith("COPY ")) // server >= 8.2 sends number of updated rows + returnValue = Integer.parseInt(status.substring(1 + status.lastIndexOf(' '))); + break; + + case 'N': // Notice Response + + receiveNoticeResponse(); // FIXME: Save this somewhere, handle it + break; + + case 'E': // ErrorMessage (expected response to CopyFail) + + error = receiveErrorResponse(); + if (logger.logDebug()) + logger.debug(" <=BE Error in copy: " + error); + break; + + case 'G': // CopyInResponse + + header = receiveCopyHeader(); + + if (logger.logDebug()) + logger.debug(" <=BE 
CopyInResponse " + (header.length-1) + " columns of " + header[0]); + if(provider == null) + throw new PSQLException(GT.tr("COPY FROM STDIN missing CopydataProvider"), PSQLState.PROTOCOL_VIOLATION); + + provider.gotHeader(header); + int prefChunkSize = provider.getPreferredChunkSize(); + returnValue = 0; + + // now send our data + try { + while( (copydata=provider.provide()) != null ) { + while(copydata.at < copydata.upTo) { + int chunkUpTo = copydata.at; + l_len = 0; + + // count how many bytearrays make up the preferred (minimum) send chunk size + while(chunkUpTo < copydata.upTo && l_len <= prefChunkSize) + l_len += copydata.data[chunkUpTo++].length; + returnValue = chunkUpTo - copydata.at; // keep total "row" count in case of server < 8.2 + if(chunkUpTo == copydata.upTo && copydata.lastLength > 0) { // include last, possibly partial row + l_len += copydata.lastLength; + returnValue++; + } + + pgStream.SendChar('d'); + pgStream.SendInteger4(l_len + 4); + while(copydata.at < chunkUpTo) + pgStream.Send(copydata.data[copydata.at++]); + + if(chunkUpTo == copydata.upTo && copydata.lastLength > 0) // write last, possibly partial row + pgStream.Send(copydata.data[copydata.at++], 0, copydata.lastLength); + } + } + } catch(IOException ioe) { // presumably from provider: request server to fail copy + pgStream.SendChar('f'); + buf = ioe.toString().getBytes(); + pgStream.SendInteger4(4+buf.length+1); + if(buf.length > 0) + pgStream.Send(buf); + pgStream.SendChar('\0'); + pgStream.flush(); + } + if(copydata == null) { + pgStream.SendChar('c'); // CopyDone + pgStream.SendInteger4(4); // No message in specification + pgStream.flush(); + } + break; + + case 'H': // CopyOutResponse + + header = receiveCopyHeader(); + + if (logger.logDebug()) + logger.debug(" <=BE CopyOutResponse " + (header.length-1) + " columns of " + header[0]); + if(consumer == null) + throw new PSQLException(GT.tr("COPY TO STDOUT missing CopydataConsumer"), PSQLState.PROTOCOL_VIOLATION); + + 
consumer.gotHeader(header); + copydata = consumer.getContainer(); + break; + + case 'd': // CopyData + + returnValue++; // count total rows + + if (logger.logDebug()) + logger.debug(" <=BE CopyData row " + returnValue); + + l_len = pgStream.ReceiveIntegerR(4) - 4; + if(copydata != null) { + if(copydata.reuseArrays && copydata.data[copydata.at] != null && + (copydata.data[copydata.at].length == l_len || (copydata.upTo == 1 && copydata.data[copydata.at].length >= l_len) ) + ) { + copydata.lastLength = l_len; // let consumer know length of read data + } else { + copydata.data[copydata.at] = new byte[l_len]; + } + pgStream.Receive(copydata.data[copydata.at], 0, l_len); + + if( ++copydata.at == copydata.upTo) { // buffer full, flush to consumer + try { + consumer.consume(copydata); + copydata = consumer.getContainer(); + } catch(Exception e) { + error = new PSQLException(GT.tr("Copy consumer failed: {0}", e.toString()), PSQLState.DATA_ERROR, e); + copydata = null; + } + } + } else { + pgStream.Receive(l_len); // not consuming: discard rest + } + break; + + case 'c': // CopyDone (expected after all copydata received) + + l_len = pgStream.ReceiveIntegerR(4) - 4; + if(l_len > 0) + pgStream.Receive(l_len); // not in specification; should never appear + if (logger.logDebug()) + logger.debug(" <=BE CopyDone"); + if(copydata != null) { + try { + consumer.consume(copydata); + } catch(Exception e) { + error = new PSQLException(GT.tr("Copy consumer failed on last call"), PSQLState.DATA_ERROR, e); + } + } + break; + + case 'Z': // ReadyForQuery: After FE:CopyDone => BE:CommandComplete + + receiveRFQ(); + endReceiving = true; + break; + + default: + throw new IOException(GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c))); + } + + // Collect errors into a neat chain for completeness + if(error != null) { + if(errorChain == null) + errorChain = error; + else { + error.setNextException(errorChain); + errorChain = error; + } + error = null; + } + } + + if(errorChain 
!= null) + throw errorChain; + + return returnValue; + } + + /** + * Used upon Copy in/out start messages from server to resolve copy format header + * @param handler the ResultHandler handling current query + * @return int[] with overall format number followed by format of each field + * @throws IOException if reading from stream fails + */ + private int[] receiveCopyHeader() throws IOException, PSQLException { + int l_len = pgStream.ReceiveIntegerR(4) - (4+1+2); + + if(l_len<0) // assert sanity + throw new PSQLException(GT.tr("Too short copy header"), PSQLState.PROTOCOL_VIOLATION); + + int copyFormat = pgStream.ReceiveIntegerR(1); + int numColumns = pgStream.ReceiveIntegerR(2); + + if(l_len!=numColumns*2) // assert consistency + throw new PSQLException(GT.tr("Inconsistent copy header length: {0}", Integer.toString(l_len) + " != " + Integer.toString(numColumns*2)), PSQLState.PROTOCOL_VIOLATION); + + int[] result = new int[numColumns+1]; + result[0] = copyFormat; + for(int i=1; i<=numColumns; i++) + result[i] = pgStream.ReceiveIntegerR(2); + + return result; + } + /* * Send a query to the backend. */ diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-04-16 21:31:44.000000000 +0300 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-06-14 19:38:30.000000000 +0300 *************** *** 23,28 **** --- 23,29 ---- import org.postgresql.util.PGobject; import org.postgresql.util.PSQLException; import org.postgresql.util.GT; + import org.postgresql.copy.CopyManager; /** * This class defines methods of the jdbc2 specification. 
*************** *** 1090,1093 **** --- 1091,1102 ---- { return bindStringAsVarchar; } + + private CopyManager copyManager = null; + public CopyManager getCopyAPI() + { + if (copyManager == null) + copyManager = new CopyManager(this); + return copyManager; + } } diff -rcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java 2007-06-15 08:32:20.000000000 +0300 *************** *** 0 **** --- 1,133 ---- + /** + * + */ + package org.postgresql.test.copy; + + import java.sql.Connection; + import java.sql.ResultSet; + import java.sql.SQLException; + import java.sql.Statement; + + import junit.framework.TestCase; + + import org.postgresql.PGConnection; + import org.postgresql.copy.CopyManager; + import org.postgresql.test.TestUtil; + + /** + * @author kato@iki.fi + * + */ + public class CopyTest extends TestCase { + + private Connection con; + private Statement stmt; + private CopyManager copyAPI; + private byte[][] data = { + "First Row\t1\t1.10\n".getBytes(), // 0 required to match DB output for numeric(5,2) + "Second Row\t2\t2.20\n".getBytes() // 0 required to match DB output for numeric(5,2) + }; + + public CopyTest(String name) { + super(name); + } + + /* (non-Javadoc) + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + con = TestUtil.openDB(); + stmt = con.createStatement(); + + // Drop the test table if it already exists for some + // reason. It is not an error if it doesn't exist. + try { + stmt.executeUpdate("DROP TABLE copytest"); + } catch (SQLException e) { + // Intentionally ignore. We cannot distinguish + // "table does not exist" from other errors, since + // PostgreSQL doesn't support error codes yet. 
+ } + + stmt.executeUpdate( + "CREATE TABLE copytest(stringvalue text, intvalue int, numvalue numeric(5,2))"); + + copyAPI = ((PGConnection)con).getCopyAPI(); + } + + /* (non-Javadoc) + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + con.setAutoCommit(true); + if (stmt != null) { + stmt.executeUpdate("DROP TABLE copytest"); + stmt.close(); + } + if (con != null) { + TestUtil.closeDB(con); + } + } + + private int getCount() throws SQLException { + if(stmt==null) + stmt = con.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT count(*) FROM copytest"); + rs.next(); + int result = rs.getInt(1); + rs.close(); + return result; + } + + public void testCopyIntoDBfail() { + String sql = "COPY copytest FROM STDIN"; + String at = "start"; + int rowCount = -1; + try { + copyAPI.copyIntoDB(sql, data); // FIXME: use own provider to fail + at = "using connection after writing copy"; + rowCount = getCount(); + } catch(Exception e) { + fail("copyIntoDB at " + at + ": " + e.toString()); + } + // enable after above fix -- assertEquals(0, rowCount); + } + + public void testCopyIntoDBsucceed() { + String sql = "COPY copytest FROM STDIN"; + String at = "start"; + int rowCount = -1; + try { + copyAPI.copyIntoDB(sql, data); + at = "using connection after writing copy"; + rowCount = getCount(); + } catch(Exception e) { + fail("copyIntoDB at " + at + ": " + e.toString()); + } + assertEquals(data.length, rowCount); + } + + public void testCopyOutOfDB() { + String sql = "COPY copytest TO STDOUT"; + byte[][] copydata = new byte[data.length][]; + String at = "init"; + + try { + if(getCount()==0) + testCopyIntoDBsucceed(); // ensure we have some data. + at = "start"; + copyAPI.copyFromDB(sql, copydata); + at = "using connection after reading copy"; + getCount(); + // deep comparison of data written and read + at = "comparison of written and read data"; + for(int i=0; i