summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java51
1 files changed, 31 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 118be75705..2e3e417c95 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -65,6 +65,7 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -205,9 +206,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
- protected static final boolean DECLARE_QUEUES =
+ protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
- protected static final boolean DECLARE_EXCHANGES =
+ protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
/** System property to enable strict AMQP compliance. */
@@ -629,6 +630,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public void close(long timeout) throws JMSException
{
+ close(timeout, true);
+ }
+
+ private void close(long timeout, boolean sendClose) throws JMSException
+ {
if (_logger.isInfoEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -654,9 +660,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// If the connection is open or we are in the process
// of closing the connection then send a cance
// no point otherwise as the connection will be gone
- if (!_connection.isClosed() || _connection.isClosing())
+ if (!_connection.isClosed() || _connection.isClosing())
{
- sendClose(timeout);
+ if (sendClose)
+ {
+ sendClose(timeout);
+ }
}
}
catch (AMQException e)
@@ -712,25 +721,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!_closed.getAndSet(true))
{
- synchronized (getFailoverMutex())
+ synchronized (_messageDeliveryLock)
{
- synchronized (_messageDeliveryLock)
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
-
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
}
+
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
}
}
}
@@ -1737,6 +1743,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
+ if (e instanceof AMQChannelClosedException)
+ {
+ close(-1, false);
+ }
+
JMSException ex = new JMSException("Error registering consumer: " + e);
ex.setLinkedException(e);