diff options
author | Alan Conway <aconway@apache.org> | 2011-09-30 20:55:40 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-30 20:55:40 +0000 |
commit | d0a7182866f7ea9a684a55b540814ce687a0fc41 (patch) | |
tree | f48a63d8e8641132fb0e627887d435615c634322 | |
parent | e53cb60510f8e7afee95bcbbad445b280ff29ce2 (diff) | |
download | qpid-python-d0a7182866f7ea9a684a55b540814ce687a0fc41.tar.gz |
QPID-2920: Renamed Stoppable to Activity, clearer naming.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1177829 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Activity.h (renamed from qpid/cpp/src/qpid/sys/Stoppable.h) | 40 |
4 files changed, 50 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 6b96bc64fa..0b43fd958e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { ClusterAcquireScope acquireScope; // Outside the lock - Stoppable::Scope consumeScope(consuming); + Activity::Scope consumeScope(consuming); Mutex::ScopedLock locker(messageLock); if (!consumeScope) { QPID_LOG(trace, "Queue stopped, can't consume: " << name); @@ -1288,6 +1288,8 @@ void Queue::startConsumers() { notifyListener(); } +bool Queue::isConsumingStopped() { return consuming.isStopped(); } + // Called when all busy threads exited after stopConsumers() void Queue::consumingStopped() { QPID_LOG(trace, "Stopped consumers on " << getName()); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 150348590b..aae858f804 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -31,11 +31,10 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" - #include "qpid/framing/FieldTable.h" +#include "qpid/sys/Activity.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Stoppable.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" @@ -130,7 +129,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; - sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster + sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); @@ -398,6 +397,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Start consumers. */ void startConsumers(); + /** Return true if consumers are stopped */ + bool isConsumingStopped(); + /** Context information used in a cluster. */ boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; } void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext = context; } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index f95c2c40b4..fd21ccf79c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -52,9 +52,9 @@ using namespace broker; namespace { const ProtocolVersion pv; // shorthand -// noReplicate means the current thread is handling a message -// received from the cluster so it should not be replicated. -QPID_TSS bool tssNoReplicate = false; +// True means the current thread is handling a local event that should be replicated. +// False means we're handling a cluster event it should not be replicated. +QPID_TSS bool tssReplicate = true; } // FIXME aconway 2011-09-26: de-const the broker::Cluster interface, @@ -72,13 +72,13 @@ Multicaster& BrokerContext::mcaster(const std::string& name) { } BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() { - assert(!tssNoReplicate); - tssNoReplicate = true; + assert(tssReplicate); + tssReplicate = false; } BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() { - assert(tssNoReplicate); - tssNoReplicate = false; + assert(!tssReplicate); + tssReplicate = true; } BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q) @@ -88,7 +88,7 @@ BrokerContext::~BrokerContext() {} bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg) { - if (tssNoReplicate) return true; + if (!tssReplicate) return true; // FIXME aconway 2010-10-20: replicate message in fragments // (frames), using fixed size bufffers. std::string data(msg->encodedSize(),char()); @@ -104,18 +104,18 @@ void BrokerContext::routing(const boost::intrusive_ptr<broker::Message>&) {} void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {} void BrokerContext::acquire(const broker::QueuedMessage& qm) { - if (tssNoReplicate) return; - mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); + if (tssReplicate) + mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::dequeue(const broker::QueuedMessage& qm) { - if (!tssNoReplicate) + if (tssReplicate) mcaster(qm).mcast( ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::requeue(const broker::QueuedMessage& qm) { - if (!tssNoReplicate) + if (tssReplicate) mcaster(qm).mcast(ClusterMessageRequeueBody( pv, qm.queue->getName(), @@ -125,7 +125,7 @@ void BrokerContext::requeue(const broker::QueuedMessage& qm) { // FIXME aconway 2011-06-08: should be be using shared_ptr to q here? void BrokerContext::create(broker::Queue& q) { - if (tssNoReplicate) return; + if (!tssReplicate) return; assert(!QueueContext::get(q)); boost::intrusive_ptr<QueueContext> context( new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName()))); @@ -137,12 +137,12 @@ void BrokerContext::create(broker::Queue& q) { } void BrokerContext::destroy(broker::Queue& q) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName())); } void BrokerContext::create(broker::Exchange& ex) { - if (tssNoReplicate) return; + if (!tssReplicate) return; std::string data(ex.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); ex.encode(buf); @@ -150,7 +150,7 @@ void BrokerContext::create(broker::Exchange& ex) { } void BrokerContext::destroy(broker::Exchange& ex) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(ex.getName()).mcast( ClusterWiringDestroyExchangeBody(pv, ex.getName())); } @@ -158,14 +158,14 @@ void BrokerContext::destroy(broker::Exchange& ex) { void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args)); } void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args)); } diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Activity.h index 6f10935c27..36db9da216 100644 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ b/qpid/cpp/src/qpid/sys/Activity.h @@ -1,5 +1,5 @@ -#ifndef QPID_SYS_STOPPABLE_H -#define QPID_SYS_STOPPABLE_H +#ifndef QPID_SYS_ACTIVITY_H +#define QPID_SYS_ACTIVITY_H /* * @@ -28,39 +28,42 @@ namespace qpid { namespace sys { /** - * An activity that may be executed by multiple threads, and can be stopped. - * - * Stopping prevents new threads from entering and calls a callback - * when all busy threads have left. + * An activity that may be executed by multiple threads concurrently. + * An activity has 3 states: + * - active: may have active threads, new threads may enter. + * - stopping: may have active threads but no new threads may enter. + * - stopped: no active threads and no new threads may enter. */ -class Stoppable { +class Activity { public: /** - * Initially not stopped. + * Initially active. *@param stoppedCallback: called when all threads have stopped. */ - Stoppable(boost::function<void()> stoppedCallback) + Activity(boost::function<void()> stoppedCallback) : busy(0), stopped(false), notify(stoppedCallback) {} - /** Mark the scope of a busy thread like this: + /** Mark the scope of an activity thread like this: * <pre> * { - * Stoppable::Scope working(stoppable); - * if (working) { do stuff } + * Activity::Scope working(activity); + * if (working) { do stuff } // Only if activity is active. * } * </pre> */ class Scope { - Stoppable& state; + Activity& state; bool entered; public: - Scope(Stoppable& s) : state(s) { entered = state.enter(); } + Scope(Activity& s) : state(s) { entered = state.enter(); } ~Scope() { if (entered) state.exit(); } operator bool() const { return entered; } }; friend class Scope; + // FIXME aconway 2011-09-30: fix pre-conditions with asserts, don't allow + // multiple stops/starts. /** * Set state to "stopped", so no new threads can enter. * Notify function will be called when all busy threads have left. @@ -80,8 +83,13 @@ class Stoppable { stopped = false; } - private: + /** True if Activity is stopped with no */ + bool isStopped() { + sys::Monitor::ScopedLock l(lock); + return stopped && busy == 0; + } + private: // Busy thread enters scope bool enter() { sys::Monitor::ScopedLock l(lock); @@ -110,4 +118,4 @@ class Stoppable { }} // namespace qpid::sys -#endif /*!QPID_SYS_STOPPABLE_H*/ +#endif /*!QPID_SYS_ACTIVITY_H*/ |