diff options
Diffstat (limited to 'qpid/java/client/src/main')
4 files changed, 81 insertions, 13 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 269937d0bd..5347e20e96 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -58,6 +58,7 @@ import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -889,7 +890,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!_closed.getAndSet(true)) { - doClose(sessions, timeout); + _closing.set(true); + try{ + doClose(sessions, timeout); + }finally{ + _closing.set(false); + } } } @@ -1283,8 +1289,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // in the case of an IOException, MINA has closed the protocol session so we set _closed to true // so that any generic client code that tries to close the connection will not mess up this error // handling sequence - if (cause instanceof IOException) + if (cause instanceof IOException || cause instanceof AMQDisconnectedException) { + // If we have an IOE/AMQDisconnect there is no connection to close on. + _closing.set(false); closer = !_closed.getAndSet(true); _protocolHandler.getProtocolSession().notifyError(je); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 733bee2d81..3b3d660c9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.io.Serializable; +import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -282,7 +283,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; - + protected Thread _dispatcherThread; /** Holds the message factory factory for this session. */ @@ -625,6 +626,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { + _closing.set(true); synchronized (getFailoverMutex()) { // We must close down all producers and consumers in an orderly fashion. This is the only method @@ -636,7 +638,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendClose(timeout); + // If the connection is open or we are in the process + // of closing the connection then send a cance + // no point otherwise as the connection will be gone + if (!_connection.isClosed() || _connection.isClosing()) + { + sendClose(timeout); + } } catch (AMQException e) { @@ -683,7 +691,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Failover failed and ain't coming back. Knife the dispatcher. _dispatcherThread.interrupt(); } - } + + } + + //if we don't have an exception then we can perform closing operations + _closing.set(e == null); if (!_closed.getAndSet(true)) { @@ -1210,9 +1222,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // this is done so that we can produce to a temporary queue before we create a consumer result.setQueueName(result.getRoutingKey()); - createQueue(result.getAMQQueueName(), result.isAutoDelete(), + createQueue(result.getAMQQueueName(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - bindQueue(result.getAMQQueueName(), result.getRoutingKey(), + bindQueue(result.getAMQQueueName(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(), result); return result; } @@ -1674,11 +1686,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // if (rawSelector != null) // ft.put("headers", rawSelector.getDataAsBytes()); // rawSelector is used by HeadersExchange and is not a JMS Selector - if (rawSelector != null) + if (rawSelector != null) { ft.addAll(rawSelector); } - + if (messageSelector != null) { ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); @@ -1918,13 +1930,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _dispatcher = new Dispatcher(); try { - _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); - + _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); + } catch(Exception e) { throw new Error("Error creating Dispatcher thread",e); - } + } _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); _dispatcherThread.setDaemon(true); _dispatcher.setConnectionStopped(initiallyStopped); @@ -2971,4 +2983,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } } + + /** + * Checks if the Session and its parent connection are closed + * + * @return <tt>true</tt> if this is closed, <tt>false</tt> otherwise. + */ + @Override + public boolean isClosed() + { + return _closed.get() || _connection.isClosed(); + } + + /** + * Checks if the Session and its parent connection are capable of performing + * closing operations + * + * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise. + */ + @Override + public boolean isClosing() + { + return _closing.get()|| _connection.isClosing(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 76422c6297..add68c5b27 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -541,6 +541,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa if (!_closed.getAndSet(true)) { + _closing.set(true); if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -561,7 +562,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { try { - sendCancel(); + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) + { + sendCancel(); + } } catch (AMQException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java index 7e119343a1..e6771e122c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -52,6 +52,13 @@ public abstract class Closeable protected final AtomicBoolean _closed = new AtomicBoolean(false); /** + * Are we in the process of closing. We have this distinction so we can + * still signal we are in the process of closing so other objects can tell + * the difference and tidy up. + */ + protected final AtomicBoolean _closing = new AtomicBoolean(false); + + /** * Checks if this is closed, and raises a JMSException if it is. * * @throws JMSException If this is closed. @@ -75,6 +82,17 @@ public abstract class Closeable } /** + * Checks if this is closis. + * + * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise. + */ + public boolean isClosing() + { + return _closing.get(); + } + + + /** * Closes this object. * * @throws JMSException If this cannot be closed for any reason. |