diff options
author | Alan Conway <aconway@apache.org> | 2011-09-06 21:46:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-06 21:46:33 +0000 |
commit | 229474e3ee0e8dfd134fab2d5a595a388c4b71ed (patch) | |
tree | 1773dc3afe66a6b6919c46cc1acd3820af889318 | |
parent | aa388eb275cc51b5a7189025c1702b828275eebe (diff) | |
download | qpid-python-229474e3ee0e8dfd134fab2d5a595a388c4b71ed.tar.gz |
QPID-2920: Allow stopping consumers on queues.
Stop consumers from dispatching and wait for already dispatching consumers to exit.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165881 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Stoppable.h | 91 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 39 |
4 files changed, 166 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 20d9361909..b6b896ce58 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -373,11 +373,18 @@ void Queue::removeListener(Consumer::shared_ptr c) bool Queue::dispatch(Consumer::shared_ptr c) { - QueuedMessage msg(this); - if (getNextMessage(msg, c)) { - c->deliver(msg); - return true; - } else { + Stoppable::Scope doDispatch(dispatching); + if (doDispatch) { + QueuedMessage msg(this); + if (getNextMessage(msg, c)) { + c->deliver(msg); + return true; + } else { + return false; + } + } else { // Dispatching is stopped + Mutex::ScopedLock locker(messageLock); + listeners.addListener(c); // FIXME aconway 2011-05-05: return false; } } @@ -1265,3 +1272,13 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } + +// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? +void Queue::stop() { + dispatching.stop(); +} + +void Queue::start() { + dispatching.start(); + notifyListener(); +} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 8435e75cab..ce13bebefe 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -36,6 +36,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Stoppable.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" @@ -70,6 +71,7 @@ class Exchange; class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { + // Used to prevent destruction of the queue while it is in use. struct UsageBarrier { Queue& parent; @@ -129,6 +131,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; + // Allow dispatching consumer threads to be stopped. + sys::Stoppable dispatching; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -385,9 +389,21 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); + + /** Stop consumers. Return when all consumer threads are stopped. + *@pre Queue is active and not already stopping. + */ + void stop(); + + /** Start consumers. + *@pre Queue is stopped and idle: no thread in dispatch. + */ + void start(); + + /** Context data attached and used by cluster code. */ + boost::intrusive_ptr<qpid::RefCounted> clusterContext; }; -} -} +}} // qpid::broker #endif /*!_broker_Queue_h*/ diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h new file mode 100644 index 0000000000..af21af46ba --- /dev/null +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -0,0 +1,91 @@ +#ifndef QPID_SYS_STOPPABLE_H +#define QPID_SYS_STOPPABLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace sys { + +/** + * An activity that may be executed by multiple threads, and can be stopped. + * Stopping prevents new threads from entering and waits till exiting busy threads leave. + */ +class Stoppable { + public: + Stoppable() : busy(0), stopped(false) {} + ~Stoppable() { stop(); } + + /** Mark the scope of a busy thread like this: + * <pre> + * { + * Stoppable::Scope working(stoppable); + * if (working) { do stuff } + * } + * </pre> + */ + class Scope { + Stoppable& state; + bool entered; + public: + Scope(Stoppable& s) : state(s) { entered = s.enter(); } + ~Scope() { if (entered) state.exit(); } + operator bool() const { return entered; } + }; + + friend class Scope; + + /** Mark stopped, wait for all threads to leave their busy scope. */ + void stop() { + sys::Monitor::ScopedLock l(lock); + stopped = true; + while (busy > 0) lock.wait(); + } + + /** Set the state to started. + *@pre state is stopped and no theads are busy. + */ + void start() { + sys::Monitor::ScopedLock l(lock); + assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling. + stopped = false; + } + + private: + uint busy; + bool stopped; + sys::Monitor lock; + + bool enter() { + sys::Monitor::ScopedLock l(lock); + if (!stopped) ++busy; + return !stopped; + } + + void exit() { + sys::Monitor::ScopedLock l(lock); + assert(busy > 0); + if (--busy == 0) lock.notifyAll(); + } +}; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_STOPPABLE_H*/ diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index d94a5cab7f..ac02ad38ad 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -1,4 +1,4 @@ - /* +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -41,6 +41,7 @@ #include <iostream> #include "boost/format.hpp" +#include <boost/enable_shared_from_this.hpp> using boost::intrusive_ptr; using namespace qpid; @@ -57,16 +58,22 @@ public: typedef boost::shared_ptr<TestConsumer> shared_ptr; intrusive_ptr<Message> last; - bool received; - TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; + bool received, notified; + + TestConsumer(bool acquire = true): + Consumer(acquire), received(false), notified(false) {}; virtual bool deliver(QueuedMessage& msg){ last = msg.payload; received = true; return true; }; - void notify() {} + void notify() { + notified = true; + } + OwnershipToken* getSession() { return 0; } + void reset() { last = intrusive_ptr<Message>(); received = false; } }; class FailOnDeliver : public Deliverable @@ -1114,6 +1121,30 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); } +QPID_AUTO_TEST_CASE(testStopStart) { + boost::shared_ptr<Queue> q(new Queue("foo")); + boost::shared_ptr<TestConsumer> c(new TestConsumer); + intrusive_ptr<Message> m = create_message("x","y"); + q->consume(c); + // Initially q is started. + q->deliver(m); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK(c->received); + c->reset(); + // Stop q, should not receive message + q->stop(); + q->deliver(m); + BOOST_CHECK(!q->dispatch(c)); + BOOST_CHECK(!c->received); + BOOST_CHECK(!c->notified); + // Start q, should be notified and delivered + q->start(); + q->deliver(m); + BOOST_CHECK(c->notified); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK(c->received); +} + QPID_AUTO_TEST_SUITE_END() |