summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-02-25 04:56:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-02-25 04:56:42 +0000
commitdbce7647235272887a2fb4680698c6689800fe24 (patch)
treec9f95a29baf1844b75a9e222cf588ea58a5b5e27
parent83230ae84f5a501c16fcfa8c7d0884ef6476be1f (diff)
downloadqpid-python-dbce7647235272887a2fb4680698c6689800fe24.tar.gz
QPID-810 : Moved check for closingChannels higher in stack and close channel on any AMQException being thrown from the body.handle methods.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630733 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java56
1 files changed, 33 insertions, 23 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 143ee5fa40..6f40594cb4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -109,7 +109,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new ArrayList<Integer>();
+ private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
@@ -208,9 +208,39 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_logger.debug("Frame Received: " + frame);
}
+
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
+ {
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+ }
+
+ return;
+ }
+ }
- body.handle(channelId, this);
+
+ try
+ {
+ body.handle(channelId, this);
+ }
+ catch (AMQException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
@@ -259,27 +289,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((evt.getMethod() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
- }
-
- return;
- }
- }
-
try
{
try
@@ -341,6 +350,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_logger.info("Closing connection due to: " + e.getMessage());
}
+ markChannelawaitingCloseOk(channelId);
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));