diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 55 |
1 files changed, 45 insertions, 10 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 733bee2d81..3b3d660c9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.io.Serializable; +import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -282,7 +283,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; - + protected Thread _dispatcherThread; /** Holds the message factory factory for this session. */ @@ -625,6 +626,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { + _closing.set(true); synchronized (getFailoverMutex()) { // We must close down all producers and consumers in an orderly fashion. This is the only method @@ -636,7 +638,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendClose(timeout); + // If the connection is open or we are in the process + // of closing the connection then send a cance + // no point otherwise as the connection will be gone + if (!_connection.isClosed() || _connection.isClosing()) + { + sendClose(timeout); + } } catch (AMQException e) { @@ -683,7 +691,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Failover failed and ain't coming back. Knife the dispatcher. _dispatcherThread.interrupt(); } - } + + } + + //if we don't have an exception then we can perform closing operations + _closing.set(e == null); if (!_closed.getAndSet(true)) { @@ -1210,9 +1222,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // this is done so that we can produce to a temporary queue before we create a consumer result.setQueueName(result.getRoutingKey()); - createQueue(result.getAMQQueueName(), result.isAutoDelete(), + createQueue(result.getAMQQueueName(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - bindQueue(result.getAMQQueueName(), result.getRoutingKey(), + bindQueue(result.getAMQQueueName(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(), result); return result; } @@ -1674,11 +1686,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // if (rawSelector != null) // ft.put("headers", rawSelector.getDataAsBytes()); // rawSelector is used by HeadersExchange and is not a JMS Selector - if (rawSelector != null) + if (rawSelector != null) { ft.addAll(rawSelector); } - + if (messageSelector != null) { ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); @@ -1918,13 +1930,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _dispatcher = new Dispatcher(); try { - _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); - + _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); + } catch(Exception e) { throw new Error("Error creating Dispatcher thread",e); - } + } _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); _dispatcherThread.setDaemon(true); _dispatcher.setConnectionStopped(initiallyStopped); @@ -2971,4 +2983,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } } + + /** + * Checks if the Session and its parent connection are closed + * + * @return <tt>true</tt> if this is closed, <tt>false</tt> otherwise. + */ + @Override + public boolean isClosed() + { + return _closed.get() || _connection.isClosed(); + } + + /** + * Checks if the Session and its parent connection are capable of performing + * closing operations + * + * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise. + */ + @Override + public boolean isClosing() + { + return _closing.get()|| _connection.isClosing(); + } } |