diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 16:37:13 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 16:37:13 +0000 |
commit | 45b41e212c827905f711c188457ed6cdcb97aab3 (patch) | |
tree | f9b24e181c8db1ea4c22d5b872b1fef0a3a7bddd /java/client/src | |
parent | 1f21d6b6a37c98886de34fb33f74e7519d2dabe6 (diff) | |
download | qpid-python-45b41e212c827905f711c188457ed6cdcb97aab3.tar.gz |
QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501096 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
5 files changed, 30 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 477a679b90..66fa9de92c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -127,10 +127,17 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID()); - clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName()); - clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion()); - clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); + try + { + clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID()); + clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName()); + clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion()); + clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); + } + catch (Exception e) + { + e.printStackTrace(); + } // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -141,6 +148,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener new AMQShortString(selectedLocale), // locale new AMQShortString(mechanism), // mechanism saslResponse)); // response + } catch (UnsupportedEncodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ed1b5ae6f9..9015cabf43 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -24,12 +24,14 @@ import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoServiceConfig; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -131,6 +133,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); } + + try + { + + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + threadModel.getAsynchronousReadFilter().createNewJobForSession(session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); + } + catch (RuntimeException e) + { + e.printStackTrace(); + } + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index d6364f45b0..5e6244d7cc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -71,7 +71,7 @@ public class SocketTransportConnection implements ITransportConnection boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); if (readWriteThreading) { - cfg.setThreadModel(new ReadWriteThreadModel()); + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); } SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index e9e23aefdb..99a4c5f30d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -70,7 +70,7 @@ public class TransportConnection IoServiceConfig config = _acceptor.getDefaultConfig(); - config.setThreadModel(new ReadWriteThreadModel()); + config.setThreadModel(ReadWriteThreadModel.getInstance()); } public static ITransportConnection getInstance() throws AMQTransportConnectionException diff --git a/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java b/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java index de3d558f7c..f0ac0e6902 100644 --- a/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java +++ b/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java @@ -82,7 +82,7 @@ public class AcceptorTest extends TestCase sc.setSendBufferSize(32768); sc.setReceiveBufferSize(32768); - config.setThreadModel(new ReadWriteThreadModel()); + config.setThreadModel(ReadWriteThreadModel.getInstance()); acceptor.bind(new InetSocketAddress(PORT), new TestHandler()); |