diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-15 12:52:46 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-15 12:52:46 +0000 |
commit | fe6d706ad7085de5dd0526ec0c1d7983ed61aaf3 (patch) | |
tree | 8ef787a82ae27e5de991375fe0603c594f20795a /java | |
parent | d53fcdfd308d40d391fce4d561b212a3c2a1036c (diff) | |
download | qpid-python-fe6d706ad7085de5dd0526ec0c1d7983ed61aaf3.tar.gz |
QPID-294 : Patch supplied by Rob Godfrey - Fix race condition on client connection
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@496302 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 40 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index a0aa1d544b..0d2877c926 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -47,6 +47,7 @@ import java.util.Iterator; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; + public class AMQProtocolHandler extends IoHandlerAdapter { private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); @@ -69,7 +70,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ private volatile AMQProtocolSession _protocolSession; -// private AMQStateManager _stateManager = new AMQStateManager(); + private AMQStateManager _stateManager = new AMQStateManager(); private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); @@ -142,7 +143,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); } - _protocolSession = new AMQProtocolSession(this, session, _connection); + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } @@ -278,7 +279,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToWaiters(Exception e) { - _protocolSession.getStateManager().error(e); + getStateManager().error(e); if(!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -317,7 +318,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter try { - boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt); + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); if(!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); @@ -329,12 +330,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter } if (!wasAnyoneInterested) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); } } catch (AMQException e) { - _protocolSession.getStateManager().error(e); + getStateManager().error(e); if(!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); @@ -395,7 +396,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void attainState(AMQState s) throws AMQException { - _protocolSession.getStateManager().attainState(s); + getStateManager().attainState(s); } /** @@ -487,7 +488,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void closeConnection() throws AMQException { - _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING); + getStateManager().changeState(AMQState.CONNECTION_CLOSING); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -557,11 +558,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQStateManager getStateManager() { - return _protocolSession.getStateManager(); + return _stateManager; } public void setStateManager(AMQStateManager stateManager) { + _stateManager = stateManager; _protocolSession.setStateManager(stateManager); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 1af7ca55e6..8523e1cfce 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -113,6 +113,18 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _stateManager = new AMQStateManager(this); } + public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) + { + _protocolHandler = protocolHandler; + _minaProtocolSession = protocolSession; + // properties of the connection are made available to the event handlers + _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); + + _stateManager = stateManager; + _stateManager.setProtocolSession(this); + + } + public void init() { // start the process of setting up the connection. This is the first place that diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index ea5cfce2ea..1d871f7bc8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -41,7 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - private final AMQProtocolSession _protocolSession; + private AMQProtocolSession _protocolSession; /** * The current state @@ -56,6 +56,12 @@ public class AMQStateManager implements AMQMethodListener private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); + public AMQStateManager() + { + this(null); + } + + public AMQStateManager(AMQProtocolSession protocolSession) { this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession); @@ -230,4 +236,14 @@ public class AMQStateManager implements AMQMethodListener } // at this point the state will have changed. } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void setProtocolSession(AMQProtocolSession session) + { + _protocolSession = session; + } } |