From 229474e3ee0e8dfd134fab2d5a595a388c4b71ed Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 6 Sep 2011 21:46:33 +0000 Subject: QPID-2920: Allow stopping consumers on queues. Stop consumers from dispatching and wait for already dispatching consumers to exit. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165881 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 27 ++++++++--- qpid/cpp/src/qpid/broker/Queue.h | 20 ++++++++- qpid/cpp/src/qpid/sys/Stoppable.h | 91 ++++++++++++++++++++++++++++++++++++++ qpid/cpp/src/tests/QueueTest.cpp | 39 ++++++++++++++-- 4 files changed, 166 insertions(+), 11 deletions(-) create mode 100644 qpid/cpp/src/qpid/sys/Stoppable.h diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 20d9361909..b6b896ce58 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -373,11 +373,18 @@ void Queue::removeListener(Consumer::shared_ptr c) bool Queue::dispatch(Consumer::shared_ptr c) { - QueuedMessage msg(this); - if (getNextMessage(msg, c)) { - c->deliver(msg); - return true; - } else { + Stoppable::Scope doDispatch(dispatching); + if (doDispatch) { + QueuedMessage msg(this); + if (getNextMessage(msg, c)) { + c->deliver(msg); + return true; + } else { + return false; + } + } else { // Dispatching is stopped + Mutex::ScopedLock locker(messageLock); + listeners.addListener(c); // FIXME aconway 2011-05-05: return false; } } @@ -1265,3 +1272,13 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } + +// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()? +void Queue::stop() { + dispatching.stop(); +} + +void Queue::start() { + dispatching.start(); + notifyListener(); +} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 8435e75cab..ce13bebefe 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -36,6 +36,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Stoppable.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" @@ -70,6 +71,7 @@ class Exchange; class Queue : public boost::enable_shared_from_this, public PersistableQueue, public management::Manageable { + // Used to prevent destruction of the queue while it is in use. struct UsageBarrier { Queue& parent; @@ -129,6 +131,8 @@ class Queue : public boost::enable_shared_from_this, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr autoDeleteTask; + // Allow dispatching consumer threads to be stopped. + sys::Stoppable dispatching; void push(boost::intrusive_ptr& msg, bool isRecovery=false); void setPolicy(std::auto_ptr policy); @@ -385,9 +389,21 @@ class Queue : public boost::enable_shared_from_this, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); + + /** Stop consumers. Return when all consumer threads are stopped. + *@pre Queue is active and not already stopping. + */ + void stop(); + + /** Start consumers. + *@pre Queue is stopped and idle: no thread in dispatch. + */ + void start(); + + /** Context data attached and used by cluster code. */ + boost::intrusive_ptr clusterContext; }; -} -} +}} // qpid::broker #endif /*!_broker_Queue_h*/ diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h new file mode 100644 index 0000000000..af21af46ba --- /dev/null +++ b/qpid/cpp/src/qpid/sys/Stoppable.h @@ -0,0 +1,91 @@ +#ifndef QPID_SYS_STOPPABLE_H +#define QPID_SYS_STOPPABLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace sys { + +/** + * An activity that may be executed by multiple threads, and can be stopped. + * Stopping prevents new threads from entering and waits till exiting busy threads leave. + */ +class Stoppable { + public: + Stoppable() : busy(0), stopped(false) {} + ~Stoppable() { stop(); } + + /** Mark the scope of a busy thread like this: + *
+     * {
+     *   Stoppable::Scope working(stoppable);
+     *   if (working) { do stuff }
+     * }
+     * 
+ */ + class Scope { + Stoppable& state; + bool entered; + public: + Scope(Stoppable& s) : state(s) { entered = s.enter(); } + ~Scope() { if (entered) state.exit(); } + operator bool() const { return entered; } + }; + + friend class Scope; + + /** Mark stopped, wait for all threads to leave their busy scope. */ + void stop() { + sys::Monitor::ScopedLock l(lock); + stopped = true; + while (busy > 0) lock.wait(); + } + + /** Set the state to started. + *@pre state is stopped and no theads are busy. + */ + void start() { + sys::Monitor::ScopedLock l(lock); + assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling. + stopped = false; + } + + private: + uint busy; + bool stopped; + sys::Monitor lock; + + bool enter() { + sys::Monitor::ScopedLock l(lock); + if (!stopped) ++busy; + return !stopped; + } + + void exit() { + sys::Monitor::ScopedLock l(lock); + assert(busy > 0); + if (--busy == 0) lock.notifyAll(); + } +}; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_STOPPABLE_H*/ diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index d94a5cab7f..ac02ad38ad 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -1,4 +1,4 @@ - /* +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -41,6 +41,7 @@ #include #include "boost/format.hpp" +#include using boost::intrusive_ptr; using namespace qpid; @@ -57,16 +58,22 @@ public: typedef boost::shared_ptr shared_ptr; intrusive_ptr last; - bool received; - TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; + bool received, notified; + + TestConsumer(bool acquire = true): + Consumer(acquire), received(false), notified(false) {}; virtual bool deliver(QueuedMessage& msg){ last = msg.payload; received = true; return true; }; - void notify() {} + void notify() { + notified = true; + } + OwnershipToken* getSession() { return 0; } + void reset() { last = intrusive_ptr(); received = false; } }; class FailOnDeliver : public Deliverable @@ -1114,6 +1121,30 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); } +QPID_AUTO_TEST_CASE(testStopStart) { + boost::shared_ptr q(new Queue("foo")); + boost::shared_ptr c(new TestConsumer); + intrusive_ptr m = create_message("x","y"); + q->consume(c); + // Initially q is started. + q->deliver(m); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK(c->received); + c->reset(); + // Stop q, should not receive message + q->stop(); + q->deliver(m); + BOOST_CHECK(!q->dispatch(c)); + BOOST_CHECK(!c->received); + BOOST_CHECK(!c->notified); + // Start q, should be notified and delivered + q->start(); + q->deliver(m); + BOOST_CHECK(c->notified); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK(c->received); +} + QPID_AUTO_TEST_SUITE_END() -- cgit v1.2.1