diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 48 |
1 files changed, 28 insertions, 20 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a847658846..335990b8af 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -462,28 +462,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // that can be called from a different thread of control from the one controlling the session synchronized(_connection.getFailoverMutex()) { - _closed.set(true); - - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try + //Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { - _connection.getProtocolHandler().closeSession(this); - final AMQFrame frame = ChannelCloseBody.createAMQFrame( - getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0); - _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully + // we pass null since this is not an error case + closeProducersAndConsumers(null); - } - catch (AMQException e) - { - throw new JMSException("Error closing session: " + e); - } - finally - { - _connection.deregisterSession(_channelId); + try + { + _connection.getProtocolHandler().closeSession(this); + final AMQFrame frame = ChannelCloseBody.createAMQFrame( + getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0); + _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully + + } + catch (AMQException e) + { + throw new JMSException("Error closing session: " + e); + } + finally + { + _connection.deregisterSession(_channelId); + } } } } @@ -723,6 +725,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver + * * @param destination * @return QueueReceiver - a wrapper around our MessageConsumer * @throws JMSException @@ -736,6 +739,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver using a message selector + * * @param destination * @param messageSelector * @return QueueReceiver - a wrapper around our MessageConsumer @@ -957,6 +961,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver wrapping a MessageConsumer + * * @param queue * @return QueueReceiver * @throws JMSException @@ -970,6 +975,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a QueueReceiver wrapping a MessageConsumer using a message selector + * * @param queue * @param messageSelector * @return QueueReceiver @@ -1012,6 +1018,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a non-durable subscriber + * * @param topic * @return TopicSubscriber - a wrapper round our MessageConsumer * @throws JMSException @@ -1024,6 +1031,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Creates a non-durable subscriber with a message selector + * * @param topic * @param messageSelector * @param noLocal |