summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-15 12:52:46 +0000
committerRobert Greig <rgreig@apache.org>2007-01-15 12:52:46 +0000
commitfe6d706ad7085de5dd0526ec0c1d7983ed61aaf3 (patch)
tree8ef787a82ae27e5de991375fe0603c594f20795a /java
parentd53fcdfd308d40d391fce4d561b212a3c2a1036c (diff)
downloadqpid-python-fe6d706ad7085de5dd0526ec0c1d7983ed61aaf3.tar.gz
QPID-294 : Patch supplied by Rob Godfrey - Fix race condition on client connection
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@496302 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java18
3 files changed, 40 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index a0aa1d544b..0d2877c926 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -47,6 +47,7 @@ import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+
public class AMQProtocolHandler extends IoHandlerAdapter
{
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
@@ -69,7 +70,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
private volatile AMQProtocolSession _protocolSession;
-// private AMQStateManager _stateManager = new AMQStateManager();
+ private AMQStateManager _stateManager = new AMQStateManager();
private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
@@ -142,7 +143,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
- _protocolSession = new AMQProtocolSession(this, session, _connection);
+ _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -278,7 +279,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToWaiters(Exception e)
{
- _protocolSession.getStateManager().error(e);
+ getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -317,7 +318,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
try
{
- boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt);
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
@@ -329,12 +330,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
}
}
catch (AMQException e)
{
- _protocolSession.getStateManager().error(e);
+ getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
@@ -395,7 +396,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void attainState(AMQState s) throws AMQException
{
- _protocolSession.getStateManager().attainState(s);
+ getStateManager().attainState(s);
}
/**
@@ -487,7 +488,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void closeConnection() throws AMQException
{
- _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
+ getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -557,11 +558,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQStateManager getStateManager()
{
- return _protocolSession.getStateManager();
+ return _stateManager;
}
public void setStateManager(AMQStateManager stateManager)
{
+ _stateManager = stateManager;
_protocolSession.setStateManager(stateManager);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 1af7ca55e6..8523e1cfce 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -113,6 +113,18 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_stateManager = new AMQStateManager(this);
}
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = protocolSession;
+ // properties of the connection are made available to the event handlers
+ _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+
+ _stateManager = stateManager;
+ _stateManager.setProtocolSession(this);
+
+ }
+
public void init()
{
// start the process of setting up the connection. This is the first place that
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index ea5cfce2ea..1d871f7bc8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -41,7 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final AMQProtocolSession _protocolSession;
+ private AMQProtocolSession _protocolSession;
/**
* The current state
@@ -56,6 +56,12 @@ public class AMQStateManager implements AMQMethodListener
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+ public AMQStateManager()
+ {
+ this(null);
+ }
+
+
public AMQStateManager(AMQProtocolSession protocolSession)
{
this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
@@ -230,4 +236,14 @@ public class AMQStateManager implements AMQMethodListener
}
// at this point the state will have changed.
}
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void setProtocolSession(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
}