summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-30 20:55:40 +0000
committerAlan Conway <aconway@apache.org>2011-09-30 20:55:40 +0000
commitd0a7182866f7ea9a684a55b540814ce687a0fc41 (patch)
treef48a63d8e8641132fb0e627887d435615c634322
parente53cb60510f8e7afee95bcbbad445b280ff29ce2 (diff)
downloadqpid-python-d0a7182866f7ea9a684a55b540814ce687a0fc41.tar.gz
QPID-2920: Renamed Stoppable to Activity, clearer naming.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1177829 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp36
-rw-r--r--qpid/cpp/src/qpid/sys/Activity.h (renamed from qpid/cpp/src/qpid/sys/Stoppable.h)40
4 files changed, 50 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 6b96bc64fa..0b43fd958e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
ClusterAcquireScope acquireScope; // Outside the lock
- Stoppable::Scope consumeScope(consuming);
+ Activity::Scope consumeScope(consuming);
Mutex::ScopedLock locker(messageLock);
if (!consumeScope) {
QPID_LOG(trace, "Queue stopped, can't consume: " << name);
@@ -1288,6 +1288,8 @@ void Queue::startConsumers() {
notifyListener();
}
+bool Queue::isConsumingStopped() { return consuming.isStopped(); }
+
// Called when all busy threads exited after stopConsumers()
void Queue::consumingStopped() {
QPID_LOG(trace, "Stopped consumers on " << getName());
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 150348590b..aae858f804 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -31,11 +31,10 @@
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
-
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Activity.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Stoppable.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
@@ -130,7 +129,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
- sys::Stoppable consuming; // Allow consumer threads to be stopped, used by cluster
+ sys::Activity consuming; // Allow consumer threads to be stopped, used by cluster
boost::intrusive_ptr<RefCounted> clusterContext; // Used by cluster
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
@@ -398,6 +397,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Start consumers. */
void startConsumers();
+ /** Return true if consumers are stopped */
+ bool isConsumingStopped();
+
/** Context information used in a cluster. */
boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; }
void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext = context; }
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index f95c2c40b4..fd21ccf79c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -52,9 +52,9 @@ using namespace broker;
namespace {
const ProtocolVersion pv; // shorthand
-// noReplicate means the current thread is handling a message
-// received from the cluster so it should not be replicated.
-QPID_TSS bool tssNoReplicate = false;
+// True means the current thread is handling a local event that should be replicated.
+// False means we're handling a cluster event it should not be replicated.
+QPID_TSS bool tssReplicate = true;
}
// FIXME aconway 2011-09-26: de-const the broker::Cluster interface,
@@ -72,13 +72,13 @@ Multicaster& BrokerContext::mcaster(const std::string& name) {
}
BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() {
- assert(!tssNoReplicate);
- tssNoReplicate = true;
+ assert(tssReplicate);
+ tssReplicate = false;
}
BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() {
- assert(tssNoReplicate);
- tssNoReplicate = false;
+ assert(!tssReplicate);
+ tssReplicate = true;
}
BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q)
@@ -88,7 +88,7 @@ BrokerContext::~BrokerContext() {}
bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
{
- if (tssNoReplicate) return true;
+ if (!tssReplicate) return true;
// FIXME aconway 2010-10-20: replicate message in fragments
// (frames), using fixed size bufffers.
std::string data(msg->encodedSize(),char());
@@ -104,18 +104,18 @@ void BrokerContext::routing(const boost::intrusive_ptr<broker::Message>&) {}
void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {}
void BrokerContext::acquire(const broker::QueuedMessage& qm) {
- if (tssNoReplicate) return;
- mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
+ if (tssReplicate)
+ mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
}
void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
- if (!tssNoReplicate)
+ if (tssReplicate)
mcaster(qm).mcast(
ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position));
}
void BrokerContext::requeue(const broker::QueuedMessage& qm) {
- if (!tssNoReplicate)
+ if (tssReplicate)
mcaster(qm).mcast(ClusterMessageRequeueBody(
pv,
qm.queue->getName(),
@@ -125,7 +125,7 @@ void BrokerContext::requeue(const broker::QueuedMessage& qm) {
// FIXME aconway 2011-06-08: should be be using shared_ptr to q here?
void BrokerContext::create(broker::Queue& q) {
- if (tssNoReplicate) return;
+ if (!tssReplicate) return;
assert(!QueueContext::get(q));
boost::intrusive_ptr<QueueContext> context(
new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName())));
@@ -137,12 +137,12 @@ void BrokerContext::create(broker::Queue& q) {
}
void BrokerContext::destroy(broker::Queue& q) {
- if (tssNoReplicate) return;
+ if (!tssReplicate) return;
mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName()));
}
void BrokerContext::create(broker::Exchange& ex) {
- if (tssNoReplicate) return;
+ if (!tssReplicate) return;
std::string data(ex.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
ex.encode(buf);
@@ -150,7 +150,7 @@ void BrokerContext::create(broker::Exchange& ex) {
}
void BrokerContext::destroy(broker::Exchange& ex) {
- if (tssNoReplicate) return;
+ if (!tssReplicate) return;
mcaster(ex.getName()).mcast(
ClusterWiringDestroyExchangeBody(pv, ex.getName()));
}
@@ -158,14 +158,14 @@ void BrokerContext::destroy(broker::Exchange& ex) {
void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex,
const std::string& key, const framing::FieldTable& args)
{
- if (tssNoReplicate) return;
+ if (!tssReplicate) return;
mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args));
}
void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
const std::string& key, const framing::FieldTable& args)
{
- if (tssNoReplicate) return;
+ if (!tssReplicate) return;
mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args));
}
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Activity.h
index 6f10935c27..36db9da216 100644
--- a/qpid/cpp/src/qpid/sys/Stoppable.h
+++ b/qpid/cpp/src/qpid/sys/Activity.h
@@ -1,5 +1,5 @@
-#ifndef QPID_SYS_STOPPABLE_H
-#define QPID_SYS_STOPPABLE_H
+#ifndef QPID_SYS_ACTIVITY_H
+#define QPID_SYS_ACTIVITY_H
/*
*
@@ -28,39 +28,42 @@ namespace qpid {
namespace sys {
/**
- * An activity that may be executed by multiple threads, and can be stopped.
- *
- * Stopping prevents new threads from entering and calls a callback
- * when all busy threads have left.
+ * An activity that may be executed by multiple threads concurrently.
+ * An activity has 3 states:
+ * - active: may have active threads, new threads may enter.
+ * - stopping: may have active threads but no new threads may enter.
+ * - stopped: no active threads and no new threads may enter.
*/
-class Stoppable {
+class Activity {
public:
/**
- * Initially not stopped.
+ * Initially active.
*@param stoppedCallback: called when all threads have stopped.
*/
- Stoppable(boost::function<void()> stoppedCallback)
+ Activity(boost::function<void()> stoppedCallback)
: busy(0), stopped(false), notify(stoppedCallback) {}
- /** Mark the scope of a busy thread like this:
+ /** Mark the scope of an activity thread like this:
* <pre>
* {
- * Stoppable::Scope working(stoppable);
- * if (working) { do stuff }
+ * Activity::Scope working(activity);
+ * if (working) { do stuff } // Only if activity is active.
* }
* </pre>
*/
class Scope {
- Stoppable& state;
+ Activity& state;
bool entered;
public:
- Scope(Stoppable& s) : state(s) { entered = state.enter(); }
+ Scope(Activity& s) : state(s) { entered = state.enter(); }
~Scope() { if (entered) state.exit(); }
operator bool() const { return entered; }
};
friend class Scope;
+ // FIXME aconway 2011-09-30: fix pre-conditions with asserts, don't allow
+ // multiple stops/starts.
/**
* Set state to "stopped", so no new threads can enter.
* Notify function will be called when all busy threads have left.
@@ -80,8 +83,13 @@ class Stoppable {
stopped = false;
}
- private:
+ /** True if Activity is stopped with no */
+ bool isStopped() {
+ sys::Monitor::ScopedLock l(lock);
+ return stopped && busy == 0;
+ }
+ private:
// Busy thread enters scope
bool enter() {
sys::Monitor::ScopedLock l(lock);
@@ -110,4 +118,4 @@ class Stoppable {
}} // namespace qpid::sys
-#endif /*!QPID_SYS_STOPPABLE_H*/
+#endif /*!QPID_SYS_ACTIVITY_H*/