diff options
author | Alan Conway <aconway@apache.org> | 2011-10-05 21:45:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-10-05 21:45:45 +0000 |
commit | 7dec096a62550ac85415564a58dba25d0627011b (patch) | |
tree | 1a23e1f79399061464e67a661522c718e7abe2da | |
parent | 291af20b7e953748c2f6d925b3d1e8286e939b96 (diff) | |
download | qpid-python-7dec096a62550ac85415564a58dba25d0627011b.tar.gz |
QPID-2920: Configurable control over concurrency
- new config option controls number of CPG groups used.
- minor log message improvements
- minor code clean-up
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1179456 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cpg.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Core.cpp | 35 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/EventHandler.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueHandler.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Settings.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Settings.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/WiringHandler.h | 4 |
11 files changed, 46 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 0856bcd824..59c3d0714c 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -126,7 +126,7 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow callbacks.cpg_deliver_fn = &globalDeliver; callbacks.cpg_confchg_fn = &globalConfigChange; - QPID_LOG(notice, "Initializing CPG"); + QPID_LOG(debug, "Initializing CPG"); cpg_error_t err = cpg_initialize(&handle, &callbacks); int retries = 6; // FIXME aconway 2009-08-06: make this configurable. while (err == CPG_ERR_TRY_AGAIN && --retries) { diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp index cc8e064627..ea2efa5233 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp @@ -35,8 +35,10 @@ struct Cluster2Plugin : public Plugin { Opts(Settings& s) : Options("Cluster Options"), settings(s) { addOptions() ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join") - ("consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds."); - // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h + ("cluster2-consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.") + ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver."); + // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h. + // FIXME aconway 2011-10-05: rename to final option names. } }; diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index f4c5a5c88f..bde27fddd6 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -39,35 +39,36 @@ namespace cluster { Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s) { - // FIXME aconway 2011-09-26: this has to be consistent in a + // FIXME aconway 2011-09-26: S.concurrency has to be consistent in a // cluster, negotiate as part of join protocol. - size_t nGroups = broker.getOptions().workerThreads; + uint32_t nGroups = s.concurrency ? s.concurrency : 1; for (size_t i = 0; i < nGroups; ++i) { - // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol. + // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol? std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i); groups.push_back(new Group(*this)); boost::intrusive_ptr<Group> group(groups.back()); - // FIXME aconway 2011-10-03: clean up, all Handler ctors take Group. - boost::intrusive_ptr<QueueHandler> queueHandler( - new QueueHandler(group->getEventHandler(), group->getMulticaster(), settings)); - group->getEventHandler().add(queueHandler); - group->getEventHandler().add( - boost::intrusive_ptr<HandlerBase>( - new WiringHandler(group->getEventHandler(), queueHandler, broker))); - group->getEventHandler().add( - boost::intrusive_ptr<HandlerBase>(new MessageHandler(*group, *this))); + + EventHandler& eh(group->getEventHandler()); + typedef boost::intrusive_ptr<HandlerBase> HandlerBasePtr; + boost::intrusive_ptr<QueueHandler> queueHandler(new QueueHandler(*group, settings)); + eh.add(queueHandler); + eh.add(HandlerBasePtr(new WiringHandler(*group, queueHandler, broker))); + eh.add(HandlerBasePtr(new MessageHandler(*group, *this))); std::auto_ptr<BrokerContext> bh(new BrokerContext(*this)); brokerHandler = bh.get(); // BrokerContext belongs to Broker broker.setCluster(std::auto_ptr<broker::Cluster>(bh)); // FIXME aconway 2011-09-26: multi-group - group->getEventHandler().start(); - group->getEventHandler().getCpg().join(groupName); - // TODO aconway 2010-11-18: logging standards - // FIXME aconway 2011-09-26: multi-group - QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< group->getEventHandler().getSelf()); + eh.start(); + eh.getCpg().join(groupName); + // TODO aconway 2010-11-18: logging standards // FIXME aconway 2011-09-26: multi-group + QPID_LOG(debug, "cluster: joined CPG group " << groupName << ", member-id=" << eh.getSelf()); } + QPID_LOG(notice, "cluster: joined cluster " << s.name + << ", member-id="<< groups[0]->getEventHandler().getSelf()); + QPID_LOG(debug, "cluster: consume-lock=" << s.consumeLockMicros << "us " + << " concurrency=" << s.concurrency); } void Core::initialize() {} diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h index cc7caaac89..9f9ae1a856 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h @@ -34,6 +34,7 @@ namespace qpid { namespace framing { class AMQBody; +class AMQFrame; } namespace cluster { diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp index 4ffc780125..1125d994d1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp @@ -41,7 +41,7 @@ bool MessageHolder::check(const framing::AMQFrame& frame, assert(i != messages.end()); msgOut = i->second.first; queueOut = i->second.second; - messages.erase(frame.getChannel()); // re-use the frame. + messages.erase(frame.getChannel()); // re-use the channel. return true; } return false; diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 50638b9cb3..a3575ca257 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -19,22 +19,25 @@ * */ -#include "QueueHandler.h" #include "EventHandler.h" -#include "QueueReplica.h" +#include "Group.h" #include "QueueContext.h" +#include "QueueHandler.h" +#include "QueueReplica.h" +#include "qpid/Exception.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/Exception.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up? -QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s) - : HandlerBase(eh), multicaster(m), consumeLock(s.getConsumeLock()) {} +QueueHandler::QueueHandler(Group& g, const Settings& s) + : HandlerBase(g.getEventHandler()), + multicaster(g.getMulticaster()), + consumeLock(s.getConsumeLock()) +{} bool QueueHandler::handle(const framing::AMQFrame& frame) { return framing::invoke(*this, *frame.getBody()).wasHandled(); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h index 6bf6e66eec..a4898e9a4c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -42,6 +42,7 @@ namespace cluster { class EventHandler; class QueueReplica; class Multicaster; +class Group; /** * Handler for queue subscription events. @@ -54,7 +55,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, public HandlerBase { public: - QueueHandler(EventHandler&, Multicaster&, const Settings&); + QueueHandler(Group&, const Settings&); bool handle(const framing::AMQFrame& body); diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp index 6c572849be..c3499e58be 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp @@ -20,12 +20,14 @@ */ #include "Settings.h" +#include "qpid/sys/SystemInfo.h" namespace qpid { namespace cluster { Settings::Settings() : // Default settings - consumeLockMicros(10000) + consumeLockMicros(10000), + concurrency(sys::SystemInfo::concurrency() + 1) {} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.h b/qpid/cpp/src/qpid/cluster/exp/Settings.h index 9d3f5990ac..1ce3c808ea 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Settings.h +++ b/qpid/cpp/src/qpid/cluster/exp/Settings.h @@ -35,6 +35,8 @@ struct Settings { Settings(); std::string name; uint32_t consumeLockMicros; + uint32_t concurrency; + sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; } }; diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index b96ae0b30f..ab1af35a72 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -20,6 +20,7 @@ */ #include "Core.h" +#include "Group.h" #include "WiringHandler.h" #include "EventHandler.h" #include "QueueHandler.h" @@ -40,10 +41,10 @@ namespace cluster { using namespace broker; using framing::FieldTable; -WiringHandler::WiringHandler(EventHandler& e, +WiringHandler::WiringHandler(Group& g, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker& b) : - HandlerBase(e), + HandlerBase(g.getEventHandler()), broker(b), recovery(broker.getQueues(), broker.getExchanges(), broker.getLinks(), broker.getDtxManager()), diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h index 7a07c7098e..4ea109fe64 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h @@ -41,7 +41,7 @@ class Broker; } namespace cluster { -class EventHandler; +class Group; class QueueHandler; /** @@ -51,7 +51,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, public HandlerBase { public: - WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&); + WiringHandler(Group&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&); bool handle(const framing::AMQFrame&); |