diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 1608d0e6ec..a06756e87b 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -25,6 +25,7 @@ #include "QueueContext.h" #include "EventHandler.h" #include "PrettyId.h" +#include "Group.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" #include "qpid/broker/QueueRegistry.h" @@ -42,11 +43,12 @@ namespace cluster { using namespace broker; using namespace framing; -MessageHandler::MessageHandler(EventHandler& e, Core& c) : - HandlerBase(e), +MessageHandler::MessageHandler(Group& g, Core& c) : + HandlerBase(g.getEventHandler()), broker(c.getBroker()), core(c), - messageBuilders(&c.getBroker().getStore()) + messageBuilders(g.getMessageBuilders()), + messageHolder(g.getMessageHolder()) {} bool MessageHandler::handle(const framing::AMQFrame& frame) { @@ -59,12 +61,16 @@ bool MessageHandler::handle(const framing::AMQFrame& frame) { { boost::shared_ptr<broker::Queue> queue; boost::intrusive_ptr<broker::Message> message; - if (messageBuilders.handle(sender(), frame, queue, message)) { + if (sender() == self()) + messageHolder.check(frame, queue, message); + else + messageBuilders.handle(sender(), frame, queue, message); + if (message) { BrokerContext::ScopedSuppressReplication ssr; queue->deliver(message); + if (sender() == self()) // Async completion + message->getIngressCompletion().finishCompleter(); } - // FIXME aconway 2011-09-29: async completion goes here. - // For own messages need to release the channel assigned by BrokerContext. return true; } return false; @@ -79,9 +85,12 @@ boost::shared_ptr<broker::Queue> MessageHandler::findQueue( } void MessageHandler::enqueue(const std::string& q, uint16_t channel) { - boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed"); - // FIXME aconway 2011-09-28: don't re-decode my own messages - messageBuilders.announce(sender(), channel, queue); + // We only need to build message from other brokers, our own messages + // are held by the MessageHolder. + if (sender() != self()) { + boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed"); + messageBuilders.announce(sender(), channel, queue); + } } // FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet |