summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-27 13:11:30 +0000
committerAlan Conway <aconway@apache.org>2011-09-27 13:11:30 +0000
commit6c9ffc8cb38805e7c8a802e7bd1f1b2f05910e28 (patch)
treeefe04ddb9701f77e6f5fa2e8f52582a17701554d
parentd4c2852eda45520e4fb1366fdf94ff3beb8b208c (diff)
downloadqpid-python-6c9ffc8cb38805e7c8a802e7bd1f1b2f05910e28.tar.gz
QPID-2920: Use multiple independent CPG groups.
Use a fixed set of CPG groups, hash queue names to choose the group for a given operation. Operations on a given queue will always use the same CPG group but operations on different queues can execute concurrently on different groups. Removed fanuout optimization, it doesn't work with multiple CPG groups. Can't guratnee that "routing" will proceed enqueues on different CPG groups. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1176373 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp42
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp48
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp32
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h11
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark1
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp5
-rw-r--r--qpid/cpp/xml/cluster.xml14
10 files changed, 49 insertions, 118 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 298c8b8cd2..6b96bc64fa 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -237,8 +237,6 @@ void Queue::requeue(const QueuedMessage& msg){
* calling function sets qmsg with the lock held, but the call to
* Cluster::acquire() will happen after the lock is released in
* ~ClusterAcquireScope().
- *
- * Also marks a Stoppable as busy for the duration of the scope.
**/
struct ClusterAcquireScope {
QueuedMessage qmsg;
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index f2693b15e5..f95c2c40b4 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -25,8 +25,6 @@
#include "QueueHandler.h"
#include "Multicaster.h"
#include "hash.h"
-#include "qpid/framing/ClusterMessageRoutingBody.h"
-#include "qpid/framing/ClusterMessageRoutedBody.h"
#include "qpid/framing/ClusterMessageEnqueueBody.h"
#include "qpid/framing/ClusterMessageAcquireBody.h"
#include "qpid/framing/ClusterMessageDequeueBody.h"
@@ -57,10 +55,6 @@ const ProtocolVersion pv; // shorthand
// noReplicate means the current thread is handling a message
// received from the cluster so it should not be replicated.
QPID_TSS bool tssNoReplicate = false;
-
-// Routing ID of the message being routed in the current thread.
-// 0 if we are not currently routing a message.
-QPID_TSS RoutingId tssRoutingId = 0;
}
// FIXME aconway 2011-09-26: de-const the broker::Cluster interface,
@@ -90,38 +84,24 @@ BrokerContext::ScopedSuppressReplication::~ScopedSuppressReplication() {
BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q)
: core(c), queueHandler(q) {}
-RoutingId BrokerContext::nextRoutingId() {
- RoutingId id = ++routingId;
- if (id == 0) id = ++routingId; // Avoid 0 on wrap-around.
- return id;
-}
-
-void BrokerContext::routing(const boost::intrusive_ptr<Message>&) { }
+BrokerContext::~BrokerContext() {}
bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
{
if (tssNoReplicate) return true;
- if (!tssRoutingId) { // This is the first enqueue, so send the message
- tssRoutingId = nextRoutingId();
- // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
- std::string data(msg->encodedSize(),char());
- framing::Buffer buf(&data[0], data.size());
- msg->encode(buf);
- mcaster(queue).mcast(ClusterMessageRoutingBody(pv, tssRoutingId, data));
- core.getRoutingMap().put(tssRoutingId, msg);
- }
- mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, tssRoutingId, queue.getName()));
+ // FIXME aconway 2010-10-20: replicate message in fragments
+ // (frames), using fixed size bufffers.
+ std::string data(msg->encodedSize(),char());
+ framing::Buffer buf(&data[0], data.size());
+ msg->encode(buf);
+ mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), data));
return false; // Strict order, wait for CPG self-delivery to enqueue.
}
-void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
- if (tssRoutingId) { // we enqueued at least one message.
- core.getGroup(tssRoutingId).getMulticaster().mcast(
- ClusterMessageRoutedBody(pv, tssRoutingId));
- // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
- tssRoutingId = 0;
- }
-}
+// routing and routed are no-ops. They are needed to implement fanout
+// optimization, which is currently not implemnted
+void BrokerContext::routing(const boost::intrusive_ptr<broker::Message>&) {}
+void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {}
void BrokerContext::acquire(const broker::QueuedMessage& qm) {
if (tssNoReplicate) return;
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index c05959fa81..6ac5f4ed0a 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -48,8 +48,7 @@ class BrokerContext : public broker::Cluster
};
BrokerContext(Core&, boost::intrusive_ptr<QueueHandler>);
-
- // FIXME aconway 2010-10-20: implement all points.
+ ~BrokerContext();
// Messages
@@ -81,7 +80,6 @@ class BrokerContext : public broker::Cluster
private:
- uint32_t nextRoutingId();
// Get multicaster associated with a queue
Multicaster& mcaster(const broker::QueuedMessage& qm);
Multicaster& mcaster(const broker::Queue& q);
@@ -89,7 +87,6 @@ class BrokerContext : public broker::Cluster
Core& core;
boost::intrusive_ptr<QueueHandler> queueHandler;
- sys::AtomicValue<uint32_t> routingId;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index fe7f22e445..31ca2f067f 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -32,6 +32,7 @@
#include "qpid/framing/Buffer.h"
#include "qpid/log/Statement.h"
#include <sys/uio.h> // For iovec
+#include <boost/lexical_cast.hpp>
namespace qpid {
namespace cluster {
@@ -40,33 +41,38 @@ Core::Core(const Settings& s, broker::Broker& b) :
broker(b),
settings(s)
{
- // FIXME aconway 2011-09-23: multi-group
- groups.push_back(new Group(*this));
- boost::intrusive_ptr<QueueHandler> queueHandler(
- new QueueHandler(groups[0]->getEventHandler(), groups[0]->getMulticaster(), settings));
- groups[0]->getEventHandler().add(queueHandler);
- groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
- new WiringHandler(groups[0]->getEventHandler(), queueHandler, broker)));
- groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
- new MessageHandler(groups[0]->getEventHandler(), *this)));
+ // FIXME aconway 2011-09-26: this has to be consistent in a
+ // cluster, negotiate as part of join protocol.
+ size_t nGroups = broker.getOptions().workerThreads;
+ for (size_t i = 0; i < nGroups; ++i) {
+ // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol.
+ std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
+ QPID_LOG(critical, "FIXME create group " << i << " of " << "nGroups. " << groupName);
+ groups.push_back(new Group(*this));
+ boost::intrusive_ptr<QueueHandler> queueHandler(
+ new QueueHandler(groups[i]->getEventHandler(), groups[i]->getMulticaster(), settings));
+ groups[i]->getEventHandler().add(queueHandler);
+ groups[i]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+ new WiringHandler(groups[i]->getEventHandler(), queueHandler, broker)));
+ groups[i]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+ new MessageHandler(groups[i]->getEventHandler(), *this)));
- std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
- brokerHandler = bh.get();
- // BrokerContext belongs to Broker
- broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
- // FIXME aconway 2011-09-26: multi-group
- groups[0]->getEventHandler().start();
- groups[0]->getEventHandler().getCpg().join(s.name);
- // TODO aconway 2010-11-18: logging standards
- // FIXME aconway 2011-09-26: multi-group
- QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< groups[0]->getEventHandler().getSelf());
+ std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
+ brokerHandler = bh.get();
+ // BrokerContext belongs to Broker
+ broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
+ // FIXME aconway 2011-09-26: multi-group
+ groups[i]->getEventHandler().start();
+ groups[i]->getEventHandler().getCpg().join(groupName);
+ // TODO aconway 2010-11-18: logging standards
+ // FIXME aconway 2011-09-26: multi-group
+ QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< groups[i]->getEventHandler().getSelf());
+ }
}
void Core::initialize() {}
void Core::fatal() {
- // FIXME aconway 2010-10-20: error handling
- assert(0);
broker::SignalHandler::shutdown();
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h
index f5847f7fee..d23ed2e4e8 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.h
@@ -60,7 +60,6 @@ class BrokerContext;
class Core
{
public:
- typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
typedef std::vector<boost::intrusive_ptr<Group> > Groups;
/** Constructed during Plugin::earlyInitialize() */
@@ -75,11 +74,6 @@ class Core
broker::Broker& getBroker() { return broker; }
BrokerContext& getBrokerContext() { return *brokerHandler; }
- /** Map of messages that are currently being routed.
- * Used to pass messages being routed from BrokerContext to MessageHandler
- */
- RoutingMap& getRoutingMap() { return routingMap; }
-
const Settings& getSettings() const { return settings; }
/** Get group by hash value. */
@@ -88,7 +82,6 @@ class Core
private:
broker::Broker& broker;
BrokerContext* brokerHandler; // Handles broker events.
- RoutingMap routingMap;
Settings settings;
Groups groups;
};
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 16f5a90a7c..170baa488e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -49,16 +49,6 @@ bool MessageHandler::invoke(const framing::AMQBody& body) {
return framing::invoke(*this, body).wasHandled();
}
-void MessageHandler::routing(RoutingId routingId, const std::string& message) {
- if (sender() == self()) return; // Already in core.getRoutingMap()
- boost::intrusive_ptr<Message> msg = new Message;
- // FIXME aconway 2010-10-28: decode message in bounded-size buffers.
- framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
- msg->decodeHeader(buf);
- msg->decodeContent(buf);
- memberMap[sender()].routingMap[routingId] = msg;
-}
-
boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
const std::string& q, const char* msg)
{
@@ -67,26 +57,18 @@ boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
return queue;
}
-void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
+void MessageHandler::enqueue(const std::string& q, const std::string& message) {
+
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
- boost::intrusive_ptr<Message> msg;
- if (sender() == self())
- msg = core.getRoutingMap().get(routingId);
- else
- msg = memberMap[sender()].routingMap[routingId];
- if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
- << " failed: unknown message"));
+ // FIXME aconway 2010-10-28: decode message by frame in bounded-size buffers.
+ boost::intrusive_ptr<broker::Message> msg = new broker::Message();
+ framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
+ msg->decodeHeader(buf);
+ msg->decodeContent(buf);
BrokerContext::ScopedSuppressReplication ssr;
queue->deliver(msg);
}
-void MessageHandler::routed(RoutingId routingId) {
- if (sender() == self())
- core.getRoutingMap().erase(routingId);
- else
- memberMap[sender()].routingMap.erase(routingId);
-}
-
// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
// and scan queue once.
void MessageHandler::acquire(const std::string& q, uint32_t position) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
index 36258879d3..d97999b738 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -56,24 +56,15 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
bool invoke(const framing::AMQBody& body);
- void routing(uint32_t routingId, const std::string& message);
- void enqueue(uint32_t routingId, const std::string& queue);
- void routed(uint32_t routingId);
+ void enqueue(const std::string& queue, const std::string& message);
void acquire(const std::string& queue, uint32_t position);
void dequeue(const std::string& queue, uint32_t position);
void requeue(const std::string& queue, uint32_t position, bool redelivered);
private:
- struct Member {
- typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap;
- RoutingMap routingMap;
- };
- typedef std::map<MemberId, Member> MemberMap;
-
boost::shared_ptr<broker::Queue> findQueue(const std::string& q, const char* msg);
broker::Broker& broker;
- MemberMap memberMap;
Core& core;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 484fe00912..29c8ef54dd 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -47,6 +47,5 @@ done
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
run_test "Throughput:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
-
run_test "Latency:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index b26311abef..9783316449 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -217,10 +217,7 @@ int main(int argc, char ** argv)
}
if (opts.printContent)
std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
- if (opts.messages && count >= opts.messages) {
- cerr << "qpid-receive(" << getpid() << ") DONE" << endl;
- done = true;
- }
+ if (opts.messages && count >= opts.messages) done = true;
}
} else if (opts.checkRedelivered && !msg.getRedelivered()) {
throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index dfcf30ecdf..5fba6b4036 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -331,23 +331,11 @@
<!-- Message delivery and disposition -->
<class name="cluster-message" code="0x82">
- <!-- FIXME aconway 2010-10-19: create message in fragments -->
- <control name="routing" code="0x1">
- <field name="routing-id" type="uint32"/>
- <field name="message" type="str32"/>
- </control>
-
- <!-- FIXME aconway 2011-04-27: reference queues by index, not name -->
<control name="enqueue" code="0x2">
- <field name="routing-id" type="uint32"/>
<field name="queue" type="queue.name"/>
+ <field name="message" type="str32"/>
</control>
- <control name="routed" code="0x3">
- <field name="routing-id" type="uint32"/>
- </control>
-
- <!-- FIXME aconway 2011-04-27: review queue positions vs. global message IDs -->
<control name="acquire" code="0x4">
<field name="queue" type="queue.name"/>
<field name="position" type="uint32"/>