diff options
author | Alan Conway <aconway@apache.org> | 2011-09-27 13:11:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-27 13:11:14 +0000 |
commit | d4c2852eda45520e4fb1366fdf94ff3beb8b208c (patch) | |
tree | bf584e1144e3b39c2729065aea3cb5a8d2e4e076 | |
parent | e3feec93b16698fefcdaabc238c8f07f15ff67ef (diff) | |
download | qpid-python-d4c2852eda45520e4fb1366fdf94ff3beb8b208c.tar.gz |
QPID-2920: Groundwork to enable queues hashed over multiple CPG groups.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1176372 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/cluster.mk | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Core.cpp | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Core.h | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/EventHandler.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Group.cpp | 50 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Group.h | 62 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHandler.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/WiringHandler.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/hash.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/hash.h | 53 |
17 files changed, 302 insertions, 64 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 0945d58f07..29b609bacd 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -120,6 +120,10 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/Core.h \ qpid/cluster/exp/EventHandler.cpp \ qpid/cluster/exp/EventHandler.h \ + qpid/cluster/exp/Group.cpp \ + qpid/cluster/exp/Group.h \ + qpid/cluster/exp/hash.cpp \ + qpid/cluster/exp/hash.h \ qpid/cluster/exp/HandlerBase.cpp \ qpid/cluster/exp/HandlerBase.h \ qpid/cluster/exp/MessageHandler.cpp \ diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 21d497cfb5..f2693b15e5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -23,6 +23,8 @@ #include "BrokerContext.h" #include "QueueContext.h" #include "QueueHandler.h" +#include "Multicaster.h" +#include "hash.h" #include "qpid/framing/ClusterMessageRoutingBody.h" #include "qpid/framing/ClusterMessageRoutedBody.h" #include "qpid/framing/ClusterMessageEnqueueBody.h" @@ -50,6 +52,8 @@ using namespace framing; using namespace broker; namespace { +const ProtocolVersion pv; // shorthand + // noReplicate means the current thread is handling a message // received from the cluster so it should not be replicated. QPID_TSS bool tssNoReplicate = false; @@ -59,6 +63,20 @@ QPID_TSS bool tssNoReplicate = false; QPID_TSS RoutingId tssRoutingId = 0; } +// FIXME aconway 2011-09-26: de-const the broker::Cluster interface, +// then de-const here. +Multicaster& BrokerContext::mcaster(const broker::QueuedMessage& qm) { + return core.getGroup(hashof(qm)).getMulticaster(); +} + +Multicaster& BrokerContext::mcaster(const broker::Queue& q) { + return core.getGroup(hashof(q)).getMulticaster(); +} + +Multicaster& BrokerContext::mcaster(const std::string& name) { + return core.getGroup(hashof(name)).getMulticaster(); +} + BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() { assert(!tssNoReplicate); tssNoReplicate = true; @@ -89,16 +107,17 @@ bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& m std::string data(msg->encodedSize(),char()); framing::Buffer buf(&data[0], data.size()); msg->encode(buf); - core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data)); + mcaster(queue).mcast(ClusterMessageRoutingBody(pv, tssRoutingId, data)); core.getRoutingMap().put(tssRoutingId, msg); } - core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName())); + mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, tssRoutingId, queue.getName())); return false; // Strict order, wait for CPG self-delivery to enqueue. } void BrokerContext::routed(const boost::intrusive_ptr<Message>&) { if (tssRoutingId) { // we enqueued at least one message. - core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId)); + core.getGroup(tssRoutingId).getMulticaster().mcast( + ClusterMessageRoutedBody(pv, tssRoutingId)); // Note: routingMap is cleaned up on CPG delivery in MessageHandler. tssRoutingId = 0; } @@ -106,20 +125,19 @@ void BrokerContext::routed(const boost::intrusive_ptr<Message>&) { void BrokerContext::acquire(const broker::QueuedMessage& qm) { if (tssNoReplicate) return; - core.mcast(ClusterMessageAcquireBody( - ProtocolVersion(), qm.queue->getName(), qm.position)); + mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::dequeue(const broker::QueuedMessage& qm) { if (!tssNoReplicate) - core.mcast(ClusterMessageDequeueBody( - ProtocolVersion(), qm.queue->getName(), qm.position)); + mcaster(qm).mcast( + ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position)); } void BrokerContext::requeue(const broker::QueuedMessage& qm) { if (!tssNoReplicate) - core.mcast(ClusterMessageRequeueBody( - ProtocolVersion(), + mcaster(qm).mcast(ClusterMessageRequeueBody( + pv, qm.queue->getName(), qm.position, qm.payload->getRedelivered())); @@ -130,17 +148,17 @@ void BrokerContext::create(broker::Queue& q) { if (tssNoReplicate) return; assert(!QueueContext::get(q)); boost::intrusive_ptr<QueueContext> context( - new QueueContext(q, core.getSettings().getConsumeLock(), core.getMulticaster())); + new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName()))); std::string data(q.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); q.encode(buf); - core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data)); + mcaster(q).mcast(ClusterWiringCreateQueueBody(pv, data)); // FIXME aconway 2011-07-29: Need asynchronous completion. } void BrokerContext::destroy(broker::Queue& q) { if (tssNoReplicate) return; - core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName())); + mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName())); } void BrokerContext::create(broker::Exchange& ex) { @@ -148,28 +166,27 @@ void BrokerContext::create(broker::Exchange& ex) { std::string data(ex.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); ex.encode(buf); - core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data)); + mcaster(ex.getName()).mcast(ClusterWiringCreateExchangeBody(pv, data)); } void BrokerContext::destroy(broker::Exchange& ex) { if (tssNoReplicate) return; - core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName())); + mcaster(ex.getName()).mcast( + ClusterWiringDestroyExchangeBody(pv, ex.getName())); } void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { if (tssNoReplicate) return; - core.mcast(ClusterWiringBindBody( - ProtocolVersion(), q.getName(), ex.getName(), key, args)); + mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args)); } void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex, const std::string& key, const framing::FieldTable& args) { if (tssNoReplicate) return; - core.mcast(ClusterWiringUnbindBody( - ProtocolVersion(), q.getName(), ex.getName(), key, args)); + mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args)); } // n is the number of consumers including the one just added. @@ -190,3 +207,4 @@ void BrokerContext::stopped(broker::Queue& q) { } }} // namespace qpid::cluster + diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index f0444882a1..c05959fa81 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -82,6 +82,10 @@ class BrokerContext : public broker::Cluster private: uint32_t nextRoutingId(); + // Get multicaster associated with a queue + Multicaster& mcaster(const broker::QueuedMessage& qm); + Multicaster& mcaster(const broker::Queue& q); + Multicaster& mcaster(const std::string&); Core& core; boost::intrusive_ptr<QueueHandler> queueHandler; diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index 1b24b7fcf6..fe7f22e445 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -38,26 +38,28 @@ namespace cluster { Core::Core(const Settings& s, broker::Broker& b) : broker(b), - eventHandler(new EventHandler(*this)), - multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)), settings(s) { + // FIXME aconway 2011-09-23: multi-group + groups.push_back(new Group(*this)); boost::intrusive_ptr<QueueHandler> queueHandler( - new QueueHandler(*eventHandler, multicaster, settings)); - eventHandler->add(queueHandler); - eventHandler->add(boost::intrusive_ptr<HandlerBase>( - new WiringHandler(*eventHandler, queueHandler))); - eventHandler->add(boost::intrusive_ptr<HandlerBase>( - new MessageHandler(*eventHandler))); + new QueueHandler(groups[0]->getEventHandler(), groups[0]->getMulticaster(), settings)); + groups[0]->getEventHandler().add(queueHandler); + groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>( + new WiringHandler(groups[0]->getEventHandler(), queueHandler, broker))); + groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>( + new MessageHandler(groups[0]->getEventHandler(), *this))); std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler)); brokerHandler = bh.get(); // BrokerContext belongs to Broker broker.setCluster(std::auto_ptr<broker::Cluster>(bh)); - eventHandler->start(); - eventHandler->getCpg().join(s.name); + // FIXME aconway 2011-09-26: multi-group + groups[0]->getEventHandler().start(); + groups[0]->getEventHandler().getCpg().join(s.name); // TODO aconway 2010-11-18: logging standards - QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< eventHandler->getSelf()); + // FIXME aconway 2011-09-26: multi-group + QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< groups[0]->getEventHandler().getSelf()); } void Core::initialize() {} @@ -68,8 +70,8 @@ void Core::fatal() { broker::SignalHandler::shutdown(); } -void Core::mcast(const framing::AMQBody& body) { - multicaster.mcast(body); +Group& Core::getGroup(size_t hashValue) { + return *groups[hashValue % groups.size()]; } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h index 5f5237d679..f5847f7fee 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.h +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -22,15 +22,16 @@ * */ -#include <string> -#include <memory> #include "LockedMap.h" -#include "Multicaster.h" +#include "Group.h" #include "Settings.h" #include "qpid/cluster/types.h" #include "qpid/cluster/Cpg.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/sys/Time.h" +#include <boost/intrusive_ptr.hpp> +#include <string> +#include <memory> // TODO aconway 2010-10-19: experimental cluster code. @@ -49,7 +50,8 @@ class EventHandler; class BrokerContext; /** - * Cluster core state machine. + * Cluster core. + * * Holds together the various objects that implement cluster behavior, * and holds state that is shared by multiple components. * @@ -59,6 +61,7 @@ class Core { public: typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap; + typedef std::vector<boost::intrusive_ptr<Group> > Groups; /** Constructed during Plugin::earlyInitialize() */ Core(const Settings&, broker::Broker&); @@ -69,13 +72,8 @@ class Core /** Shut down broker due to fatal error. Caller should log a critical message */ void fatal(); - /** Multicast an event */ - void mcast(const framing::AMQBody&); - broker::Broker& getBroker() { return broker; } - EventHandler& getEventHandler() { return *eventHandler; } BrokerContext& getBrokerContext() { return *brokerHandler; } - Multicaster& getMulticaster() { return multicaster; } /** Map of messages that are currently being routed. * Used to pass messages being routed from BrokerContext to MessageHandler @@ -83,13 +81,16 @@ class Core RoutingMap& getRoutingMap() { return routingMap; } const Settings& getSettings() const { return settings; } + + /** Get group by hash value. */ + Group& getGroup(size_t hashValue); + private: broker::Broker& broker; - std::auto_ptr<EventHandler> eventHandler; // Handles CPG events. BrokerContext* brokerHandler; // Handles broker events. RoutingMap routingMap; - Multicaster multicaster; Settings settings; + Groups groups; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp index 4653cbf1ca..6685cb6390 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp @@ -33,10 +33,10 @@ namespace qpid { namespace cluster { -EventHandler::EventHandler(Core& c) : - core(c), - cpg(*this), // FIXME aconway 2010-10-20: belongs on Core. - dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)), +EventHandler::EventHandler(boost::shared_ptr<sys::Poller> poller, + boost::function<void()> onError) : + cpg(*this), + dispatcher(cpg, poller, onError), self(cpg.self()) {} diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h index 93423778f1..4af43fb76e 100644 --- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.h @@ -37,7 +37,6 @@ class AMQBody; } namespace cluster { -class Core; class HandlerBase; /** @@ -48,7 +47,9 @@ class HandlerBase; class EventHandler : public Cpg::Handler { public: - EventHandler(Core&); + EventHandler(boost::shared_ptr<sys::Poller> poller, + boost::function<void()> onError); + ~EventHandler(); /** Add a handler */ @@ -75,13 +76,11 @@ class EventHandler : public Cpg::Handler MemberId getSender() { return sender; } MemberId getSelf() { return self; } - Core& getCore() { return core; } Cpg& getCpg() { return cpg; } private: void invoke(const framing::AMQBody& body); - Core& core; Cpg cpg; PollerDispatch dispatcher; MemberId sender; // sender of current event. diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp new file mode 100644 index 0000000000..3a5c1989ae --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Group.h" +#include "Core.h" +#include "EventHandler.h" +#include "Multicaster.h" + +#include "qpid/broker/Broker.h" + +namespace qpid { +namespace framing { +class AMQFrame; +class AMQBody; +} + +namespace cluster { + +Group::Group(Core& core) : + eventHandler( + new EventHandler(core.getBroker().getPoller(), + boost::bind(&Core::fatal, &core))), + multicaster( + new Multicaster(eventHandler->getCpg(), + core.getBroker().getPoller(), + boost::bind(&Core::fatal, &core))) +{} + +Group::~Group() {} + +void Group::mcast(const framing::AMQBody& b) { multicaster->mcast(b); } +void Group::mcast(const framing::AMQFrame& f) { multicaster->mcast(f); } +}} // namespace qpid::cluster::exp diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h new file mode 100644 index 0000000000..15579b2665 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Group.h @@ -0,0 +1,62 @@ +#ifndef QPID_CLUSTER_EXP_GROUP_H +#define QPID_CLUSTER_EXP_GROUP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include <memory> + +namespace qpid { +namespace framing { +class AMQBody; +class AMQFrame; +} + +namespace cluster { + +class Cpg; +class Core; +class EventHandler; +class Multicaster; + +/** + * A CPG instance with an event handler and a multi-caster, + * along with all the per-group handler objects. + */ +class Group : public RefCounted +{ + public: + Group(Core& core); + ~Group(); + + EventHandler& getEventHandler() { return *eventHandler; } + Multicaster& getMulticaster() { return *multicaster; } + + void mcast(const framing::AMQBody&); + void mcast(const framing::AMQFrame&); + private: + std::auto_ptr<EventHandler> eventHandler; + std::auto_ptr<Multicaster> multicaster; +}; +}} // namespace qpid::cluster::exp + +#endif /*!QPID_CLUSTER_EXP_GROUP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index d9fce02d75..16f5a90a7c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -39,9 +39,10 @@ namespace qpid { namespace cluster { using namespace broker; -MessageHandler::MessageHandler(EventHandler& e) : +MessageHandler::MessageHandler(EventHandler& e, Core& c) : HandlerBase(e), - broker(e.getCore().getBroker()) + broker(c.getBroker()), + core(c) {} bool MessageHandler::invoke(const framing::AMQBody& body) { @@ -49,7 +50,7 @@ bool MessageHandler::invoke(const framing::AMQBody& body) { } void MessageHandler::routing(RoutingId routingId, const std::string& message) { - if (sender() == self()) return; // Already in getCore().getRoutingMap() + if (sender() == self()) return; // Already in core.getRoutingMap() boost::intrusive_ptr<Message> msg = new Message; // FIXME aconway 2010-10-28: decode message in bounded-size buffers. framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); @@ -70,7 +71,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed"); boost::intrusive_ptr<Message> msg; if (sender() == self()) - msg = eventHandler.getCore().getRoutingMap().get(routingId); + msg = core.getRoutingMap().get(routingId); else msg = memberMap[sender()].routingMap[routingId]; if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q @@ -81,7 +82,7 @@ void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { void MessageHandler::routed(RoutingId routingId) { if (sender() == self()) - eventHandler.getCore().getRoutingMap().erase(routingId); + core.getRoutingMap().erase(routingId); else memberMap[sender()].routingMap.erase(routingId); } diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h index 40e004d89a..36258879d3 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -40,6 +40,7 @@ class Queue; namespace cluster { class EventHandler; class BrokerContext; +class Core; // FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue. // Make this consistent. @@ -51,7 +52,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler public HandlerBase { public: - MessageHandler(EventHandler&); + MessageHandler(EventHandler&, Core&); bool invoke(const framing::AMQBody& body); @@ -73,6 +74,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler broker::Broker& broker; MemberMap memberMap; + Core& core; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index f71b0d1865..248fb05dc6 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -23,6 +23,7 @@ #include "Multicaster.h" #include "qpid/cluster/types.h" #include "BrokerContext.h" // for ScopedSuppressReplication +#include "hash.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/ClusterQueueResubscribeBody.h" #include "qpid/framing/ClusterQueueSubscribeBody.h" @@ -39,7 +40,7 @@ QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicas : timer(boost::bind(&QueueContext::timeout, this), q.getBroker()->getTimer(), consumeLock), - queue(q), mcast(m), consumers(0) + queue(q), mcast(m), consumers(0), hash(hashof(q.getName())) { q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); q.stopConsumers(); // Stop queue initially. diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index cb1499f83c..d20ed7cce3 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -76,9 +76,6 @@ class QueueContext : public RefCounted { */ void cancel(size_t n); - /** Get the context for a broker queue. */ - static boost::intrusive_ptr<QueueContext> get(broker::Queue&); - /** Called in timer thread when the timer runs out. */ void timeout(); @@ -91,12 +88,19 @@ class QueueContext : public RefCounted { /** Called by MesageHandler when a message is dequeued. */ broker::QueuedMessage dequeue(uint32_t position); - private: + size_t getHash() const { return hash; } + + + /** Get the cluster context for a broker queue. */ + static boost::intrusive_ptr<QueueContext> get(broker::Queue&) ; + +private: sys::Mutex lock; CountdownTimer timer; broker::Queue& queue; // FIXME aconway 2011-06-08: should be shared/weak ptr? Multicaster& mcast; size_t consumers; + size_t hash; typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename UnackedMap unacked; diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp index 92f7183a08..79e9d657bc 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp @@ -41,9 +41,10 @@ using namespace broker; using framing::FieldTable; WiringHandler::WiringHandler(EventHandler& e, - const boost::intrusive_ptr<QueueHandler>& qh) : + const boost::intrusive_ptr<QueueHandler>& qh, + broker::Broker& b) : HandlerBase(e), - broker(e.getCore().getBroker()), + broker(b), recovery(broker.getQueues(), broker.getExchanges(), broker.getLinks(), broker.getDtxManager()), queueHandler(qh) diff --git a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h index 71aa6e52e9..cd964de997 100644 --- a/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h @@ -51,7 +51,7 @@ class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler, public HandlerBase { public: - WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh); + WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&); bool invoke(const framing::AMQBody& body); diff --git a/qpid/cpp/src/qpid/cluster/exp/hash.cpp b/qpid/cpp/src/qpid/cluster/exp/hash.cpp new file mode 100644 index 0000000000..8712ce13c5 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/hash.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "hash.h" +#include "QueueContext.h" +#include "qpid/broker/QueuedMessage.h" +#include <boost/functional/hash.hpp> + +namespace qpid { +namespace cluster { +size_t hashof(const std::string& s) { return boost::hash_value(s); } +size_t hashof(const QueueContext& qc) { return qc.getHash(); } +size_t hashof(const broker::Queue& q) { + return QueueContext::get(const_cast<broker::Queue&>(q))->getHash(); +} +size_t hashof(uint32_t n) { return boost::hash_value(n); } +size_t hashof(const broker::QueuedMessage& qm) { return hashof(*qm.queue); } + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/hash.h b/qpid/cpp/src/qpid/cluster/exp/hash.h new file mode 100644 index 0000000000..298496b22a --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/hash.h @@ -0,0 +1,53 @@ +#ifndef QPID_CLUSTER_EXP_HASH_H +#define QPID_CLUSTER_EXP_HASH_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" +#include <stdlib.h> +#include <string> + +namespace qpid { +namespace broker { +class Queue; +class QueuedMessage; +} + +namespace cluster { + +class QueueContext; + +/**@file hash functions */ + +// The following all uses the cached hash value on the Queue::getContext() +// FIXME aconway 2011-09-26: de-const broker::Cluster interface then de-const here. +size_t hashof(const broker::Queue& q); +size_t hashof(const QueueContext& qc); +size_t hashof(const broker::QueuedMessage& qm); + +// Hash directly from a value string. +size_t hashof(const std::string& s); +size_t hashof(uint32_t n); + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_HASH_H*/ |