summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/cluster.mk14
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp101
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h28
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp9
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.h8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Settings.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Settings.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Ticker.cpp68
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Ticker.h86
18 files changed, 281 insertions, 103 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 5401c78c3d..769a313ddb 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -115,23 +115,21 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/BrokerContext.h \
qpid/cluster/exp/BufferFactory.h \
qpid/cluster/exp/Cluster2Plugin.cpp \
- qpid/cluster/exp/CountdownTimer.h \
qpid/cluster/exp/Core.cpp \
qpid/cluster/exp/Core.h \
+ qpid/cluster/exp/CountdownTimer.h \
qpid/cluster/exp/EventHandler.cpp \
qpid/cluster/exp/EventHandler.h \
qpid/cluster/exp/Group.cpp \
qpid/cluster/exp/Group.h \
- qpid/cluster/exp/hash.cpp \
- qpid/cluster/exp/hash.h \
qpid/cluster/exp/HandlerBase.cpp \
qpid/cluster/exp/HandlerBase.h \
qpid/cluster/exp/MessageBuilders.cpp \
qpid/cluster/exp/MessageBuilders.h \
- qpid/cluster/exp/MessageHolder.cpp \
- qpid/cluster/exp/MessageHolder.h \
qpid/cluster/exp/MessageHandler.cpp \
qpid/cluster/exp/MessageHandler.h \
+ qpid/cluster/exp/MessageHolder.cpp \
+ qpid/cluster/exp/MessageHolder.h \
qpid/cluster/exp/Multicaster.cpp \
qpid/cluster/exp/Multicaster.h \
qpid/cluster/exp/QueueContext.cpp \
@@ -142,9 +140,13 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/QueueReplica.h \
qpid/cluster/exp/Settings.cpp \
qpid/cluster/exp/Settings.h \
+ qpid/cluster/exp/Ticker.h \
+ qpid/cluster/exp/Ticker.cpp \
qpid/cluster/exp/UniqueIds.h \
qpid/cluster/exp/WiringHandler.cpp \
- qpid/cluster/exp/WiringHandler.h
+ qpid/cluster/exp/WiringHandler.h \
+ qpid/cluster/exp/hash.cpp \
+ qpid/cluster/exp/hash.h
# The watchdog plugin and helper executable
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index e165ba405e..9943b3d2b5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -19,11 +19,11 @@
*
*/
-#include "Core.h"
#include "BrokerContext.h"
-#include "QueueContext.h"
-#include "Multicaster.h"
+#include "Core.h"
#include "MessageHolder.h"
+#include "Multicaster.h"
+#include "QueueContext.h"
#include "hash.h"
#include "qpid/framing/ClusterMessageEnqueueBody.h"
#include "qpid/framing/ClusterMessageAcquireBody.h"
@@ -136,7 +136,7 @@ void BrokerContext::create(broker::Queue& q) {
if (!tssReplicate) return;
assert(!QueueContext::get(q));
boost::intrusive_ptr<QueueContext> context(
- new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName())));
+ new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks));
std::string data(q.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index f5cb401c51..ff3bb2085c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -29,6 +29,7 @@ namespace qpid {
namespace cluster {
class Core;
class QueueContext;
+class Multicaster;
// TODO aconway 2010-10-19: experimental cluster code.
diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
index ea2efa5233..68b9d5075b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
@@ -35,8 +35,9 @@ struct Cluster2Plugin : public Plugin {
Opts(Settings& s) : Options("Cluster Options"), settings(s) {
addOptions()
("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
- ("cluster2-consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.")
- ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.");
+ ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.")
+ ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in the cluster.")
+ ("cluster2-consume-ticks", optValue(settings.consumeTicks, "N"), "Maximum number of ticks a broker can hold the consume lock on a shared queue.");
// FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
// FIXME aconway 2011-10-05: rename to final option names.
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index d3ee6133c9..5c89d3ff88 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -19,13 +19,14 @@
*
*/
+#include "BrokerContext.h"
#include "Core.h"
#include "EventHandler.h"
-#include "BrokerContext.h"
-#include "WiringHandler.h"
#include "MessageHandler.h"
#include "QueueContext.h"
#include "QueueHandler.h"
+#include "WiringHandler.h"
+#include "hash.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
@@ -47,7 +48,7 @@ Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s)
std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
groups.push_back(new Group(*this));
boost::intrusive_ptr<Group> group(groups.back());
-
+
EventHandler& eh(group->getEventHandler());
typedef boost::intrusive_ptr<HandlerBase> HandlerBasePtr;
boost::intrusive_ptr<QueueHandler> queueHandler(new QueueHandler(*group, settings));
@@ -66,8 +67,6 @@ Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s)
}
QPID_LOG(notice, "cluster: joined cluster " << s.name
<< ", member-id="<< groups[0]->getEventHandler().getSelf());
- QPID_LOG(debug, "cluster: consume-lock=" << s.consumeLockMicros << "us "
- << " concurrency=" << s.concurrency);
}
void Core::initialize() {}
@@ -80,4 +79,8 @@ Group& Core::getGroup(size_t hashValue) {
return *groups[hashValue % groups.size()];
}
+Group& Core::getGroup(const std::string& q) {
+ return getGroup(hashof(q));
+}
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h
index c630b4b3f5..d1367cc79e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.h
@@ -76,8 +76,8 @@ class Core
const Settings& getSettings() const { return settings; }
- /** Get group by hash value. */
Group& getGroup(size_t hashValue);
+ Group& getGroup(const std::string& queueName);
private:
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
index 17615fccc8..c6d98856a1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
@@ -18,13 +18,13 @@
* under the License.
*
*/
-#include "Group.h"
#include "Core.h"
#include "EventHandler.h"
-#include "Multicaster.h"
-#include "MessageHolder.h"
+#include "Group.h"
#include "MessageBuilders.h"
-
+#include "MessageHolder.h"
+#include "Multicaster.h"
+#include "Ticker.h"
#include "qpid/broker/Broker.h"
namespace qpid {
@@ -44,7 +44,10 @@ Group::Group(Core& core) :
core.getBroker().getPoller(),
boost::bind(&Core::fatal, &core))),
messageHolder(new MessageHolder()),
- messageBuilders(new MessageBuilders(&core.getBroker().getStore()))
+ messageBuilders(new MessageBuilders(&core.getBroker().getStore())),
+ ticker(new Ticker(core.getSettings().getTick(),
+ core.getBroker().getTimer(),
+ core.getBroker().getPoller()))
{}
Group::~Group() {}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h
index 0bd1fd2277..49b33c6a70 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.h
@@ -39,10 +39,12 @@ class EventHandler;
class Multicaster;
class MessageBuilders;
class MessageHolder;
+class Ticker;
/**
- * A CPG instance with an event handler and a multi-caster,
- * along with all the per-group handler objects.
+ * Resources used by a group of queues. Includes a CPG instance with
+ * an event handler and a multi-caster, along with all the per-group
+ * handler objects and a Ticker.
*/
class Group : public RefCounted
{
@@ -54,6 +56,7 @@ class Group : public RefCounted
Multicaster& getMulticaster() { return *multicaster; }
MessageHolder& getMessageHolder() { return *messageHolder; }
MessageBuilders& getMessageBuilders() { return *messageBuilders; }
+ Ticker& getTicker() { return *ticker; }
void mcast(const framing::AMQBody&);
void mcast(const framing::AMQFrame&);
@@ -62,6 +65,7 @@ class Group : public RefCounted
std::auto_ptr<Multicaster> multicaster;
std::auto_ptr<MessageHolder> messageHolder;
std::auto_ptr<MessageBuilders> messageBuilders;
+ std::auto_ptr<Ticker> ticker;
};
}} // namespace qpid::cluster::exp
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 56800e6b95..ba06ee82f2 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -20,11 +20,12 @@
*
*/
-#include "QueueContext.h"
+#include "BrokerContext.h"
+#include "Group.h"
#include "Multicaster.h"
-#include "qpid/cluster/types.h"
-#include "BrokerContext.h" // for ScopedSuppressReplication
+#include "QueueContext.h"
#include "hash.h"
+#include "qpid/cluster/types.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/ClusterQueueResubscribeBody.h"
#include "qpid/framing/ClusterQueueSubscribeBody.h"
@@ -37,14 +38,14 @@
namespace qpid {
namespace cluster {
-QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m)
- : timer(boost::bind(&QueueContext::timeout, this),
- q.getBroker()->getTimer(),
- consumeLock),
- queue(q), mcast(m), consumers(0), hash(hashof(q.getName()))
+QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_)
+ : consumers(0), consuming(true), ticks(0),
+ queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())),
+ maxTicks(maxTicks_)
{
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
q.stopConsumers(); // Stop queue initially.
+ g.getTicker().add(this);
}
QueueContext::~QueueContext() {}
@@ -54,72 +55,74 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
}
// Called by QueueReplica in CPG deliver thread when state changes.
-void QueueContext::replicaState(
- QueueOwnership before, QueueOwnership after, bool selfDelivered)
+void QueueContext::replicaState(QueueOwnership before, QueueOwnership after)
{
- // No lock, this function does not touch any member variables.
-
- // Invariants for ownership:
- // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
- // SOLE_OWNER <=> timer stopped, queue started
- // SHARED_OWNER <=> timer started, queue started
-
- // Interested in state changes and my own events which lead to
- // ownership.
- if ((before != after || selfDelivered) && isOwner(after)) {
- QPID_LOG(trace, "cluster: start consumers on " << queue.getName() << ", timer "
- << (after==SHARED_OWNER? "start" : "stop"));
- queue.startConsumers();
- if (after == SHARED_OWNER) timer.start();
- else timer.stop();
+ // Interested in state changes which lead to ownership.
+ // We voluntarily give up ownership before multicasting
+ // the state change so we don't need to handle transitions
+ // that lead to non-ownership.
+ if (before != after && isOwner(after)) {
+ bool start = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ start = !consuming;
+ consuming = true;
+ ticks = 0;
+ }
+ if (start) queue.startConsumers();
}
-
- // If we lost ownership then the queue and timer will already have
- // been stopped by timeout()
}
// FIXME aconway 2011-07-27: Dont spin the token on an empty queue.
// Called in broker threads when a consumer is added
void QueueContext::consume(size_t n) {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ }
if (n == 1) mcast.mcast(
framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
}
// Called in broker threads when a consumer is cancelled
void QueueContext::cancel(size_t n) {
- sys::Mutex::ScopedLock l(lock);
- consumers = n;
- // When consuming threads are stopped, this->stopped will be called.
- if (n == 0) {
- QPID_LOG(trace, "cluster: all consumers canceled on " << queue.getName());
- timer.stop();
- queue.stopConsumers();
+ bool stop = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ consumers = n;
+ stop = (n == 0 && consuming);
}
+ if (stop) queue.stopConsumers();
}
-// Called in timer thread.
-void QueueContext::timeout() {
+// Called in Ticker thread.
+void QueueContext::tick() {
+ bool stop = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ stop = (consuming && ++ticks >= maxTicks);
+ }
// When all threads have stopped, queue will call stopped()
- QPID_LOG(trace, "cluster: lock timeout on " << queue.getName());
- queue.stopConsumers();
+ if (stop) queue.stopConsumers();
}
// Callback set up by queue.stopConsumers() called in connection or timer thread.
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, "cluster: stopped consumers, "
- << (consumers == 0 ? "unsubscribe" : "resubscribe")
- << " to " << queue.getName());
- if (consumers == 0)
- mcast.mcast(framing::ClusterQueueUnsubscribeBody(
- framing::ProtocolVersion(), queue.getName()));
- else // FIXME aconway 2011-09-13: check if we're owner?
+ bool resubscribe = false;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ assert(consuming);
+ consuming = false;
+ resubscribe = consumers;
+ }
+ if (resubscribe)
mcast.mcast(framing::ClusterQueueResubscribeBody(
framing::ProtocolVersion(), queue.getName()));
+ else
+ mcast.mcast(framing::ClusterQueueUnsubscribeBody(
+ framing::ProtocolVersion(), queue.getName()));
}
void QueueContext::requeue(uint32_t position, bool redelivered) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 5f2adeae74..20c2aabc1d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -23,8 +23,9 @@
*/
#include "LockedMap.h"
-#include "CountdownTimer.h"
+#include "Ticker.h"
#include "qpid/RefCounted.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Mutex.h"
#include "qpid/cluster/types.h"
@@ -39,24 +40,24 @@ class QueuedMessage;
namespace cluster {
class Multicaster;
+class Group;
/**
* Queue state that is not replicated to the cluster.
* Manages the local queue start/stop status.
*
- * Thread safe: Called by connection, dispatch and timer threads.
+* THREAD SAFE: Called by connection threads and Ticker dispatch threads.
*/
-class QueueContext : public RefCounted {
+class QueueContext : public Ticker::Tickable {
public:
- QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m);
+ QueueContext(broker::Queue&, Group&, size_t consumeTicks);
~QueueContext();
/** Replica state has changed, called in deliver thread.
* @param before replica state before the event.
* @param before replica state after the event.
- * @param self is true if this was a self-delivered event.
*/
- void replicaState(QueueOwnership before, QueueOwnership after, bool self);
+ void replicaState(QueueOwnership before, QueueOwnership after);
/** Called when queue is stopped, no threads are dispatching.
* May be called in connection or deliver thread.
@@ -73,8 +74,8 @@ class QueueContext : public RefCounted {
*/
void cancel(size_t n);
- /** Called in timer thread when the timer runs out. */
- void timeout();
+ /** Called regularly at the tick interval in an IO thread.*/
+ void tick();
/** Called by MessageHandler to requeue a message. */
void requeue(uint32_t position, bool redelivered);
@@ -93,13 +94,18 @@ class QueueContext : public RefCounted {
private:
sys::Mutex lock;
- CountdownTimer timer;
+ size_t consumers; // Number of local consumers
+ bool consuming; // True if we have the lock & local consumers are active
+ size_t ticks; // Ticks since we got the lock.
+
+ // Following members are immutable
broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
Multicaster& mcast;
- size_t consumers;
size_t hash;
+ size_t maxTicks; // Max ticks we are allowed.
- typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap;
+ // Following members are safe to use without holding a lock
+ typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap;
UnackedMap unacked;
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 4e5820e295..0c96e9326d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -24,6 +24,7 @@
#include "QueueContext.h"
#include "QueueHandler.h"
#include "QueueReplica.h"
+#include "Settings.h"
#include "qpid/Exception.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
@@ -33,10 +34,8 @@
namespace qpid {
namespace cluster {
-QueueHandler::QueueHandler(Group& g, const Settings& s)
- : HandlerBase(g.getEventHandler()),
- multicaster(g.getMulticaster()),
- consumeLock(s.getConsumeLock())
+QueueHandler::QueueHandler(Group& g, Settings& s)
+ : HandlerBase(g.getEventHandler()), group(g), consumeTicks(s.consumeTicks)
{}
bool QueueHandler::handle(const framing::AMQFrame& frame) {
@@ -62,7 +61,7 @@ void QueueHandler::left(const MemberId& member) {
void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
// Local queues already have a context, remote queues need one.
if (!QueueContext::get(*q))
- new QueueContext(*q, consumeLock, multicaster); // Context attaches to the Queue
+ new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue
queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
new QueueReplica(q, self()));
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
index 053127e428..84e8b75cfb 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -23,7 +23,6 @@
*/
#include "HandlerBase.h"
-#include "Settings.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "boost/shared_ptr.hpp"
#include "boost/intrusive_ptr.hpp"
@@ -42,6 +41,7 @@ class EventHandler;
class QueueReplica;
class Multicaster;
class Group;
+class Settings;
/**
* Handler for queue subscription events.
@@ -54,7 +54,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
public HandlerBase
{
public:
- QueueHandler(Group&, const Settings&);
+ QueueHandler(Group&, Settings&);
bool handle(const framing::AMQFrame& body);
@@ -76,8 +76,8 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
boost::intrusive_ptr<QueueReplica> find(const std::string& queue);
QueueMap queues;
- Multicaster& multicaster;
- sys::Duration consumeLock;
+ Group& group;
+ size_t consumeTicks;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 5a3c16c00c..11a7496582 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -53,7 +53,7 @@ std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
void QueueReplica::subscribe(const MemberId& member) {
QueueOwnership before = getState();
subscribers.push_back(member);
- update(before, member);
+ update(before);
}
// FIXME aconway 2011-09-20: need to requeue.
@@ -61,7 +61,7 @@ void QueueReplica::unsubscribe(const MemberId& member) {
QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
if (i != subscribers.end()) subscribers.erase(i, subscribers.end());
- update(before, member);
+ update(before);
}
void QueueReplica::resubscribe(const MemberId& member) {
@@ -69,14 +69,14 @@ void QueueReplica::resubscribe(const MemberId& member) {
QueueOwnership before = getState();
subscribers.pop_front();
subscribers.push_back(member);
- update(before, member);
+ update(before);
}
-void QueueReplica::update(QueueOwnership before, MemberId member) {
+void QueueReplica::update(QueueOwnership before) {
QueueOwnership after = getState();
QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": "
<< before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
- context->replicaState(before, after, member == self);
+ context->replicaState(before, after);
}
QueueOwnership QueueReplica::getState() const {
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
index ee93727ca9..31faf4853a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
@@ -68,7 +68,7 @@ class QueueReplica : public RefCounted
QueueOwnership getState() const;
bool isOwner() const;
bool isSubscriber(const MemberId&) const;
- void update(QueueOwnership before, MemberId from);
+ void update(QueueOwnership before);
friend struct PrintSubscribers;
friend std::ostream& operator<<(std::ostream&, QueueOwnership);
diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
index c3499e58be..4c85dc68e6 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
@@ -26,7 +26,8 @@ namespace qpid {
namespace cluster {
Settings::Settings() : // Default settings
- consumeLockMicros(10000),
+ tick(10000), // FIXME aconway 2011-11-03: smaller default
+ consumeTicks(2),
concurrency(sys::SystemInfo::concurrency() + 1)
{}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.h b/qpid/cpp/src/qpid/cluster/exp/Settings.h
index 1ce3c808ea..ebdddccd26 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Settings.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Settings.h
@@ -34,10 +34,11 @@ namespace cluster {
struct Settings {
Settings();
std::string name;
- uint32_t consumeLockMicros;
+ uint32_t tick;
+ uint32_t consumeTicks;
uint32_t concurrency;
- sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; }
+ sys::Duration getTick() const { return tick * sys::TIME_USEC; }
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
new file mode 100644
index 0000000000..9ff04f2f54
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Ticker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace cluster {
+
+Ticker::Tickable::~Tickable() {}
+
+Ticker::Ticker(sys::Duration tick, sys::Timer& timer_,
+ boost::shared_ptr<sys::Poller> poller)
+ : sys::TimerTask(tick, "Cluster ticker"), timer(timer_),
+ condition(boost::bind(&Ticker::dispatch, this, _1), poller)
+{
+ timer.add(this);
+}
+
+void Ticker::add(boost::intrusive_ptr<Tickable> t) {
+ sys::Mutex::ScopedLock l(lock);
+ tickables.push_back(t);
+}
+
+void Ticker::remove(boost::intrusive_ptr<Tickable> t) {
+ sys::Mutex::ScopedLock l(lock);
+ Tickables::iterator i = std::find(tickables.begin(), tickables.end(), t);
+ if (i != tickables.end()) tickables.erase(i);
+}
+
+// Called by timer thread, sets condition
+void Ticker::fire() {
+ condition.set();
+ setupNextFire();
+ timer.add(this);
+}
+
+// Called only in condition IO thread.
+void Ticker::dispatch(sys::PollableCondition& cond) {
+ assert(&cond == &condition);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ working = tickables;
+ }
+ // This is safe outside the lock see comment in Ticker.h
+ for(Tickables::iterator i = working.begin(); i!= working.end(); ++i)
+ (*i)->tick();
+ condition.clear(); // Ready for next tick.
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.h b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
new file mode 100644
index 0000000000..0a8d508a70
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.h
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_EXP_TICKER_H
+#define QPID_CLUSTER_EXP_TICKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+
+namespace sys {
+class Poller;
+}
+
+namespace cluster {
+
+/**
+ * Generate regular calls to QueueContext::tick.
+ * Work of caling tick is not done in the timer thread.
+ * The timer task triggers a PollableCondition, which calls the ticks.
+ *
+ * THREAD SAFE: add/remove are called in connection or deliver
+ * threads, fire is called in timer thread and tick is called in the
+ * IO thread for the PollableCondition.
+ */
+class Ticker : public sys::TimerTask
+{
+ public:
+ struct Tickable : public RefCounted {
+ virtual ~Tickable();
+ virtual void tick() = 0;
+ };
+
+ Ticker(sys::Duration tick, sys::Timer&, boost::shared_ptr<sys::Poller>);
+
+ void add(boost::intrusive_ptr<Tickable>);
+ void remove(boost::intrusive_ptr<Tickable>);
+
+ private:
+ typedef std::vector<boost::intrusive_ptr<Tickable> > Tickables;
+
+ void fire(); // Called in timer thread.
+ void dispatch(sys::PollableCondition&); // Called in IO thread
+
+ sys::Timer& timer;
+ sys::PollableCondition condition;
+
+ sys::Mutex lock;
+ Tickables tickables;
+
+ // Only accessed in the condition IO thread so no lock needed.
+ // This is a member to keep memory allocated by the vector and
+ // avoid re-allocation each time
+ Tickables working;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_TICKER_H*/