summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp27
1 files changed, 18 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 1608d0e6ec..a06756e87b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -25,6 +25,7 @@
#include "QueueContext.h"
#include "EventHandler.h"
#include "PrettyId.h"
+#include "Group.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueRegistry.h"
@@ -42,11 +43,12 @@ namespace cluster {
using namespace broker;
using namespace framing;
-MessageHandler::MessageHandler(EventHandler& e, Core& c) :
- HandlerBase(e),
+MessageHandler::MessageHandler(Group& g, Core& c) :
+ HandlerBase(g.getEventHandler()),
broker(c.getBroker()),
core(c),
- messageBuilders(&c.getBroker().getStore())
+ messageBuilders(g.getMessageBuilders()),
+ messageHolder(g.getMessageHolder())
{}
bool MessageHandler::handle(const framing::AMQFrame& frame) {
@@ -59,12 +61,16 @@ bool MessageHandler::handle(const framing::AMQFrame& frame) {
{
boost::shared_ptr<broker::Queue> queue;
boost::intrusive_ptr<broker::Message> message;
- if (messageBuilders.handle(sender(), frame, queue, message)) {
+ if (sender() == self())
+ messageHolder.check(frame, queue, message);
+ else
+ messageBuilders.handle(sender(), frame, queue, message);
+ if (message) {
BrokerContext::ScopedSuppressReplication ssr;
queue->deliver(message);
+ if (sender() == self()) // Async completion
+ message->getIngressCompletion().finishCompleter();
}
- // FIXME aconway 2011-09-29: async completion goes here.
- // For own messages need to release the channel assigned by BrokerContext.
return true;
}
return false;
@@ -79,9 +85,12 @@ boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
}
void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
- // FIXME aconway 2011-09-28: don't re-decode my own messages
- messageBuilders.announce(sender(), channel, queue);
+ // We only need to build message from other brokers, our own messages
+ // are held by the MessageHolder.
+ if (sender() != self()) {
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
+ messageBuilders.announce(sender(), channel, queue);
+ }
}
// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet