summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java55
1 files changed, 45 insertions, 10 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 733bee2d81..3b3d660c9c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.io.Serializable;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -282,7 +283,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
-
+
protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
@@ -625,6 +626,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
+ _closing.set(true);
synchronized (getFailoverMutex())
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
@@ -636,7 +638,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- sendClose(timeout);
+ // If the connection is open or we are in the process
+ // of closing the connection then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_connection.isClosed() || _connection.isClosing())
+ {
+ sendClose(timeout);
+ }
}
catch (AMQException e)
{
@@ -683,7 +691,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Failover failed and ain't coming back. Knife the dispatcher.
_dispatcherThread.interrupt();
}
- }
+
+ }
+
+ //if we don't have an exception then we can perform closing operations
+ _closing.set(e == null);
if (!_closed.getAndSet(true))
{
@@ -1210,9 +1222,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// this is done so that we can produce to a temporary queue before we create a consumer
result.setQueueName(result.getRoutingKey());
- createQueue(result.getAMQQueueName(), result.isAutoDelete(),
+ createQueue(result.getAMQQueueName(), result.isAutoDelete(),
result.isDurable(), result.isExclusive());
- bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
+ bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
new FieldTable(), result.getExchangeName(), result);
return result;
}
@@ -1674,11 +1686,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
// rawSelector is used by HeadersExchange and is not a JMS Selector
- if (rawSelector != null)
+ if (rawSelector != null)
{
ft.addAll(rawSelector);
}
-
+
if (messageSelector != null)
{
ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
@@ -1918,13 +1930,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_dispatcher = new Dispatcher();
try
{
- _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
-
+ _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+
}
catch(Exception e)
{
throw new Error("Error creating Dispatcher thread",e);
- }
+ }
_dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
_dispatcherThread.setDaemon(true);
_dispatcher.setConnectionStopped(initiallyStopped);
@@ -2971,4 +2983,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
}
+
+ /**
+ * Checks if the Session and its parent connection are closed
+ *
+ * @return <tt>true</tt> if this is closed, <tt>false</tt> otherwise.
+ */
+ @Override
+ public boolean isClosed()
+ {
+ return _closed.get() || _connection.isClosed();
+ }
+
+ /**
+ * Checks if the Session and its parent connection are capable of performing
+ * closing operations
+ *
+ * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
+ */
+ @Override
+ public boolean isClosing()
+ {
+ return _closing.get()|| _connection.isClosing();
+ }
}