summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java43
1 files changed, 29 insertions, 14 deletions
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 0bfafa92b4..bb5beb6eaf 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -23,20 +23,24 @@ import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
-
-import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
-
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.ssl.BogusSSLContextFactory;
@@ -190,7 +194,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.info("sessionClose() not allowed to failover");
_connection.exceptionReceived(
new AMQDisconnectedException("Server closed connection and reconnection " +
- "not permitted."));
+ "not permitted."));
}
else
{
@@ -242,7 +246,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if (cause instanceof AMQConnectionClosedException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attemp failover
+ // this will attemp failover
sessionClosed(session);
}
@@ -397,14 +401,25 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param listener the blocking listener. Note the calling thread will block.
*/
private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
- throws AMQException
+ BlockingMethodFrameListener listener)
+ throws AMQException
{
- _frameListeners.add(listener);
- _protocolSession.writeFrame(frame);
- return listener.blockForFrame();
- // When control resumes before this line, a reply will have been received
- // that matches the criteria defined in the blocking listener
+ try
+ {
+ _frameListeners.add(listener);
+ _protocolSession.writeFrame(frame);
+
+ AMQMethodEvent e = listener.blockForFrame();
+ return e;
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
+ }
+ finally
+ {
+ // If we don't remove the listener then no-one will
+ _frameListeners.remove(listener);
+ }
+
}
/**