From d0a7182866f7ea9a684a55b540814ce687a0fc41 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 30 Sep 2011 20:55:40 +0000 Subject: QPID-2920: Renamed Stoppable to Activity, clearer naming. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1177829 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 4 +- qpid/cpp/src/qpid/broker/Queue.h | 8 +- qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 36 +++---- qpid/cpp/src/qpid/sys/Activity.h | 121 ++++++++++++++++++++++++ qpid/cpp/src/qpid/sys/Stoppable.h | 113 ---------------------- 5 files changed, 147 insertions(+), 135 deletions(-) create mode 100644 qpid/cpp/src/qpid/sys/Activity.h delete mode 100644 qpid/cpp/src/qpid/sys/Stoppable.h diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 6b96bc64fa..0b43fd958e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { ClusterAcquireScope acquireScope; // Outside the lock - Stoppable::Scope consumeScope(consuming); + Activity::Scope consumeScope(consuming); Mutex::ScopedLock locker(messageLock); if (!consumeScope) { QPID_LOG(trace, "Queue stopped, can't consume: " << name); @@ -1288,6 +1288,8 @@ void Queue::startConsumers() { notifyListener(); } +bool Queue::isConsumingStopped() { return consuming.isStopped(); } + // Called when all busy threads exited after stopConsumers() void Queue::consumingStopped() { QPID_LOG(trace, "Stopped consumers on " << getName()); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 150348590b..aae858f804 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -31,11 +31,10 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" - #include "qpid/framing/FieldTable.h" +#include "qpid/sys/Activity.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Stoppable.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" @@ -130,7 +129,7 @@ class Queue : public boost::enable_shared_from_this, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr autoDeleteTask; - sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster + sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster boost::intrusive_ptr clusterContext; // Used by cluster void push(boost::intrusive_ptr& msg, bool isRecovery=false); @@ -398,6 +397,9 @@ class Queue : public boost::enable_shared_from_this, /** Start consumers. */ void startConsumers(); + /** Return true if consumers are stopped */ + bool isConsumingStopped(); + /** Context information used in a cluster. */ boost::intrusive_ptr getClusterContext() { return clusterContext; } void setClusterContext(boost::intrusive_ptr context) { clusterContext = context; } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index f95c2c40b4..fd21ccf79c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -52,9 +52,9 @@ using namespace broker; namespace { const ProtocolVersion pv; // shorthand -// noReplicate means the current thread is handling a message -// received from the cluster so it should not be replicated. -QPID_TSS bool tssNoReplicate = false; +// True means the current thread is handling a local event that should be replicated. +// False means we're handling a cluster event it should not be replicated. +QPID_TSS bool tssReplicate = true; } // FIXME aconway 2011-09-26: de-const the broker::Cluster interface, @@ -72,13 +72,13 @@ Multicaster& BrokerContext::mcaster(const std::string& name) { } BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() { - assert(!tssNoReplicate); - tssNoReplicate = true; + assert(tssReplicate); + tssReplicate = false; } BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() { - assert(tssNoReplicate); - tssNoReplicate = false; + assert(!tssReplicate); + tssReplicate = true; } BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr q) @@ -88,7 +88,7 @@ BrokerContext::~BrokerContext() {} bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr& msg) { - if (tssNoReplicate) return true; + if (!tssReplicate) return true; // FIXME aconway 2010-10-20: replicate message in fragments // (frames), using fixed size bufffers. std::string data(msg->encodedSize(),char()); @@ -104,18 +104,18 @@ void BrokerContext::routing(const boost::intrusive_ptr&) {} void BrokerContext::routed(const boost::intrusive_ptr&) {} void BrokerContext::acquire(const broker::QueuedMessage& qm) { - if (tssNoReplicate) return; - mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); + if (tssReplicate) + mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::dequeue(const broker::QueuedMessage& qm) { - if (!tssNoReplicate) + if (tssReplicate) mcaster(qm).mcast( ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::requeue(const broker::QueuedMessage& qm) { - if (!tssNoReplicate) + if (tssReplicate) mcaster(qm).mcast(ClusterMessageRequeueBody( pv, qm.queue->getName(), @@ -125,7 +125,7 @@ void BrokerContext::requeue(const broker::QueuedMessage& qm) { // FIXME aconway 2011-06-08: should be be using shared_ptr to q here? void BrokerContext::create(broker::Queue& q) { - if (tssNoReplicate) return; + if (!tssReplicate) return; assert(!QueueContext::get(q)); boost::intrusive_ptr context( new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName()))); @@ -137,12 +137,12 @@ void BrokerContext::create(broker::Queue& q) { } void BrokerContext::destroy(broker::Queue& q) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName())); } void BrokerContext::create(broker::Exchange& ex) { - if (tssNoReplicate) return; + if (!tssReplicate) return; std::string data(ex.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); ex.encode(buf); @@ -150,7 +150,7 @@ void BrokerContext::create(broker::Exchange& ex) { } void BrokerContext::destroy(broker::Exchange& ex) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(ex.getName()).mcast( ClusterWiringDestroyExchangeBody(pv, ex.getName())); } @@ -158,14 +158,14 @@ void BrokerContext::destroy(broker::Exchange& ex) { void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args)); } void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { - if (tssNoReplicate) return; + if (!tssReplicate) return; mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args)); } diff --git a/qpid/cpp/src/qpid/sys/Activity.h b/qpid/cpp/src/qpid/sys/Activity.h new file mode 100644 index 0000000000..36db9da216 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/Activity.h @@ -0,0 +1,121 @@ +#ifndef QPID_SYS_ACTIVITY_H +#define QPID_SYS_ACTIVITY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +namespace qpid { +namespace sys { + +/** + * An activity that may be executed by multiple threads concurrently. + * An activity has 3 states: + * - active: may have active threads, new threads may enter. + * - stopping: may have active threads but no new threads may enter. + * - stopped: no active threads and no new threads may enter. + */ +class Activity { + public: + /** + * Initially active. + *@param stoppedCallback: called when all threads have stopped. + */ + Activity(boost::function stoppedCallback) + : busy(0), stopped(false), notify(stoppedCallback) {} + + /** Mark the scope of an activity thread like this: + *
+     * {
+     *   Activity::Scope working(activity);
+     *   if (working) { do stuff } // Only if activity is active.
+     * }
+     * 
+ */ + class Scope { + Activity& state; + bool entered; + public: + Scope(Activity& s) : state(s) { entered = state.enter(); } + ~Scope() { if (entered) state.exit(); } + operator bool() const { return entered; } + }; + + friend class Scope; + + // FIXME aconway 2011-09-30: fix pre-conditions with asserts, don't allow + // multiple stops/starts. + /** + * Set state to "stopped", so no new threads can enter. + * Notify function will be called when all busy threads have left. + * No-op if already stopping. + */ + void stop() { + sys::Monitor::ScopedLock l(lock); + stopped = true; + check(l); + } + + /** Set the state to "started", allow threads to enter. + * If already stopping this will prevent notify function from being called. + */ + void start() { + sys::Monitor::ScopedLock l(lock); + stopped = false; + } + + /** True if Activity is stopped with no */ + bool isStopped() { + sys::Monitor::ScopedLock l(lock); + return stopped && busy == 0; + } + + private: + // Busy thread enters scope + bool enter() { + sys::Monitor::ScopedLock l(lock); + if (!stopped) ++busy; + return !stopped; + } + + // Busy thread exits scope + void exit() { + sys::Monitor::ScopedLock l(lock); + assert(busy > 0); + --busy; + check(l); + } + + void check(const sys::Monitor::ScopedLock&) { + // Called with lock held. + if (stopped && busy == 0 && notify) notify(); + } + + uint busy; + bool stopped; + sys::Monitor lock; + boost::function< void() > notify; +}; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_ACTIVITY_H*/ diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h deleted file mode 100644 index 6f10935c27..0000000000 --- a/qpid/cpp/src/qpid/sys/Stoppable.h +++ /dev/null @@ -1,113 +0,0 @@ -#ifndef QPID_SYS_STOPPABLE_H -#define QPID_SYS_STOPPABLE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include - -namespace qpid { -namespace sys { - -/** - * An activity that may be executed by multiple threads, and can be stopped. - * - * Stopping prevents new threads from entering and calls a callback - * when all busy threads have left. - */ -class Stoppable { - public: - /** - * Initially not stopped. - *@param stoppedCallback: called when all threads have stopped. - */ - Stoppable(boost::function stoppedCallback) - : busy(0), stopped(false), notify(stoppedCallback) {} - - /** Mark the scope of a busy thread like this: - *
-     * {
-     *   Stoppable::Scope working(stoppable);
-     *   if (working) { do stuff }
-     * }
-     * 
- */ - class Scope { - Stoppable& state; - bool entered; - public: - Scope(Stoppable& s) : state(s) { entered = state.enter(); } - ~Scope() { if (entered) state.exit(); } - operator bool() const { return entered; } - }; - - friend class Scope; - - /** - * Set state to "stopped", so no new threads can enter. - * Notify function will be called when all busy threads have left. - * No-op if already stopping. - */ - void stop() { - sys::Monitor::ScopedLock l(lock); - stopped = true; - check(l); - } - - /** Set the state to "started", allow threads to enter. - * If already stopping this will prevent notify function from being called. - */ - void start() { - sys::Monitor::ScopedLock l(lock); - stopped = false; - } - - private: - - // Busy thread enters scope - bool enter() { - sys::Monitor::ScopedLock l(lock); - if (!stopped) ++busy; - return !stopped; - } - - // Busy thread exits scope - void exit() { - sys::Monitor::ScopedLock l(lock); - assert(busy > 0); - --busy; - check(l); - } - - void check(const sys::Monitor::ScopedLock&) { - // Called with lock held. - if (stopped && busy == 0 && notify) notify(); - } - - uint busy; - bool stopped; - sys::Monitor lock; - boost::function< void() > notify; -}; - -}} // namespace qpid::sys - -#endif /*!QPID_SYS_STOPPABLE_H*/ -- cgit v1.2.1