diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Core.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Core.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Group.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Group.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 101 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueHandler.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Settings.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Settings.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Ticker.cpp | 68 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Ticker.h | 86 |
17 files changed, 273 insertions, 97 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index e165ba405e..9943b3d2b5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -19,11 +19,11 @@ * */ -#include "Core.h" #include "BrokerContext.h" -#include "QueueContext.h" -#include "Multicaster.h" +#include "Core.h" #include "MessageHolder.h" +#include "Multicaster.h" +#include "QueueContext.h" #include "hash.h" #include "qpid/framing/ClusterMessageEnqueueBody.h" #include "qpid/framing/ClusterMessageAcquireBody.h" @@ -136,7 +136,7 @@ void BrokerContext::create(broker::Queue& q) { if (!tssReplicate) return; assert(!QueueContext::get(q)); boost::intrusive_ptr<QueueContext> context( - new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName()))); + new QueueContext(q, core.getGroup(q.getName()), core.getSettings().consumeTicks)); std::string data(q.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); q.encode(buf); diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index f5cb401c51..ff3bb2085c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -29,6 +29,7 @@ namespace qpid { namespace cluster { class Core; class QueueContext; +class Multicaster; // TODO aconway 2010-10-19: experimental cluster code. diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp index ea2efa5233..68b9d5075b 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp @@ -35,8 +35,9 @@ struct Cluster2Plugin : public Plugin { Opts(Settings& s) : Options("Cluster Options"), settings(s) { addOptions() ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join") - ("cluster2-consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.") - ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver."); + ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.") + ("cluster2-tick", optValue(settings.tick, "uS"), "Length of 'tick' used for timing events in the cluster.") + ("cluster2-consume-ticks", optValue(settings.consumeTicks, "N"), "Maximum number of ticks a broker can hold the consume lock on a shared queue."); // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h. // FIXME aconway 2011-10-05: rename to final option names. } diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index d3ee6133c9..5c89d3ff88 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -19,13 +19,14 @@ * */ +#include "BrokerContext.h" #include "Core.h" #include "EventHandler.h" -#include "BrokerContext.h" -#include "WiringHandler.h" #include "MessageHandler.h" #include "QueueContext.h" #include "QueueHandler.h" +#include "WiringHandler.h" +#include "hash.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" @@ -47,7 +48,7 @@ Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s) std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i); groups.push_back(new Group(*this)); boost::intrusive_ptr<Group> group(groups.back()); - + EventHandler& eh(group->getEventHandler()); typedef boost::intrusive_ptr<HandlerBase> HandlerBasePtr; boost::intrusive_ptr<QueueHandler> queueHandler(new QueueHandler(*group, settings)); @@ -66,8 +67,6 @@ Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s) } QPID_LOG(notice, "cluster: joined cluster " << s.name << ", member-id="<< groups[0]->getEventHandler().getSelf()); - QPID_LOG(debug, "cluster: consume-lock=" << s.consumeLockMicros << "us " - << " concurrency=" << s.concurrency); } void Core::initialize() {} @@ -80,4 +79,8 @@ Group& Core::getGroup(size_t hashValue) { return *groups[hashValue % groups.size()]; } +Group& Core::getGroup(const std::string& q) { + return getGroup(hashof(q)); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h index c630b4b3f5..d1367cc79e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.h +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -76,8 +76,8 @@ class Core const Settings& getSettings() const { return settings; } - /** Get group by hash value. */ Group& getGroup(size_t hashValue); + Group& getGroup(const std::string& queueName); private: broker::Broker& broker; diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp index 17615fccc8..c6d98856a1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp @@ -18,13 +18,13 @@ * under the License. * */ -#include "Group.h" #include "Core.h" #include "EventHandler.h" -#include "Multicaster.h" -#include "MessageHolder.h" +#include "Group.h" #include "MessageBuilders.h" - +#include "MessageHolder.h" +#include "Multicaster.h" +#include "Ticker.h" #include "qpid/broker/Broker.h" namespace qpid { @@ -44,7 +44,10 @@ Group::Group(Core& core) : core.getBroker().getPoller(), boost::bind(&Core::fatal, &core))), messageHolder(new MessageHolder()), - messageBuilders(new MessageBuilders(&core.getBroker().getStore())) + messageBuilders(new MessageBuilders(&core.getBroker().getStore())), + ticker(new Ticker(core.getSettings().getTick(), + core.getBroker().getTimer(), + core.getBroker().getPoller())) {} Group::~Group() {} diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h index 0bd1fd2277..49b33c6a70 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Group.h +++ b/qpid/cpp/src/qpid/cluster/exp/Group.h @@ -39,10 +39,12 @@ class EventHandler; class Multicaster; class MessageBuilders; class MessageHolder; +class Ticker; /** - * A CPG instance with an event handler and a multi-caster, - * along with all the per-group handler objects. + * Resources used by a group of queues. Includes a CPG instance with + * an event handler and a multi-caster, along with all the per-group + * handler objects and a Ticker. */ class Group : public RefCounted { @@ -54,6 +56,7 @@ class Group : public RefCounted Multicaster& getMulticaster() { return *multicaster; } MessageHolder& getMessageHolder() { return *messageHolder; } MessageBuilders& getMessageBuilders() { return *messageBuilders; } + Ticker& getTicker() { return *ticker; } void mcast(const framing::AMQBody&); void mcast(const framing::AMQFrame&); @@ -62,6 +65,7 @@ class Group : public RefCounted std::auto_ptr<Multicaster> multicaster; std::auto_ptr<MessageHolder> messageHolder; std::auto_ptr<MessageBuilders> messageBuilders; + std::auto_ptr<Ticker> ticker; }; }} // namespace qpid::cluster::exp diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 56800e6b95..ba06ee82f2 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -20,11 +20,12 @@ * */ -#include "QueueContext.h" +#include "BrokerContext.h" +#include "Group.h" #include "Multicaster.h" -#include "qpid/cluster/types.h" -#include "BrokerContext.h" // for ScopedSuppressReplication +#include "QueueContext.h" #include "hash.h" +#include "qpid/cluster/types.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/ClusterQueueResubscribeBody.h" #include "qpid/framing/ClusterQueueSubscribeBody.h" @@ -37,14 +38,14 @@ namespace qpid { namespace cluster { -QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m) - : timer(boost::bind(&QueueContext::timeout, this), - q.getBroker()->getTimer(), - consumeLock), - queue(q), mcast(m), consumers(0), hash(hashof(q.getName())) +QueueContext::QueueContext(broker::Queue& q, Group& g, size_t maxTicks_) + : consumers(0), consuming(true), ticks(0), + queue(q), mcast(g.getMulticaster()), hash(hashof(q.getName())), + maxTicks(maxTicks_) { q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); q.stopConsumers(); // Stop queue initially. + g.getTicker().add(this); } QueueContext::~QueueContext() {} @@ -54,72 +55,74 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } } // Called by QueueReplica in CPG deliver thread when state changes. -void QueueContext::replicaState( - QueueOwnership before, QueueOwnership after, bool selfDelivered) +void QueueContext::replicaState(QueueOwnership before, QueueOwnership after) { - // No lock, this function does not touch any member variables. - - // Invariants for ownership: - // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped - // SOLE_OWNER <=> timer stopped, queue started - // SHARED_OWNER <=> timer started, queue started - - // Interested in state changes and my own events which lead to - // ownership. - if ((before != after || selfDelivered) && isOwner(after)) { - QPID_LOG(trace, "cluster: start consumers on " << queue.getName() << ", timer " - << (after==SHARED_OWNER? "start" : "stop")); - queue.startConsumers(); - if (after == SHARED_OWNER) timer.start(); - else timer.stop(); + // Interested in state changes which lead to ownership. + // We voluntarily give up ownership before multicasting + // the state change so we don't need to handle transitions + // that lead to non-ownership. + if (before != after && isOwner(after)) { + bool start = false; + { + sys::Mutex::ScopedLock l(lock); + start = !consuming; + consuming = true; + ticks = 0; + } + if (start) queue.startConsumers(); } - - // If we lost ownership then the queue and timer will already have - // been stopped by timeout() } // FIXME aconway 2011-07-27: Dont spin the token on an empty queue. // Called in broker threads when a consumer is added void QueueContext::consume(size_t n) { - sys::Mutex::ScopedLock l(lock); - consumers = n; + { + sys::Mutex::ScopedLock l(lock); + consumers = n; + } if (n == 1) mcast.mcast( framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName())); } // Called in broker threads when a consumer is cancelled void QueueContext::cancel(size_t n) { - sys::Mutex::ScopedLock l(lock); - consumers = n; - // When consuming threads are stopped, this->stopped will be called. - if (n == 0) { - QPID_LOG(trace, "cluster: all consumers canceled on " << queue.getName()); - timer.stop(); - queue.stopConsumers(); + bool stop = false; + { + sys::Mutex::ScopedLock l(lock); + consumers = n; + stop = (n == 0 && consuming); } + if (stop) queue.stopConsumers(); } -// Called in timer thread. -void QueueContext::timeout() { +// Called in Ticker thread. +void QueueContext::tick() { + bool stop = false; + { + sys::Mutex::ScopedLock l(lock); + stop = (consuming && ++ticks >= maxTicks); + } // When all threads have stopped, queue will call stopped() - QPID_LOG(trace, "cluster: lock timeout on " << queue.getName()); - queue.stopConsumers(); + if (stop) queue.stopConsumers(); } // Callback set up by queue.stopConsumers() called in connection or timer thread. // Called when no threads are dispatching from the queue. void QueueContext::stopped() { - sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, "cluster: stopped consumers, " - << (consumers == 0 ? "unsubscribe" : "resubscribe") - << " to " << queue.getName()); - if (consumers == 0) - mcast.mcast(framing::ClusterQueueUnsubscribeBody( - framing::ProtocolVersion(), queue.getName())); - else // FIXME aconway 2011-09-13: check if we're owner? + bool resubscribe = false; + { + sys::Mutex::ScopedLock l(lock); + assert(consuming); + consuming = false; + resubscribe = consumers; + } + if (resubscribe) mcast.mcast(framing::ClusterQueueResubscribeBody( framing::ProtocolVersion(), queue.getName())); + else + mcast.mcast(framing::ClusterQueueUnsubscribeBody( + framing::ProtocolVersion(), queue.getName())); } void QueueContext::requeue(uint32_t position, bool redelivered) { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 5f2adeae74..20c2aabc1d 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -23,8 +23,9 @@ */ #include "LockedMap.h" -#include "CountdownTimer.h" +#include "Ticker.h" #include "qpid/RefCounted.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" #include "qpid/cluster/types.h" @@ -39,24 +40,24 @@ class QueuedMessage; namespace cluster { class Multicaster; +class Group; /** * Queue state that is not replicated to the cluster. * Manages the local queue start/stop status. * - * Thread safe: Called by connection, dispatch and timer threads. +* THREAD SAFE: Called by connection threads and Ticker dispatch threads. */ -class QueueContext : public RefCounted { +class QueueContext : public Ticker::Tickable { public: - QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m); + QueueContext(broker::Queue&, Group&, size_t consumeTicks); ~QueueContext(); /** Replica state has changed, called in deliver thread. * @param before replica state before the event. * @param before replica state after the event. - * @param self is true if this was a self-delivered event. */ - void replicaState(QueueOwnership before, QueueOwnership after, bool self); + void replicaState(QueueOwnership before, QueueOwnership after); /** Called when queue is stopped, no threads are dispatching. * May be called in connection or deliver thread. @@ -73,8 +74,8 @@ class QueueContext : public RefCounted { */ void cancel(size_t n); - /** Called in timer thread when the timer runs out. */ - void timeout(); + /** Called regularly at the tick interval in an IO thread.*/ + void tick(); /** Called by MessageHandler to requeue a message. */ void requeue(uint32_t position, bool redelivered); @@ -93,13 +94,18 @@ class QueueContext : public RefCounted { private: sys::Mutex lock; - CountdownTimer timer; + size_t consumers; // Number of local consumers + bool consuming; // True if we have the lock & local consumers are active + size_t ticks; // Ticks since we got the lock. + + // Following members are immutable broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? Multicaster& mcast; - size_t consumers; size_t hash; + size_t maxTicks; // Max ticks we are allowed. - typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; + // Following members are safe to use without holding a lock + typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; UnackedMap unacked; }; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 4e5820e295..0c96e9326d 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -24,6 +24,7 @@ #include "QueueContext.h" #include "QueueHandler.h" #include "QueueReplica.h" +#include "Settings.h" #include "qpid/Exception.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" @@ -33,10 +34,8 @@ namespace qpid { namespace cluster { -QueueHandler::QueueHandler(Group& g, const Settings& s) - : HandlerBase(g.getEventHandler()), - multicaster(g.getMulticaster()), - consumeLock(s.getConsumeLock()) +QueueHandler::QueueHandler(Group& g, Settings& s) + : HandlerBase(g.getEventHandler()), group(g), consumeTicks(s.consumeTicks) {} bool QueueHandler::handle(const framing::AMQFrame& frame) { @@ -62,7 +61,7 @@ void QueueHandler::left(const MemberId& member) { void QueueHandler::add(boost::shared_ptr<broker::Queue> q) { // Local queues already have a context, remote queues need one. if (!QueueContext::get(*q)) - new QueueContext(*q, consumeLock, multicaster); // Context attaches to the Queue + new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue queues[q->getName()] = boost::intrusive_ptr<QueueReplica>( new QueueReplica(q, self())); } diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h index 053127e428..84e8b75cfb 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -23,7 +23,6 @@ */ #include "HandlerBase.h" -#include "Settings.h" #include "qpid/framing/AMQP_AllOperations.h" #include "boost/shared_ptr.hpp" #include "boost/intrusive_ptr.hpp" @@ -42,6 +41,7 @@ class EventHandler; class QueueReplica; class Multicaster; class Group; +class Settings; /** * Handler for queue subscription events. @@ -54,7 +54,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, public HandlerBase { public: - QueueHandler(Group&, const Settings&); + QueueHandler(Group&, Settings&); bool handle(const framing::AMQFrame& body); @@ -76,8 +76,8 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, boost::intrusive_ptr<QueueReplica> find(const std::string& queue); QueueMap queues; - Multicaster& multicaster; - sys::Duration consumeLock; + Group& group; + size_t consumeTicks; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 5a3c16c00c..11a7496582 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -53,7 +53,7 @@ std::ostream& operator<<(std::ostream& o, QueueOwnership s) { void QueueReplica::subscribe(const MemberId& member) { QueueOwnership before = getState(); subscribers.push_back(member); - update(before, member); + update(before); } // FIXME aconway 2011-09-20: need to requeue. @@ -61,7 +61,7 @@ void QueueReplica::unsubscribe(const MemberId& member) { QueueOwnership before = getState(); MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member); if (i != subscribers.end()) subscribers.erase(i, subscribers.end()); - update(before, member); + update(before); } void QueueReplica::resubscribe(const MemberId& member) { @@ -69,14 +69,14 @@ void QueueReplica::resubscribe(const MemberId& member) { QueueOwnership before = getState(); subscribers.pop_front(); subscribers.push_back(member); - update(before, member); + update(before); } -void QueueReplica::update(QueueOwnership before, MemberId member) { +void QueueReplica::update(QueueOwnership before) { QueueOwnership after = getState(); QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": " << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]"); - context->replicaState(before, after, member == self); + context->replicaState(before, after); } QueueOwnership QueueReplica::getState() const { diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h index ee93727ca9..31faf4853a 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h @@ -68,7 +68,7 @@ class QueueReplica : public RefCounted QueueOwnership getState() const; bool isOwner() const; bool isSubscriber(const MemberId&) const; - void update(QueueOwnership before, MemberId from); + void update(QueueOwnership before); friend struct PrintSubscribers; friend std::ostream& operator<<(std::ostream&, QueueOwnership); diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp index c3499e58be..4c85dc68e6 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp @@ -26,7 +26,8 @@ namespace qpid { namespace cluster { Settings::Settings() : // Default settings - consumeLockMicros(10000), + tick(10000), // FIXME aconway 2011-11-03: smaller default + consumeTicks(2), concurrency(sys::SystemInfo::concurrency() + 1) {} diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.h b/qpid/cpp/src/qpid/cluster/exp/Settings.h index 1ce3c808ea..ebdddccd26 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Settings.h +++ b/qpid/cpp/src/qpid/cluster/exp/Settings.h @@ -34,10 +34,11 @@ namespace cluster { struct Settings { Settings(); std::string name; - uint32_t consumeLockMicros; + uint32_t tick; + uint32_t consumeTicks; uint32_t concurrency; - sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; } + sys::Duration getTick() const { return tick * sys::TIME_USEC; } }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp new file mode 100644 index 0000000000..9ff04f2f54 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Ticker.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace cluster { + +Ticker::Tickable::~Tickable() {} + +Ticker::Ticker(sys::Duration tick, sys::Timer& timer_, + boost::shared_ptr<sys::Poller> poller) + : sys::TimerTask(tick, "Cluster ticker"), timer(timer_), + condition(boost::bind(&Ticker::dispatch, this, _1), poller) +{ + timer.add(this); +} + +void Ticker::add(boost::intrusive_ptr<Tickable> t) { + sys::Mutex::ScopedLock l(lock); + tickables.push_back(t); +} + +void Ticker::remove(boost::intrusive_ptr<Tickable> t) { + sys::Mutex::ScopedLock l(lock); + Tickables::iterator i = std::find(tickables.begin(), tickables.end(), t); + if (i != tickables.end()) tickables.erase(i); +} + +// Called by timer thread, sets condition +void Ticker::fire() { + condition.set(); + setupNextFire(); + timer.add(this); +} + +// Called only in condition IO thread. +void Ticker::dispatch(sys::PollableCondition& cond) { + assert(&cond == &condition); + { + sys::Mutex::ScopedLock l(lock); + working = tickables; + } + // This is safe outside the lock see comment in Ticker.h + for(Tickables::iterator i = working.begin(); i!= working.end(); ++i) + (*i)->tick(); + condition.clear(); // Ready for next tick. +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Ticker.h b/qpid/cpp/src/qpid/cluster/exp/Ticker.h new file mode 100644 index 0000000000..0a8d508a70 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Ticker.h @@ -0,0 +1,86 @@ +#ifndef QPID_CLUSTER_EXP_TICKER_H +#define QPID_CLUSTER_EXP_TICKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "qpid/RefCounted.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> +#include <vector> + +namespace qpid { + +namespace sys { +class Poller; +} + +namespace cluster { + +/** + * Generate regular calls to QueueContext::tick. + * Work of caling tick is not done in the timer thread. + * The timer task triggers a PollableCondition, which calls the ticks. + * + * THREAD SAFE: add/remove are called in connection or deliver + * threads, fire is called in timer thread and tick is called in the + * IO thread for the PollableCondition. + */ +class Ticker : public sys::TimerTask +{ + public: + struct Tickable : public RefCounted { + virtual ~Tickable(); + virtual void tick() = 0; + }; + + Ticker(sys::Duration tick, sys::Timer&, boost::shared_ptr<sys::Poller>); + + void add(boost::intrusive_ptr<Tickable>); + void remove(boost::intrusive_ptr<Tickable>); + + private: + typedef std::vector<boost::intrusive_ptr<Tickable> > Tickables; + + void fire(); // Called in timer thread. + void dispatch(sys::PollableCondition&); // Called in IO thread + + sys::Timer& timer; + sys::PollableCondition condition; + + sys::Mutex lock; + Tickables tickables; + + // Only accessed in the condition IO thread so no lock needed. + // This is a member to keep memory allocated by the vector and + // avoid re-allocation each time + Tickables working; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_TICKER_H*/ |