Re: Deadlock detection
| От | Oliver Jowett |
|---|---|
| Тема | Re: Deadlock detection |
| Дата | |
| Msg-id | 4977C8FD.7040204@opencloud.com обсуждение исходный текст |
| Ответ на | Re: Deadlock detection (Oliver Jowett <oliver@opencloud.com>) |
| Ответы |
Re: Deadlock detection
|
| Список | pgsql-jdbc |
Oliver Jowett wrote:
> I have a bit of time spare today, I might look at putting together that
> OutputStream wrapper.
Try this. I have not tested at all - it compiles but that's as far as I
got - but it should give you an idea of what I had in mind.
-O
? org/postgresql/core/AntiDeadlockStream.java
Index: org/postgresql/core/PGStream.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/PGStream.java,v
retrieving revision 1.22
diff -u -r1.22 PGStream.java
--- org/postgresql/core/PGStream.java 8 Jan 2008 06:56:27 -0000 1.22
+++ org/postgresql/core/PGStream.java 22 Jan 2009 01:10:13 -0000
@@ -34,6 +34,7 @@
{
private final String host;
private final int port;
+ private final boolean antiDeadlock;
private final byte[] _int4buf;
private final byte[] _int2buf;
@@ -52,12 +53,14 @@
*
* @param host the hostname to connect to
* @param port the port number that the postmaster is sitting on
+ * @param antiDeadlock true to insert an anti-deadlock outputstream
* @exception IOException if an IOException occurs below it.
*/
- public PGStream(String host, int port) throws IOException
+ public PGStream(String host, int port, boolean antiDeadlock) throws IOException
{
this.host = host;
this.port = port;
+ this.antiDeadlock = antiDeadlock;
changeSocket(new Socket(host, port));
setEncoding(Encoding.getJVMEncoding("US-ASCII"));
@@ -74,6 +77,10 @@
return port;
}
+ public boolean getAntiDeadlock() {
+ return antiDeadlock;
+ }
+
public Socket getSocket() {
return connection;
}
@@ -110,6 +117,8 @@
// Buffer sizes submitted by Sverre H Huseby <sverrehu@online.no>
pg_input = new VisibleBufferedInputStream(connection.getInputStream(), 8192);
pg_output = new BufferedOutputStream(connection.getOutputStream(), 8192);
+ if (antiDeadlock)
+ pg_output = new AntiDeadlockStream(pg_output, 8192, 30000);
if (encoding != null)
setEncoding(encoding);
Index: org/postgresql/core/v2/ConnectionFactoryImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v2/ConnectionFactoryImpl.java,v
retrieving revision 1.17
diff -u -r1.17 ConnectionFactoryImpl.java
--- org/postgresql/core/v2/ConnectionFactoryImpl.java 30 Sep 2008 03:42:48 -0000 1.17
+++ org/postgresql/core/v2/ConnectionFactoryImpl.java 22 Jan 2009 01:10:14 -0000
@@ -59,7 +59,7 @@
PGStream newStream = null;
try
{
- newStream = new PGStream(host, port);
+ newStream = new PGStream(host, port, Boolean.valueOf(info.getProperty("antiDeadlock")).booleanValue());
// Construct and send an ssl startup packet if requested.
if (trySSL)
@@ -147,7 +147,7 @@
// We have to reconnect to continue.
pgStream.close();
- return new PGStream(pgStream.getHost(), pgStream.getPort());
+ return new PGStream(pgStream.getHost(), pgStream.getPort(), pgStream.getAntiDeadlock());
case 'N':
if (logger.logDebug())
Index: org/postgresql/core/v2/ProtocolConnectionImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v2/ProtocolConnectionImpl.java,v
retrieving revision 1.12
diff -u -r1.12 ProtocolConnectionImpl.java
--- org/postgresql/core/v2/ProtocolConnectionImpl.java 1 Apr 2008 07:19:20 -0000 1.12
+++ org/postgresql/core/v2/ProtocolConnectionImpl.java 22 Jan 2009 01:10:14 -0000
@@ -90,7 +90,7 @@
if (logger.logDebug())
logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");
- cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort());
+ cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort(), false);
cancelStream.SendInteger4(16);
cancelStream.SendInteger2(1234);
cancelStream.SendInteger2(5678);
Index: org/postgresql/core/v3/ConnectionFactoryImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v3/ConnectionFactoryImpl.java,v
retrieving revision 1.19
diff -u -r1.19 ConnectionFactoryImpl.java
--- org/postgresql/core/v3/ConnectionFactoryImpl.java 29 Nov 2008 07:40:30 -0000 1.19
+++ org/postgresql/core/v3/ConnectionFactoryImpl.java 22 Jan 2009 01:10:14 -0000
@@ -73,7 +73,7 @@
PGStream newStream = null;
try
{
- newStream = new PGStream(host, port);
+ newStream = new PGStream(host, port, Boolean.valueOf(info.getProperty("antiDeadlock")).booleanValue());
// Construct and send an ssl startup packet if requested.
if (trySSL)
@@ -178,7 +178,7 @@
// We have to reconnect to continue.
pgStream.close();
- return new PGStream(pgStream.getHost(), pgStream.getPort());
+ return new PGStream(pgStream.getHost(), pgStream.getPort(), pgStream.getAntiDeadlock());
case 'N':
if (logger.logDebug())
Index: org/postgresql/core/v3/ProtocolConnectionImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v3/ProtocolConnectionImpl.java,v
retrieving revision 1.13
diff -u -r1.13 ProtocolConnectionImpl.java
--- org/postgresql/core/v3/ProtocolConnectionImpl.java 1 Apr 2008 07:19:20 -0000 1.13
+++ org/postgresql/core/v3/ProtocolConnectionImpl.java 22 Jan 2009 01:10:14 -0000
@@ -90,7 +90,7 @@
if (logger.logDebug())
logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");
- cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort());
+ cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort(), false);
cancelStream.SendInteger4(16);
cancelStream.SendInteger2(1234);
cancelStream.SendInteger2(5678);
package org.postgresql.core;
import java.io.*;
/**
 * Temporary hack to try to detect/avoid socket deadlocks caused
 * by blocking on write while we have lots of pending data to read
 * from the server (i.e. the server is also blocked on write).
 *
 * Callers never write to the wrapped stream directly. Bytes are appended
 * to an in-memory buffer that grows as needed, and a background thread
 * drains that buffer into the wrapped stream. {@link #flush()} waits a
 * bounded time for the background thread to catch up; if it does not, a
 * warning and a stack trace are printed to System.err and flush() returns.
 *
 * All mutable state is guarded by <code>bufferLock</code>.
 *
 * see the thread at http://archives.postgresql.org/pgsql-jdbc/2009-01/msg00045.php
 *
 * @author Oliver Jowett <oliver@opencloud.com>
 */
class AntiDeadlockStream extends OutputStream implements Runnable {
    private static final class BufferLock {}
    private final BufferLock bufferLock = new BufferLock();

    private final OutputStream wrapped;
    private final long flushTimeout;

    // Buffer that producers append to; the first bufferSize bytes are pending.
    private byte[] buffer;
    private int bufferSize;
    // Spare buffer, exchanged with 'buffer' each time the writer thread takes a batch.
    private byte[] swapBuffer;

    private boolean closeRequest;
    private boolean flushRequest;
    // True while the writer thread is working on a batch outside the lock.
    private boolean writeInProgress;
    // Number of bytes in the batch the writer thread is currently writing.
    private int inFlightSize;
    private boolean closeComplete;
    private IOException failedException;

    /**
     * Creates the stream and starts its background writer thread.
     *
     * @param wrapped the stream that actually receives the data
     * @param initialSize initial capacity, in bytes, of each of the two buffers
     * @param flushTimeout maximum time in milliseconds that flush() waits
     *        before reporting a possible deadlock
     */
    AntiDeadlockStream(OutputStream wrapped, int initialSize, long flushTimeout) {
        this.wrapped = wrapped;
        this.flushTimeout = flushTimeout;
        this.buffer = new byte[initialSize];
        this.swapBuffer = new byte[initialSize];

        Thread writer = new Thread(this, "AntiDeadlock thread");
        // Daemon, so a stream that is never closed cannot keep the JVM alive.
        writer.setDaemon(true);
        writer.start();
    }

    /**
     * Requests that the writer thread write any pending data and close the
     * wrapped stream, then waits until it has done so.
     *
     * @exception IOException if the writer thread reported a failure, or
     *            (as InterruptedIOException) if the caller is interrupted
     */
    public void close() throws IOException {
        synchronized (bufferLock) {
            closeRequest = true;
            bufferLock.notifyAll();

            while (!closeComplete) {
                if (failedException != null)
                    throw writeFailure();

                try {
                    bufferLock.wait();
                } catch (InterruptedException ie) {
                    // Preserve the interrupt for code further up the stack.
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                }
            }
        }
    }

    /**
     * Asks the writer thread to write all pending data and flush the wrapped
     * stream, then waits up to the flush timeout for that to complete. On
     * timeout a warning and stack trace go to System.err and the method
     * returns normally.
     *
     * @exception IOException if the writer thread reported a failure, the
     *            stream is closed, or the caller is interrupted
     */
    public void flush() throws IOException {
        synchronized (bufferLock) {
            long expiry = -1;

            flushRequest = true;
            bufferLock.notifyAll();

            while (true) {
                if (failedException != null)
                    throw writeFailure();
                if (closeRequest)
                    throw new IOException("Stream is closed");
                // Done only once nothing is queued, the flush request has been
                // picked up, and the batch that picked it up has completed.
                if (bufferSize == 0 && !flushRequest && !writeInProgress)
                    return;

                long delay;
                if (expiry == -1) {
                    delay = flushTimeout;
                    expiry = System.currentTimeMillis() + delay;
                } else {
                    delay = expiry - System.currentTimeMillis();
                }

                if (delay <= 0) {
                    System.err.println("Warning: possible socket deadlock detected (timeout=" + flushTimeout
                                       + ", remainingbuffer=" + (bufferSize + inFlightSize) + ")");
                    new Throwable("Deadlock call stack").fillInStackTrace().printStackTrace(System.err);
                    return;
                }

                try {
                    bufferLock.wait(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                }
            }
        }
    }

    public void write(int b) throws IOException {
        write(new byte[] { (byte)b }, 0, 1);
    }

    public void write(byte[] b) throws IOException {
        write(b, 0, b.length);
    }

    /**
     * Appends bytes to the pending buffer, growing it if necessary. Never
     * blocks on the wrapped stream.
     *
     * @exception IOException if the stream is closed, the writer thread
     *            reported a failure, or the pending data would exceed the
     *            maximum array size
     */
    public void write(byte[] b, int off, int len) throws IOException {
        if (b == null)
            throw new NullPointerException();
        // Written as a subtraction so that off + len cannot overflow.
        if (off < 0 || len < 0 || len > b.length - off)
            throw new IndexOutOfBoundsException();

        synchronized (bufferLock) {
            if (closeRequest)
                throw new IOException("Stream is closed");
            if (failedException != null)
                throw writeFailure();
            if (len == 0)
                return;

            ensureCapacity(len);

            // The writer thread only sleeps while the buffer is empty.
            if (bufferSize == 0)
                bufferLock.notifyAll();

            System.arraycopy(b, off, buffer, bufferSize, len);
            bufferSize += len;
        }
    }

    // Grows 'buffer' so it can hold 'extra' more bytes. Caller must hold bufferLock.
    private void ensureCapacity(int extra) throws IOException {
        long needs = (long) bufferSize + extra;
        if (needs > Integer.MAX_VALUE)
            throw new IOException("Pending output exceeds maximum buffer size");

        // Start from at least 1 so that doubling terminates for a zero-length buffer.
        long newSize = Math.max(buffer.length, 1);
        while (newSize < needs)
            newSize *= 2;
        if (newSize > Integer.MAX_VALUE)
            newSize = Integer.MAX_VALUE;

        if (newSize != buffer.length) {
            byte[] newBuffer = new byte[(int) newSize];
            System.arraycopy(buffer, 0, newBuffer, 0, bufferSize);
            buffer = newBuffer;
        }
    }

    // Wraps the writer thread's failure for rethrow. Caller must hold bufferLock.
    private IOException writeFailure() {
        return (IOException) new IOException("Write thread reported an error").initCause(failedException);
    }

    //
    // Runnable
    //

    /**
     * Writer thread body: repeatedly takes the pending batch under the lock,
     * then writes/flushes/closes the wrapped stream outside the lock so that
     * producers are never blocked behind socket I/O.
     */
    public void run() {
        while (true) {
            boolean doFlush;
            boolean doClose;
            int writeLength;
            byte[] outgoing;

            synchronized (bufferLock) {
                if (bufferSize == 0 && !closeRequest && !flushRequest) {
                    try {
                        bufferLock.wait();
                    } catch (InterruptedException ie) {
                        failedException = new InterruptedIOException("write thread interrupted");
                        bufferLock.notifyAll();
                        return;
                    }
                    continue;
                }

                // Exchange the buffers: producers continue into the spare one
                // while this thread writes the batch it has just taken.
                outgoing = buffer;
                buffer = swapBuffer;
                swapBuffer = outgoing;

                writeLength = bufferSize;
                bufferSize = 0;

                // Snapshot the requests; they must not be re-read outside the lock.
                doFlush = flushRequest;
                doClose = closeRequest;
                flushRequest = false;

                writeInProgress = true;
                inFlightSize = writeLength;
                bufferLock.notifyAll();
            }

            try {
                if (writeLength > 0)
                    wrapped.write(outgoing, 0, writeLength);
                if (doFlush)
                    wrapped.flush();
                if (doClose)
                    wrapped.close();
            } catch (IOException ioe) {
                synchronized (bufferLock) {
                    failedException = ioe;
                    writeInProgress = false;
                    inFlightSize = 0;
                    bufferLock.notifyAll();
                }

                // Best-effort cleanup, done outside the lock so waiters are not held up.
                try {
                    wrapped.close();
                } catch (IOException ignored) {
                    // The original failure has already been recorded for callers.
                }
                return;
            }

            synchronized (bufferLock) {
                writeInProgress = false;
                inFlightSize = 0;
                if (doClose)
                    closeComplete = true;
                bufferLock.notifyAll();
            }

            if (doClose)
                return;
        }
    }
}
В списке pgsql-jdbc по дате отправления: