summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-06 21:46:33 +0000
committerAlan Conway <aconway@apache.org>2011-09-06 21:46:33 +0000
commit229474e3ee0e8dfd134fab2d5a595a388c4b71ed (patch)
tree1773dc3afe66a6b6919c46cc1acd3820af889318
parentaa388eb275cc51b5a7189025c1702b828275eebe (diff)
downloadqpid-python-229474e3ee0e8dfd134fab2d5a595a388c4b71ed.tar.gz
QPID-2920: Allow stopping consumers on queues.
Stop consumers from dispatching and wait for already dispatching consumers to exit. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165881 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp27
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h20
-rw-r--r--qpid/cpp/src/qpid/sys/Stoppable.h91
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp39
4 files changed, 166 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 20d9361909..b6b896ce58 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -373,11 +373,18 @@ void Queue::removeListener(Consumer::shared_ptr c)
bool Queue::dispatch(Consumer::shared_ptr c)
{
- QueuedMessage msg(this);
- if (getNextMessage(msg, c)) {
- c->deliver(msg);
- return true;
- } else {
+ Stoppable::Scope doDispatch(dispatching);
+ if (doDispatch) {
+ QueuedMessage msg(this);
+ if (getNextMessage(msg, c)) {
+ c->deliver(msg);
+ return true;
+ } else {
+ return false;
+ }
+ } else { // Dispatching is stopped
+ Mutex::ScopedLock locker(messageLock);
+ listeners.addListener(c); // FIXME aconway 2011-05-05:
return false;
}
}
@@ -1265,3 +1272,13 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
+void Queue::stop() {
+ dispatching.stop();
+}
+
+void Queue::start() {
+ dispatching.start();
+ notifyListener();
+}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 8435e75cab..ce13bebefe 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -36,6 +36,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Stoppable.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
@@ -70,6 +71,7 @@ class Exchange;
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
+ // Used to prevent destruction of the queue while it is in use.
struct UsageBarrier
{
Queue& parent;
@@ -129,6 +131,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+ // Allow dispatching consumer threads to be stopped.
+ sys::Stoppable dispatching;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -385,9 +389,21 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
+
+ /** Stop consumers. Return when all consumer threads are stopped.
+ *@pre Queue is active and not already stopping.
+ */
+ void stop();
+
+ /** Start consumers.
+ *@pre Queue is stopped and idle: no thread in dispatch.
+ */
+ void start();
+
+ /** Context data attached and used by cluster code. */
+ boost::intrusive_ptr<qpid::RefCounted> clusterContext;
};
-}
-}
+}} // qpid::broker
#endif /*!_broker_Queue_h*/
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h
new file mode 100644
index 0000000000..af21af46ba
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/Stoppable.h
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_STOPPABLE_H
+#define QPID_SYS_STOPPABLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace sys {
+
+/**
+ * An activity that may be executed by multiple threads, and can be stopped.
+ * Stopping prevents new threads from entering and waits till exiting busy threads leave.
+ */
+class Stoppable {
+ public:
+ Stoppable() : busy(0), stopped(false) {}
+ ~Stoppable() { stop(); }
+
+ /** Mark the scope of a busy thread like this:
+ * <pre>
+ * {
+ * Stoppable::Scope working(stoppable);
+ * if (working) { do stuff }
+ * }
+ * </pre>
+ */
+ class Scope {
+ Stoppable& state;
+ bool entered;
+ public:
+ Scope(Stoppable& s) : state(s) { entered = s.enter(); }
+ ~Scope() { if (entered) state.exit(); }
+ operator bool() const { return entered; }
+ };
+
+ friend class Scope;
+
+ /** Mark stopped, wait for all threads to leave their busy scope. */
+ void stop() {
+ sys::Monitor::ScopedLock l(lock);
+ stopped = true;
+ while (busy > 0) lock.wait();
+ }
+
+ /** Set the state to started.
+ *@pre state is stopped and no theads are busy.
+ */
+ void start() {
+ sys::Monitor::ScopedLock l(lock);
+ assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling.
+ stopped = false;
+ }
+
+ private:
+ uint busy;
+ bool stopped;
+ sys::Monitor lock;
+
+ bool enter() {
+ sys::Monitor::ScopedLock l(lock);
+ if (!stopped) ++busy;
+ return !stopped;
+ }
+
+ void exit() {
+ sys::Monitor::ScopedLock l(lock);
+ assert(busy > 0);
+ if (--busy == 0) lock.notifyAll();
+ }
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_STOPPABLE_H*/
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index d94a5cab7f..ac02ad38ad 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -1,4 +1,4 @@
- /*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -41,6 +41,7 @@
#include <iostream>
#include "boost/format.hpp"
+#include <boost/enable_shared_from_this.hpp>
using boost::intrusive_ptr;
using namespace qpid;
@@ -57,16 +58,22 @@ public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
intrusive_ptr<Message> last;
- bool received;
- TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+ bool received, notified;
+
+ TestConsumer(bool acquire = true):
+ Consumer(acquire), received(false), notified(false) {};
virtual bool deliver(QueuedMessage& msg){
last = msg.payload;
received = true;
return true;
};
- void notify() {}
+ void notify() {
+ notified = true;
+ }
+
OwnershipToken* getSession() { return 0; }
+ void reset() { last = intrusive_ptr<Message>(); received = false; }
};
class FailOnDeliver : public Deliverable
@@ -1114,6 +1121,30 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
}
+QPID_AUTO_TEST_CASE(testStopStart) {
+ boost::shared_ptr<Queue> q(new Queue("foo"));
+ boost::shared_ptr<TestConsumer> c(new TestConsumer);
+ intrusive_ptr<Message> m = create_message("x","y");
+ q->consume(c);
+ // Initially q is started.
+ q->deliver(m);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK(c->received);
+ c->reset();
+ // Stop q, should not receive message
+ q->stop();
+ q->deliver(m);
+ BOOST_CHECK(!q->dispatch(c));
+ BOOST_CHECK(!c->received);
+ BOOST_CHECK(!c->notified);
+ // Start q, should be notified and delivered
+ q->start();
+ q->deliver(m);
+ BOOST_CHECK(c->notified);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK(c->received);
+}
+
QPID_AUTO_TEST_SUITE_END()