summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-27 13:11:14 +0000
committerAlan Conway <aconway@apache.org>2011-09-27 13:11:14 +0000
commitd4c2852eda45520e4fb1366fdf94ff3beb8b208c (patch)
treebf584e1144e3b39c2729065aea3cb5a8d2e4e076
parente3feec93b16698fefcdaabc238c8f07f15ff67ef (diff)
downloadqpid-python-d4c2852eda45520e4fb1366fdf94ff3beb8b208c.tar.gz
QPID-2920: Groundwork to enable queues hashed over multiple CPG groups.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1176372 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp54
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp28
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.h23
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.cpp50
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.h62
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/WiringHandler.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/hash.cpp36
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/hash.h53
17 files changed, 302 insertions, 64 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 0945d58f07..29b609bacd 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -120,6 +120,10 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/Core.h \
qpid/cluster/exp/EventHandler.cpp \
qpid/cluster/exp/EventHandler.h \
+ qpid/cluster/exp/Group.cpp \
+ qpid/cluster/exp/Group.h \
+ qpid/cluster/exp/hash.cpp \
+ qpid/cluster/exp/hash.h \
qpid/cluster/exp/HandlerBase.cpp \
qpid/cluster/exp/HandlerBase.h \
qpid/cluster/exp/MessageHandler.cpp \
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 21d497cfb5..f2693b15e5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -23,6 +23,8 @@
#include "BrokerContext.h"
#include "QueueContext.h"
#include "QueueHandler.h"
+#include "Multicaster.h"
+#include "hash.h"
#include "qpid/framing/ClusterMessageRoutingBody.h"
#include "qpid/framing/ClusterMessageRoutedBody.h"
#include "qpid/framing/ClusterMessageEnqueueBody.h"
@@ -50,6 +52,8 @@ using namespace framing;
using namespace broker;
namespace {
+const ProtocolVersion pv; // shorthand
+
// noReplicate means the current thread is handling a message
// received from the cluster so it should not be replicated.
QPID_TSS bool tssNoReplicate = false;
@@ -59,6 +63,20 @@ QPID_TSS bool tssNoReplicate = false;
QPID_TSS RoutingId tssRoutingId = 0;
}
+// FIXME aconway 2011-09-26: de-const the broker::Cluster interface,
+// then de-const here.
+Multicaster& BrokerContext::mcaster(const broker::QueuedMessage& qm) {
+ return core.getGroup(hashof(qm)).getMulticaster();
+}
+
+Multicaster& BrokerContext::mcaster(const broker::Queue& q) {
+ return core.getGroup(hashof(q)).getMulticaster();
+}
+
+Multicaster& BrokerContext::mcaster(const std::string& name) {
+ return core.getGroup(hashof(name)).getMulticaster();
+}
+
BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() {
assert(!tssNoReplicate);
tssNoReplicate = true;
@@ -89,16 +107,17 @@ bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m
std::string data(msg->encodedSize(),char());
framing::Buffer buf(&data[0], data.size());
msg->encode(buf);
- core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data));
+ mcaster(queue).mcast(ClusterMessageRoutingBody(pv, tssRoutingId, data));
core.getRoutingMap().put(tssRoutingId, msg);
}
- core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
+ mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, tssRoutingId, queue.getName()));
return false; // Strict order, wait for CPG self-delivery to enqueue.
}
void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
if (tssRoutingId) { // we enqueued at least one message.
- core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId));
+ core.getGroup(tssRoutingId).getMulticaster().mcast(
+ ClusterMessageRoutedBody(pv, tssRoutingId));
// Note: routingMap is cleaned up on CPG delivery in MessageHandler.
tssRoutingId = 0;
}
@@ -106,20 +125,19 @@ void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
void BrokerContext::acquire(const broker::QueuedMessage& qm) {
if (tssNoReplicate) return;
- core.mcast(ClusterMessageAcquireBody(
- ProtocolVersion(), qm.queue->getName(), qm.position));
+ mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
}
void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
if (!tssNoReplicate)
- core.mcast(ClusterMessageDequeueBody(
- ProtocolVersion(), qm.queue->getName(), qm.position));
+ mcaster(qm).mcast(
+ ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position));
}
void BrokerContext::requeue(const broker::QueuedMessage& qm) {
if (!tssNoReplicate)
- core.mcast(ClusterMessageRequeueBody(
- ProtocolVersion(),
+ mcaster(qm).mcast(ClusterMessageRequeueBody(
+ pv,
qm.queue->getName(),
qm.position,
qm.payload->getRedelivered()));
@@ -130,17 +148,17 @@ void BrokerContext::create(broker::Queue& q) {
if (tssNoReplicate) return;
assert(!QueueContext::get(q));
boost::intrusive_ptr<QueueContext> context(
- new QueueContext(q, core.getSettings().getConsumeLock(), core.getMulticaster()));
+ new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName())));
std::string data(q.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
- core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
+ mcaster(q).mcast(ClusterWiringCreateQueueBody(pv, data));
// FIXME aconway 2011-07-29: Need asynchronous completion.
}
void BrokerContext::destroy(broker::Queue& q) {
if (tssNoReplicate) return;
- core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName()));
+ mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName()));
}
void BrokerContext::create(broker::Exchange& ex) {
@@ -148,28 +166,27 @@ void BrokerContext::create(broker::Exchange& ex) {
std::string data(ex.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
ex.encode(buf);
- core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data));
+ mcaster(ex.getName()).mcast(ClusterWiringCreateExchangeBody(pv, data));
}
void BrokerContext::destroy(broker::Exchange& ex) {
if (tssNoReplicate) return;
- core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName()));
+ mcaster(ex.getName()).mcast(
+ ClusterWiringDestroyExchangeBody(pv, ex.getName()));
}
void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex,
const std::string& key, const framing::FieldTable& args)
{
if (tssNoReplicate) return;
- core.mcast(ClusterWiringBindBody(
- ProtocolVersion(), q.getName(), ex.getName(), key, args));
+ mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args));
}
void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
const std::string& key, const framing::FieldTable& args)
{
if (tssNoReplicate) return;
- core.mcast(ClusterWiringUnbindBody(
- ProtocolVersion(), q.getName(), ex.getName(), key, args));
+ mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args));
}
// n is the number of consumers including the one just added.
@@ -190,3 +207,4 @@ void BrokerContext::stopped(broker::Queue& q) {
}
}} // namespace qpid::cluster
+
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index f0444882a1..c05959fa81 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -82,6 +82,10 @@ class BrokerContext : public broker::Cluster
private:
uint32_t nextRoutingId();
+ // Get multicaster associated with a queue
+ Multicaster& mcaster(const broker::QueuedMessage& qm);
+ Multicaster& mcaster(const broker::Queue& q);
+ Multicaster& mcaster(const std::string&);
Core& core;
boost::intrusive_ptr<QueueHandler> queueHandler;
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index 1b24b7fcf6..fe7f22e445 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -38,26 +38,28 @@ namespace cluster {
Core::Core(const Settings& s, broker::Broker& b) :
broker(b),
- eventHandler(new EventHandler(*this)),
- multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)),
settings(s)
{
+ // FIXME aconway 2011-09-23: multi-group
+ groups.push_back(new Group(*this));
boost::intrusive_ptr<QueueHandler> queueHandler(
- new QueueHandler(*eventHandler, multicaster, settings));
- eventHandler->add(queueHandler);
- eventHandler->add(boost::intrusive_ptr<HandlerBase>(
- new WiringHandler(*eventHandler, queueHandler)));
- eventHandler->add(boost::intrusive_ptr<HandlerBase>(
- new MessageHandler(*eventHandler)));
+ new QueueHandler(groups[0]->getEventHandler(), groups[0]->getMulticaster(), settings));
+ groups[0]->getEventHandler().add(queueHandler);
+ groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+ new WiringHandler(groups[0]->getEventHandler(), queueHandler, broker)));
+ groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+ new MessageHandler(groups[0]->getEventHandler(), *this)));
std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
brokerHandler = bh.get();
// BrokerContext belongs to Broker
broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
- eventHandler->start();
- eventHandler->getCpg().join(s.name);
+ // FIXME aconway 2011-09-26: multi-group
+ groups[0]->getEventHandler().start();
+ groups[0]->getEventHandler().getCpg().join(s.name);
// TODO aconway 2010-11-18: logging standards
- QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< eventHandler->getSelf());
+ // FIXME aconway 2011-09-26: multi-group
+ QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< groups[0]->getEventHandler().getSelf());
}
void Core::initialize() {}
@@ -68,8 +70,8 @@ void Core::fatal() {
broker::SignalHandler::shutdown();
}
-void Core::mcast(const framing::AMQBody& body) {
- multicaster.mcast(body);
+Group& Core::getGroup(size_t hashValue) {
+ return *groups[hashValue % groups.size()];
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h
index 5f5237d679..f5847f7fee 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.h
@@ -22,15 +22,16 @@
*
*/
-#include <string>
-#include <memory>
#include "LockedMap.h"
-#include "Multicaster.h"
+#include "Group.h"
#include "Settings.h"
#include "qpid/cluster/types.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/sys/Time.h"
+#include <boost/intrusive_ptr.hpp>
+#include <string>
+#include <memory>
// TODO aconway 2010-10-19: experimental cluster code.
@@ -49,7 +50,8 @@ class EventHandler;
class BrokerContext;
/**
- * Cluster core state machine.
+ * Cluster core.
+ *
* Holds together the various objects that implement cluster behavior,
* and holds state that is shared by multiple components.
*
@@ -59,6 +61,7 @@ class Core
{
public:
typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
+ typedef std::vector<boost::intrusive_ptr<Group> > Groups;
/** Constructed during Plugin::earlyInitialize() */
Core(const Settings&, broker::Broker&);
@@ -69,13 +72,8 @@ class Core
/** Shut down broker due to fatal error. Caller should log a critical message */
void fatal();
- /** Multicast an event */
- void mcast(const framing::AMQBody&);
-
broker::Broker& getBroker() { return broker; }
- EventHandler& getEventHandler() { return *eventHandler; }
BrokerContext& getBrokerContext() { return *brokerHandler; }
- Multicaster& getMulticaster() { return multicaster; }
/** Map of messages that are currently being routed.
* Used to pass messages being routed from BrokerContext to MessageHandler
@@ -83,13 +81,16 @@ class Core
RoutingMap& getRoutingMap() { return routingMap; }
const Settings& getSettings() const { return settings; }
+
+ /** Get group by hash value. */
+ Group& getGroup(size_t hashValue);
+
private:
broker::Broker& broker;
- std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
BrokerContext* brokerHandler; // Handles broker events.
RoutingMap routingMap;
- Multicaster multicaster;
Settings settings;
+ Groups groups;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index 4653cbf1ca..6685cb6390 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -33,10 +33,10 @@
namespace qpid {
namespace cluster {
-EventHandler::EventHandler(Core& c) :
- core(c),
- cpg(*this), // FIXME aconway 2010-10-20: belongs on Core.
- dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
+EventHandler::EventHandler(boost::shared_ptr<sys::Poller> poller,
+ boost::function<void()> onError) :
+ cpg(*this),
+ dispatcher(cpg, poller, onError),
self(cpg.self())
{}
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
index 93423778f1..4af43fb76e 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
@@ -37,7 +37,6 @@ class AMQBody;
}
namespace cluster {
-class Core;
class HandlerBase;
/**
@@ -48,7 +47,9 @@ class HandlerBase;
class EventHandler : public Cpg::Handler
{
public:
- EventHandler(Core&);
+ EventHandler(boost::shared_ptr<sys::Poller> poller,
+ boost::function<void()> onError);
+
~EventHandler();
/** Add a handler */
@@ -75,13 +76,11 @@ class EventHandler : public Cpg::Handler
MemberId getSender() { return sender; }
MemberId getSelf() { return self; }
- Core& getCore() { return core; }
Cpg& getCpg() { return cpg; }
private:
void invoke(const framing::AMQBody& body);
- Core& core;
Cpg cpg;
PollerDispatch dispatcher;
MemberId sender; // sender of current event.
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
new file mode 100644
index 0000000000..3a5c1989ae
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Group.h"
+#include "Core.h"
+#include "EventHandler.h"
+#include "Multicaster.h"
+
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace framing {
+class AMQFrame;
+class AMQBody;
+}
+
+namespace cluster {
+
+Group::Group(Core& core) :
+ eventHandler(
+ new EventHandler(core.getBroker().getPoller(),
+ boost::bind(&Core::fatal, &core))),
+ multicaster(
+ new Multicaster(eventHandler->getCpg(),
+ core.getBroker().getPoller(),
+ boost::bind(&Core::fatal, &core)))
+{}
+
+Group::~Group() {}
+
+void Group::mcast(const framing::AMQBody& b) { multicaster->mcast(b); }
+void Group::mcast(const framing::AMQFrame& f) { multicaster->mcast(f); }
+}} // namespace qpid::cluster::exp
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h
new file mode 100644
index 0000000000..15579b2665
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.h
@@ -0,0 +1,62 @@
+#ifndef QPID_CLUSTER_EXP_GROUP_H
+#define QPID_CLUSTER_EXP_GROUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include <memory>
+
+namespace qpid {
+namespace framing {
+class AMQBody;
+class AMQFrame;
+}
+
+namespace cluster {
+
+class Cpg;
+class Core;
+class EventHandler;
+class Multicaster;
+
+/**
+ * A CPG instance with an event handler and a multi-caster,
+ * along with all the per-group handler objects.
+ */
+class Group : public RefCounted
+{
+ public:
+ Group(Core& core);
+ ~Group();
+
+ EventHandler& getEventHandler() { return *eventHandler; }
+ Multicaster& getMulticaster() { return *multicaster; }
+
+ void mcast(const framing::AMQBody&);
+ void mcast(const framing::AMQFrame&);
+ private:
+ std::auto_ptr<EventHandler> eventHandler;
+ std::auto_ptr<Multicaster> multicaster;
+};
+}} // namespace qpid::cluster::exp
+
+#endif /*!QPID_CLUSTER_EXP_GROUP_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index d9fce02d75..16f5a90a7c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -39,9 +39,10 @@ namespace qpid {
namespace cluster {
using namespace broker;
-MessageHandler::MessageHandler(EventHandler& e) :
+MessageHandler::MessageHandler(EventHandler& e, Core& c) :
HandlerBase(e),
- broker(e.getCore().getBroker())
+ broker(c.getBroker()),
+ core(c)
{}
bool MessageHandler::invoke(const framing::AMQBody& body) {
@@ -49,7 +50,7 @@ bool MessageHandler::invoke(const framing::AMQBody& body) {
}
void MessageHandler::routing(RoutingId routingId, const std::string& message) {
- if (sender() == self()) return; // Already in getCore().getRoutingMap()
+ if (sender() == self()) return; // Already in core.getRoutingMap()
boost::intrusive_ptr<Message> msg = new Message;
// FIXME aconway 2010-10-28: decode message in bounded-size buffers.
framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
@@ -70,7 +71,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
boost::intrusive_ptr<Message> msg;
if (sender() == self())
- msg = eventHandler.getCore().getRoutingMap().get(routingId);
+ msg = core.getRoutingMap().get(routingId);
else
msg = memberMap[sender()].routingMap[routingId];
if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
@@ -81,7 +82,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
void MessageHandler::routed(RoutingId routingId) {
if (sender() == self())
- eventHandler.getCore().getRoutingMap().erase(routingId);
+ core.getRoutingMap().erase(routingId);
else
memberMap[sender()].routingMap.erase(routingId);
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
index 40e004d89a..36258879d3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -40,6 +40,7 @@ class Queue;
namespace cluster {
class EventHandler;
class BrokerContext;
+class Core;
// FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue.
// Make this consistent.
@@ -51,7 +52,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
public HandlerBase
{
public:
- MessageHandler(EventHandler&);
+ MessageHandler(EventHandler&, Core&);
bool invoke(const framing::AMQBody& body);
@@ -73,6 +74,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
broker::Broker& broker;
MemberMap memberMap;
+ Core& core;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index f71b0d1865..248fb05dc6 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -23,6 +23,7 @@
#include "Multicaster.h"
#include "qpid/cluster/types.h"
#include "BrokerContext.h" // for ScopedSuppressReplication
+#include "hash.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/ClusterQueueResubscribeBody.h"
#include "qpid/framing/ClusterQueueSubscribeBody.h"
@@ -39,7 +40,7 @@ QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicas
: timer(boost::bind(&QueueContext::timeout, this),
q.getBroker()->getTimer(),
consumeLock),
- queue(q), mcast(m), consumers(0)
+ queue(q), mcast(m), consumers(0), hash(hashof(q.getName()))
{
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
q.stopConsumers(); // Stop queue initially.
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index cb1499f83c..d20ed7cce3 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -76,9 +76,6 @@ class QueueContext : public RefCounted {
*/
void cancel(size_t n);
- /** Get the context for a broker queue. */
- static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
-
/** Called in timer thread when the timer runs out. */
void timeout();
@@ -91,12 +88,19 @@ class QueueContext : public RefCounted {
/** Called by MesageHandler when a message is dequeued. */
broker::QueuedMessage dequeue(uint32_t position);
- private:
+ size_t getHash() const { return hash; }
+
+
+ /** Get the cluster context for a broker queue. */
+ static boost::intrusive_ptr<QueueContext> get(broker::Queue&) ;
+
+private:
sys::Mutex lock;
CountdownTimer timer;
broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr?
Multicaster& mcast;
size_t consumers;
+ size_t hash;
typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename
UnackedMap unacked;
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
index 92f7183a08..79e9d657bc 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
@@ -41,9 +41,10 @@ using namespace broker;
using framing::FieldTable;
WiringHandler::WiringHandler(EventHandler& e,
- const boost::intrusive_ptr<QueueHandler>& qh) :
+ const boost::intrusive_ptr<QueueHandler>& qh,
+ broker::Broker& b) :
HandlerBase(e),
- broker(e.getCore().getBroker()),
+ broker(b),
recovery(broker.getQueues(), broker.getExchanges(),
broker.getLinks(), broker.getDtxManager()),
queueHandler(qh)
diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
index 71aa6e52e9..cd964de997 100644
--- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
@@ -51,7 +51,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
public HandlerBase
{
public:
- WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh);
+ WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
bool invoke(const framing::AMQBody& body);
diff --git a/qpid/cpp/src/qpid/cluster/exp/hash.cpp b/qpid/cpp/src/qpid/cluster/exp/hash.cpp
new file mode 100644
index 0000000000..8712ce13c5
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/hash.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "hash.h"
+#include "QueueContext.h"
+#include "qpid/broker/QueuedMessage.h"
+#include <boost/functional/hash.hpp>
+
+namespace qpid {
+namespace cluster {
+size_t hashof(const std::string& s) { return boost::hash_value(s); }
+size_t hashof(const QueueContext& qc) { return qc.getHash(); }
+size_t hashof(const broker::Queue& q) {
+ return QueueContext::get(const_cast<broker::Queue&>(q))->getHash();
+}
+size_t hashof(uint32_t n) { return boost::hash_value(n); }
+size_t hashof(const broker::QueuedMessage& qm) { return hashof(*qm.queue); }
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/hash.h b/qpid/cpp/src/qpid/cluster/exp/hash.h
new file mode 100644
index 0000000000..298496b22a
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/hash.h
@@ -0,0 +1,53 @@
+#ifndef QPID_CLUSTER_EXP_HASH_H
+#define QPID_CLUSTER_EXP_HASH_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+#include <stdlib.h>
+#include <string>
+
+namespace qpid {
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace cluster {
+
+class QueueContext;
+
+/**@file hash functions */
+
+// The following all uses the cached hash value on the Queue::getContext()
+// FIXME aconway 2011-09-26: de-const broker::Cluster interface then de-const here.
+size_t hashof(const broker::Queue& q);
+size_t hashof(const QueueContext& qc);
+size_t hashof(const broker::QueuedMessage& qm);
+
+// Hash directly from a value string.
+size_t hashof(const std::string& s);
+size_t hashof(uint32_t n);
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_HASH_H*/