summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-09-02 14:30:47 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-09-02 14:30:47 +0000
commit4861d93f7ae5c084603fcd0fccc73054620a7093 (patch)
tree4eec670f357ddf2704cde90ccf1e1e1e50b97e34
parentd3b8773888202a69c6df43a7783cc7faed667aeb (diff)
downloadqpid-python-4861d93f7ae5c084603fcd0fccc73054620a7093.tar.gz
QPID-1266 - Provide a stop() method on AMQQueue to stop all processing on that queue thus shutting down the ThreadPool.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@691263 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java11
3 files changed, 18 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 90d7109df8..03ccbe7ce4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -161,6 +161,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void deliverAsync();
+ void stop();
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 29c3f68286..1184ba1d19 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1102,12 +1102,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
_deleteTaskList.clear();
- ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ stop();
}
return getMessageCount();
}
+ public void stop()
+ {
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ }
+
public void deliverAsync()
{
_stateChangeCount.incrementAndGet();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index b25a56344e..71f6c8ed44 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -301,11 +301,20 @@ public class VirtualHost implements Accessable
public void close() throws Exception
{
+ //Stop the Queues processing
+ if (_queueRegistry != null)
+ {
+ for (AMQQueue queue : _queueRegistry.getQueues())
+ {
+ queue.stop();
+ }
+ }
+
//Stop Housekeeping
if (_houseKeepingTimer != null)
{
_houseKeepingTimer.cancel();
- }
+ }
//Stop Connections
_connectionRegistry.close();