From 3a0cff06b1e946517544cd150da36b50effcf247 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 4 Oct 2011 14:04:27 +0000 Subject: QPID-2920: Asynchronous completion. Keep track of local messages in flight. Prepare async completion counter on local enqueue. Signal async completion when local message is self delivered. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1178801 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/cluster.mk | 2 + qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 14 ++--- qpid/cpp/src/qpid/cluster/exp/BrokerContext.h | 3 +- qpid/cpp/src/qpid/cluster/exp/Core.cpp | 25 ++++---- qpid/cpp/src/qpid/cluster/exp/Group.cpp | 6 +- qpid/cpp/src/qpid/cluster/exp/Group.h | 7 +++ qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 27 ++++++--- qpid/cpp/src/qpid/cluster/exp/MessageHandler.h | 9 ++- qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp | 58 ++++++++++++++++++ qpid/cpp/src/qpid/cluster/exp/MessageHolder.h | 77 ++++++++++++++++++++++++ 10 files changed, 195 insertions(+), 33 deletions(-) create mode 100644 qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp create mode 100644 qpid/cpp/src/qpid/cluster/exp/MessageHolder.h diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 70bcc8e95a..5401c78c3d 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -128,6 +128,8 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/HandlerBase.h \ qpid/cluster/exp/MessageBuilders.cpp \ qpid/cluster/exp/MessageBuilders.h \ + qpid/cluster/exp/MessageHolder.cpp \ + qpid/cluster/exp/MessageHolder.h \ qpid/cluster/exp/MessageHandler.cpp \ qpid/cluster/exp/MessageHandler.h \ qpid/cluster/exp/Multicaster.cpp \ diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 4a33d737d7..13e9ae3db2 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -23,6 +23,7 @@ #include "BrokerContext.h" #include "QueueContext.h" #include "Multicaster.h" +#include "MessageHolder.h" #include "hash.h" #include "qpid/framing/ClusterMessageEnqueueBody.h" #include "qpid/framing/ClusterMessageAcquireBody.h" @@ -85,7 +86,6 @@ BrokerContext::BrokerContext(Core& c) : core(c) {} BrokerContext::~BrokerContext() {} - namespace { void sendFrame(Multicaster& mcaster, const AMQFrame& frame, uint16_t channel) { AMQFrame copy(frame); @@ -96,15 +96,15 @@ void sendFrame(Multicaster& mcaster, const AMQFrame& frame, uint16_t channel) { bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr& msg) { + // FIXME aconway 2011-10-03: pass shared ptr on broker::Cluster interface. if (!tssReplicate) return true; - // FIXME aconway 2011-09-29: for async completion the - // UniqueIds::release must move to self-delivery so we can - // identify the same message. - UniqueIds::Scope s(channels); - uint16_t channel = s.id; - mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), channel)); + Group& group = core.getGroup(hashof(queue)); + MessageHolder::Channel channel = + group.getMessageHolder().sending(msg, queue.shared_from_this()); + group.getMulticaster().mcast(ClusterMessageEnqueueBody(pv, queue.getName(), channel)); std::for_each(msg->getFrames().begin(), msg->getFrames().end(), boost::bind(&sendFrame, boost::ref(mcaster(queue)), _1, channel)); + msg->getIngressCompletion().startCompleter(); // Async completion return false; // Strict order, wait for CPG self-delivery to enqueue. } diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h index 01909b136d..f5cb401c51 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h @@ -22,7 +22,6 @@ * */ -#include "UniqueIds.h" #include "qpid/broker/Cluster.h" #include "qpid/sys/AtomicValue.h" @@ -84,9 +83,9 @@ class BrokerContext : public broker::Cluster Multicaster& mcaster(const broker::QueuedMessage& qm); Multicaster& mcaster(const broker::Queue& q); Multicaster& mcaster(const std::string&); + Multicaster& mcaster(const uint32_t); Core& core; - UniqueIds channels; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index a55986decb..f4c5a5c88f 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -37,9 +37,7 @@ namespace qpid { namespace cluster { -Core::Core(const Settings& s, broker::Broker& b) : - broker(b), - settings(s) +Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s) { // FIXME aconway 2011-09-26: this has to be consistent in a // cluster, negotiate as part of join protocol. @@ -48,24 +46,27 @@ Core::Core(const Settings& s, broker::Broker& b) : // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol. std::string groupName = s.name + "-" + boost::lexical_cast(i); groups.push_back(new Group(*this)); + boost::intrusive_ptr group(groups.back()); + // FIXME aconway 2011-10-03: clean up, all Handler ctors take Group. boost::intrusive_ptr queueHandler( - new QueueHandler(groups[i]->getEventHandler(), groups[i]->getMulticaster(), settings)); - groups[i]->getEventHandler().add(queueHandler); - groups[i]->getEventHandler().add(boost::intrusive_ptr( - new WiringHandler(groups[i]->getEventHandler(), queueHandler, broker))); - groups[i]->getEventHandler().add(boost::intrusive_ptr( - new MessageHandler(groups[i]->getEventHandler(), *this))); + new QueueHandler(group->getEventHandler(), group->getMulticaster(), settings)); + group->getEventHandler().add(queueHandler); + group->getEventHandler().add( + boost::intrusive_ptr( + new WiringHandler(group->getEventHandler(), queueHandler, broker))); + group->getEventHandler().add( + boost::intrusive_ptr(new MessageHandler(*group, *this))); std::auto_ptr bh(new BrokerContext(*this)); brokerHandler = bh.get(); // BrokerContext belongs to Broker broker.setCluster(std::auto_ptr(bh)); // FIXME aconway 2011-09-26: multi-group - groups[i]->getEventHandler().start(); - groups[i]->getEventHandler().getCpg().join(groupName); + group->getEventHandler().start(); + group->getEventHandler().getCpg().join(groupName); // TODO aconway 2010-11-18: logging standards // FIXME aconway 2011-09-26: multi-group - QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< groups[i]->getEventHandler().getSelf()); + QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< group->getEventHandler().getSelf()); } } diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp index 3a5c1989ae..17615fccc8 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp @@ -22,6 +22,8 @@ #include "Core.h" #include "EventHandler.h" #include "Multicaster.h" +#include "MessageHolder.h" +#include "MessageBuilders.h" #include "qpid/broker/Broker.h" @@ -40,7 +42,9 @@ Group::Group(Core& core) : multicaster( new Multicaster(eventHandler->getCpg(), core.getBroker().getPoller(), - boost::bind(&Core::fatal, &core))) + boost::bind(&Core::fatal, &core))), + messageHolder(new MessageHolder()), + messageBuilders(new MessageBuilders(&core.getBroker().getStore())) {} Group::~Group() {} diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h index 15579b2665..0bd1fd2277 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Group.h +++ b/qpid/cpp/src/qpid/cluster/exp/Group.h @@ -37,6 +37,8 @@ class Cpg; class Core; class EventHandler; class Multicaster; +class MessageBuilders; +class MessageHolder; /** * A CPG instance with an event handler and a multi-caster, @@ -50,13 +52,18 @@ class Group : public RefCounted EventHandler& getEventHandler() { return *eventHandler; } Multicaster& getMulticaster() { return *multicaster; } + MessageHolder& getMessageHolder() { return *messageHolder; } + MessageBuilders& getMessageBuilders() { return *messageBuilders; } void mcast(const framing::AMQBody&); void mcast(const framing::AMQFrame&); private: std::auto_ptr eventHandler; std::auto_ptr multicaster; + std::auto_ptr messageHolder; + std::auto_ptr messageBuilders; }; + }} // namespace qpid::cluster::exp #endif /*!QPID_CLUSTER_EXP_GROUP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 1608d0e6ec..a06756e87b 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -25,6 +25,7 @@ #include "QueueContext.h" #include "EventHandler.h" #include "PrettyId.h" +#include "Group.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" #include "qpid/broker/QueueRegistry.h" @@ -42,11 +43,12 @@ namespace cluster { using namespace broker; using namespace framing; -MessageHandler::MessageHandler(EventHandler& e, Core& c) : - HandlerBase(e), +MessageHandler::MessageHandler(Group& g, Core& c) : + HandlerBase(g.getEventHandler()), broker(c.getBroker()), core(c), - messageBuilders(&c.getBroker().getStore()) + messageBuilders(g.getMessageBuilders()), + messageHolder(g.getMessageHolder()) {} bool MessageHandler::handle(const framing::AMQFrame& frame) { @@ -59,12 +61,16 @@ bool MessageHandler::handle(const framing::AMQFrame& frame) { { boost::shared_ptr queue; boost::intrusive_ptr message; - if (messageBuilders.handle(sender(), frame, queue, message)) { + if (sender() == self()) + messageHolder.check(frame, queue, message); + else + messageBuilders.handle(sender(), frame, queue, message); + if (message) { BrokerContext::ScopedSuppressReplication ssr; queue->deliver(message); + if (sender() == self()) // Async completion + message->getIngressCompletion().finishCompleter(); } - // FIXME aconway 2011-09-29: async completion goes here. - // For own messages need to release the channel assigned by BrokerContext. return true; } return false; @@ -79,9 +85,12 @@ boost::shared_ptr MessageHandler::findQueue( } void MessageHandler::enqueue(const std::string& q, uint16_t channel) { - boost::shared_ptr queue = findQueue(q, "Cluster enqueue failed"); - // FIXME aconway 2011-09-28: don't re-decode my own messages - messageBuilders.announce(sender(), channel, queue); + // We only need to build message from other brokers, our own messages + // are held by the MessageHolder. + if (sender() != self()) { + boost::shared_ptr queue = findQueue(q, "Cluster enqueue failed"); + messageBuilders.announce(sender(), channel, queue); + } } // FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h index f1d3dc2726..ff5afefe27 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h @@ -26,6 +26,7 @@ #include "HandlerBase.h" #include "MessageBuilders.h" +#include "MessageHolder.h" #include "qpid/framing/AMQP_AllOperations.h" #include #include @@ -42,6 +43,8 @@ namespace cluster { class EventHandler; class BrokerContext; class Core; +class Group; +class MessageHolder; // FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue. // Make this consistent. @@ -53,7 +56,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler public HandlerBase { public: - MessageHandler(EventHandler&, Core&); + MessageHandler(Group&, Core&); bool handle(const framing::AMQFrame&); @@ -67,8 +70,10 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler broker::Broker& broker; Core& core; - MessageBuilders messageBuilders; + MessageBuilders& messageBuilders; + MessageHolder& messageHolder; }; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp new file mode 100644 index 0000000000..4cadb033f0 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageHolder.h" +#include "qpid/broker/Message.h" + +namespace qpid { +namespace cluster { + +uint16_t MessageHolder::sending(const MessagePtr& msg, const QueuePtr& q) { + sys::Mutex::ScopedLock l(lock); + Channel channel = getChannel(l); + messages[channel] = std::make_pair(msg,q); + return channel; +} + +bool MessageHolder::check(const framing::AMQFrame& frame, + QueuePtr& queueOut, MessagePtr& msgOut) +{ + if (frame.getEof() && frame.getEos()) { //end of frameset + sys::Mutex::ScopedLock l(lock); + MessageMap::iterator i = messages.find(frame.getChannel()); + assert(i != messages.end()); + msgOut = i->second.first; + queueOut = i->second.second; + messages.erase(frame.getChannel()); // re-use the frame. + return true; + } + return false; +} + +MessageHolder::Channel MessageHolder::getChannel(const sys::Mutex::ScopedLock&) { + sys::Mutex::ScopedLock l(lock); + Channel old = mark; + while (messages.find(++mark) != messages.end()) + assert(mark != old); // check wrap-around + return mark; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.h b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.h new file mode 100644 index 0000000000..55d7b96260 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.h @@ -0,0 +1,77 @@ +#ifndef QPID_CLUSTER_EXP_MESSAGEHOLDER_H +#define QPID_CLUSTER_EXP_MESSAGEHOLDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include "qpid/framing/AMQFrame.h" +#include +#include + +namespace qpid { + +namespace broker { +class Message; +class Queue; +} + +namespace cluster { + +/** + * Holds locally-received messages until the corresponding transfer + * multicast is self delivered. + * THREAD SAFE: updated in broker and deliver threads. + */ +class MessageHolder +{ + public: + typedef uint16_t Channel; + typedef boost::intrusive_ptr MessagePtr; + typedef boost::shared_ptr QueuePtr; + + MessageHolder() : mark(0) {} + + /** + * Called in broker thread by BrokerContext just before a local message is enqueud. + *@return channel to use to send the message. + */ + uint16_t sending(const MessagePtr& msg, const QueuePtr &q); + + /** Called in deliver thread by MessageHandler as frames are received. + * If this is the last frame of the message, return the corresponding local message + *@return true if this is the last frame. + */ + bool check(const framing::AMQFrame& frame, QueuePtr& queueOut, MessagePtr& msgOut); + + private: + typedef std::pair MessageAndQueue; + typedef std::map MessageMap; + + Channel getChannel(const sys::Mutex::ScopedLock&); + + sys::Mutex lock; + MessageMap messages; + Channel mark; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_MESSAGEHOLDER_H*/ -- cgit v1.2.1