summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-10-05 21:45:45 +0000
committerAlan Conway <aconway@apache.org>2011-10-05 21:45:45 +0000
commit7dec096a62550ac85415564a58dba25d0627011b (patch)
tree1a23e1f79399061464e67a661522c718e7abe2da
parent291af20b7e953748c2f6d925b3d1e8286e939b96 (diff)
downloadqpid-python-7dec096a62550ac85415564a58dba25d0627011b.tar.gz
QPID-2920: Configurable control over concurrency
- new config option controls number of CPG groups used. - minor log message improvements - minor code clean-up git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1179456 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp35
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp15
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Settings.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Settings.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.h4
11 files changed, 46 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 0856bcd824..59c3d0714c 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -126,7 +126,7 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow
callbacks.cpg_deliver_fn = &globalDeliver;
callbacks.cpg_confchg_fn = &globalConfigChange;
- QPID_LOG(notice, "Initializing CPG");
+ QPID_LOG(debug, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
int retries = 6; // FIXME aconway 2009-08-06: make this configurable.
while (err == CPG_ERR_TRY_AGAIN && --retries) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
index cc8e064627..ea2efa5233 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
@@ -35,8 +35,10 @@ struct Cluster2Plugin : public Plugin {
Opts(Settings& s) : Options("Cluster Options"), settings(s) {
addOptions()
("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
- ("consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.");
- // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+ ("cluster2-consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.")
+ ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.");
+ // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
+ // FIXME aconway 2011-10-05: rename to final option names.
}
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index f4c5a5c88f..bde27fddd6 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -39,35 +39,36 @@ namespace cluster {
Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s)
{
- // FIXME aconway 2011-09-26: this has to be consistent in a
+ // FIXME aconway 2011-09-26: S.concurrency has to be consistent in a
// cluster, negotiate as part of join protocol.
- size_t nGroups = broker.getOptions().workerThreads;
+ uint32_t nGroups = s.concurrency ? s.concurrency : 1;
for (size_t i = 0; i < nGroups; ++i) {
- // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol.
+ // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol?
std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
groups.push_back(new Group(*this));
boost::intrusive_ptr<Group> group(groups.back());
- // FIXME aconway 2011-10-03: clean up, all Handler ctors take Group.
- boost::intrusive_ptr<QueueHandler> queueHandler(
- new QueueHandler(group->getEventHandler(), group->getMulticaster(), settings));
- group->getEventHandler().add(queueHandler);
- group->getEventHandler().add(
- boost::intrusive_ptr<HandlerBase>(
- new WiringHandler(group->getEventHandler(), queueHandler, broker)));
- group->getEventHandler().add(
- boost::intrusive_ptr<HandlerBase>(new MessageHandler(*group, *this)));
+
+ EventHandler& eh(group->getEventHandler());
+ typedef boost::intrusive_ptr<HandlerBase> HandlerBasePtr;
+ boost::intrusive_ptr<QueueHandler> queueHandler(new QueueHandler(*group, settings));
+ eh.add(queueHandler);
+ eh.add(HandlerBasePtr(new WiringHandler(*group, queueHandler, broker)));
+ eh.add(HandlerBasePtr(new MessageHandler(*group, *this)));
std::auto_ptr<BrokerContext> bh(new BrokerContext(*this));
brokerHandler = bh.get();
// BrokerContext belongs to Broker
broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
// FIXME aconway 2011-09-26: multi-group
- group->getEventHandler().start();
- group->getEventHandler().getCpg().join(groupName);
- // TODO aconway 2010-11-18: logging standards
- // FIXME aconway 2011-09-26: multi-group
- QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< group->getEventHandler().getSelf());
+ eh.start();
+ eh.getCpg().join(groupName);
+ // TODO aconway 2010-11-18: logging standards // FIXME aconway 2011-09-26: multi-group
+ QPID_LOG(debug, "cluster: joined CPG group " << groupName << ", member-id=" << eh.getSelf());
}
+ QPID_LOG(notice, "cluster: joined cluster " << s.name
+ << ", member-id="<< groups[0]->getEventHandler().getSelf());
+ QPID_LOG(debug, "cluster: consume-lock=" << s.consumeLockMicros << "us "
+ << " concurrency=" << s.concurrency);
}
void Core::initialize() {}
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
index cc7caaac89..9f9ae1a856 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -34,6 +34,7 @@ namespace qpid {
namespace framing {
class AMQBody;
+class AMQFrame;
}
namespace cluster {
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
index 4ffc780125..1125d994d1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
@@ -41,7 +41,7 @@ bool MessageHolder::check(const framing::AMQFrame& frame,
assert(i != messages.end());
msgOut = i->second.first;
queueOut = i->second.second;
- messages.erase(frame.getChannel()); // re-use the frame.
+ messages.erase(frame.getChannel()); // re-use the channel.
return true;
}
return false;
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 50638b9cb3..a3575ca257 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -19,22 +19,25 @@
*
*/
-#include "QueueHandler.h"
#include "EventHandler.h"
-#include "QueueReplica.h"
+#include "Group.h"
#include "QueueContext.h"
+#include "QueueHandler.h"
+#include "QueueReplica.h"
+#include "qpid/Exception.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up?
-QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s)
- : HandlerBase(eh), multicaster(m), consumeLock(s.getConsumeLock()) {}
+QueueHandler::QueueHandler(Group& g, const Settings& s)
+ : HandlerBase(g.getEventHandler()),
+ multicaster(g.getMulticaster()),
+ consumeLock(s.getConsumeLock())
+{}
bool QueueHandler::handle(const framing::AMQFrame& frame) {
return framing::invoke(*this, *frame.getBody()).wasHandled();
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
index 6bf6e66eec..a4898e9a4c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -42,6 +42,7 @@ namespace cluster {
class EventHandler;
class QueueReplica;
class Multicaster;
+class Group;
/**
* Handler for queue subscription events.
@@ -54,7 +55,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
public HandlerBase
{
public:
- QueueHandler(EventHandler&, Multicaster&, const Settings&);
+ QueueHandler(Group&, const Settings&);
bool handle(const framing::AMQFrame& body);
diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
index 6c572849be..c3499e58be 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
@@ -20,12 +20,14 @@
*/
#include "Settings.h"
+#include "qpid/sys/SystemInfo.h"
namespace qpid {
namespace cluster {
Settings::Settings() : // Default settings
- consumeLockMicros(10000)
+ consumeLockMicros(10000),
+ concurrency(sys::SystemInfo::concurrency() + 1)
{}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.h b/qpid/cpp/src/qpid/cluster/exp/Settings.h
index 9d3f5990ac..1ce3c808ea 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Settings.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Settings.h
@@ -35,6 +35,8 @@ struct Settings {
Settings();
std::string name;
uint32_t consumeLockMicros;
+ uint32_t concurrency;
+
sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; }
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index b96ae0b30f..ab1af35a72 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -20,6 +20,7 @@
*/
#include "Core.h"
+#include "Group.h"
#include "WiringHandler.h"
#include "EventHandler.h"
#include "QueueHandler.h"
@@ -40,10 +41,10 @@ namespace cluster {
using namespace broker;
using framing::FieldTable;
-WiringHandler::WiringHandler(EventHandler& e,
+WiringHandler::WiringHandler(Group& g,
const boost::intrusive_ptr<QueueHandler>& qh,
broker::Broker& b) :
- HandlerBase(e),
+ HandlerBase(g.getEventHandler()),
broker(b),
recovery(broker.getQueues(), broker.getExchanges(),
broker.getLinks(), broker.getDtxManager()),
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
index 7a07c7098e..4ea109fe64 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
@@ -41,7 +41,7 @@ class Broker;
}
namespace cluster {
-class EventHandler;
+class Group;
class QueueHandler;
/**
@@ -51,7 +51,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
public HandlerBase
{
public:
- WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
+ WiringHandler(Group&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
bool handle(const framing::AMQFrame&);