diff options
6 files changed, 35 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 1184ba1d19..6631bc3559 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -117,6 +117,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); + private AtomicBoolean _stopped = new AtomicBoolean(false); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -1110,7 +1111,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void stop() { - ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + if (!_stopped.getAndSet(true)) + { + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + } + else + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Queue " + getName() + " already stopped"); + } + } } public void deliverAsync() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 71f6c8ed44..9229863c35 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -301,6 +301,10 @@ public class VirtualHost implements Accessable public void close() throws Exception { + + //Stop Connections + _connectionRegistry.close(); + //Stop the Queues processing if (_queueRegistry != null) { @@ -316,9 +320,6 @@ public class VirtualHost implements Accessable _houseKeepingTimer.cancel(); } - //Stop Connections - _connectionRegistry.close(); - //Close MessageStore if (_messageStore != null) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index a592c9353a..aa25e207a9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -60,10 +60,15 @@ public class DestWildExchangeTest extends TestCase _protocolSession = new InternalTestProtocolSession(); } + public void tearDown() + { + ApplicationRegistry.remove(1); + } + public void testNoRoute() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null); _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 0524494bfd..dec4de4cc6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -102,6 +102,7 @@ public class MessageStoreTest extends TestCase try { _virtualHost = new VirtualHost(virtualHostName, configuration, null); + ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost); } catch (Exception e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 67eb180dbf..28eab73995 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -56,7 +56,8 @@ public class InternalBrokerBaseCase extends TestCase super.setUp(); _registry = new TestApplicationRegistry(); ApplicationRegistry.initialise(_registry); - _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); + _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = _virtualHost.getMessageStore(); QUEUE_NAME = new AMQShortString("test"); @@ -80,7 +81,7 @@ public class InternalBrokerBaseCase extends TestCase public void tearDown() throws Exception { - ApplicationRegistry.removeAll(); + ApplicationRegistry.remove(1); super.tearDown(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index ce9c6ae4cb..20a30b3ed3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -160,4 +160,13 @@ public class ReferenceCountingExecutorService { return _pool; } + + /** + * Return the ReferenceCount to this ExecutorService + * @return reference count + */ + public int getReferenceCount() + { + return _refCount; + } } |