summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java9
6 files changed, 35 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 1184ba1d19..6631bc3559 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -117,6 +117,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
+ private AtomicBoolean _stopped = new AtomicBoolean(false);
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -1110,7 +1111,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void stop()
{
- ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ if (!_stopped.getAndSet(true))
+ {
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ }
+ else
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Queue " + getName() + " already stopped");
+ }
+ }
}
public void deliverAsync()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 71f6c8ed44..9229863c35 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -301,6 +301,10 @@ public class VirtualHost implements Accessable
public void close() throws Exception
{
+
+ //Stop Connections
+ _connectionRegistry.close();
+
//Stop the Queues processing
if (_queueRegistry != null)
{
@@ -316,9 +320,6 @@ public class VirtualHost implements Accessable
_houseKeepingTimer.cancel();
}
- //Stop Connections
- _connectionRegistry.close();
-
//Close MessageStore
if (_messageStore != null)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index a592c9353a..aa25e207a9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -60,10 +60,15 @@ public class DestWildExchangeTest extends TestCase
_protocolSession = new InternalTestProtocolSession();
}
+ public void tearDown()
+ {
+ ApplicationRegistry.remove(1);
+ }
+
public void testNoRoute() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 0524494bfd..dec4de4cc6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -102,6 +102,7 @@ public class MessageStoreTest extends TestCase
try
{
_virtualHost = new VirtualHost(virtualHostName, configuration, null);
+ ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost);
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 67eb180dbf..28eab73995 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -56,7 +56,8 @@ public class InternalBrokerBaseCase extends TestCase
super.setUp();
_registry = new TestApplicationRegistry();
ApplicationRegistry.initialise(_registry);
- _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+ _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+
_messageStore = _virtualHost.getMessageStore();
QUEUE_NAME = new AMQShortString("test");
@@ -80,7 +81,7 @@ public class InternalBrokerBaseCase extends TestCase
public void tearDown() throws Exception
{
- ApplicationRegistry.removeAll();
+ ApplicationRegistry.remove(1);
super.tearDown();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index ce9c6ae4cb..20a30b3ed3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -160,4 +160,13 @@ public class ReferenceCountingExecutorService
{
return _pool;
}
+
+ /**
+ * Return the ReferenceCount to this ExecutorService
+ * @return reference count
+ */
+ public int getReferenceCount()
+ {
+ return _refCount;
+ }
}