summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-10-04 14:04:27 +0000
committerAlan Conway <aconway@apache.org>2011-10-04 14:04:27 +0000
commit3a0cff06b1e946517544cd150da36b50effcf247 (patch)
treef33291869163de0207e57b659dba55791deba7ab
parentde0cd1fa6022a5143435010cc76a10dfbe2706ad (diff)
downloadqpid-python-3a0cff06b1e946517544cd150da36b50effcf247.tar.gz
QPID-2920: Asynchronous completion.
Keep track of local messages in flight. Prepare async completion counter on local enqueue. Signal async completion when local message is self delivered. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1178801 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp25
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Group.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp27
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.h9
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp58
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHolder.h77
10 files changed, 195 insertions, 33 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 70bcc8e95a..5401c78c3d 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -128,6 +128,8 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/HandlerBase.h \
qpid/cluster/exp/MessageBuilders.cpp \
qpid/cluster/exp/MessageBuilders.h \
+ qpid/cluster/exp/MessageHolder.cpp \
+ qpid/cluster/exp/MessageHolder.h \
qpid/cluster/exp/MessageHandler.cpp \
qpid/cluster/exp/MessageHandler.h \
qpid/cluster/exp/Multicaster.cpp \
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 4a33d737d7..13e9ae3db2 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -23,6 +23,7 @@
#include "BrokerContext.h"
#include "QueueContext.h"
#include "Multicaster.h"
+#include "MessageHolder.h"
#include "hash.h"
#include "qpid/framing/ClusterMessageEnqueueBody.h"
#include "qpid/framing/ClusterMessageAcquireBody.h"
@@ -85,7 +86,6 @@ BrokerContext::BrokerContext(Core& c) : core(c) {}
BrokerContext::~BrokerContext() {}
-
namespace {
void sendFrame(Multicaster& mcaster, const AMQFrame& frame, uint16_t channel) {
AMQFrame copy(frame);
@@ -96,15 +96,15 @@ void sendFrame(Multicaster& mcaster, const AMQFrame& frame, uint16_t channel) {
bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
{
+ // FIXME aconway 2011-10-03: pass shared ptr on broker::Cluster interface.
if (!tssReplicate) return true;
- // FIXME aconway 2011-09-29: for async completion the
- // UniqueIds::release must move to self-delivery so we can
- // identify the same message.
- UniqueIds<uint16_t>::Scope s(channels);
- uint16_t channel = s.id;
- mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), channel));
+ Group& group = core.getGroup(hashof(queue));
+ MessageHolder::Channel channel =
+ group.getMessageHolder().sending(msg, queue.shared_from_this());
+ group.getMulticaster().mcast(ClusterMessageEnqueueBody(pv, queue.getName(), channel));
std::for_each(msg->getFrames().begin(), msg->getFrames().end(),
boost::bind(&sendFrame, boost::ref(mcaster(queue)), _1, channel));
+ msg->getIngressCompletion().startCompleter(); // Async completion
return false; // Strict order, wait for CPG self-delivery to enqueue.
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
index 01909b136d..f5cb401c51 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
@@ -22,7 +22,6 @@
*
*/
-#include "UniqueIds.h"
#include "qpid/broker/Cluster.h"
#include "qpid/sys/AtomicValue.h"
@@ -84,9 +83,9 @@ class BrokerContext : public broker::Cluster
Multicaster& mcaster(const broker::QueuedMessage& qm);
Multicaster& mcaster(const broker::Queue& q);
Multicaster& mcaster(const std::string&);
+ Multicaster& mcaster(const uint32_t);
Core& core;
- UniqueIds<uint16_t> channels;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index a55986decb..f4c5a5c88f 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -37,9 +37,7 @@
namespace qpid {
namespace cluster {
-Core::Core(const Settings& s, broker::Broker& b) :
- broker(b),
- settings(s)
+Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s)
{
// FIXME aconway 2011-09-26: this has to be consistent in a
// cluster, negotiate as part of join protocol.
@@ -48,24 +46,27 @@ Core::Core(const Settings& s, broker::Broker& b) :
// FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol.
std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
groups.push_back(new Group(*this));
+ boost::intrusive_ptr<Group> group(groups.back());
+ // FIXME aconway 2011-10-03: clean up, all Handler ctors take Group.
boost::intrusive_ptr<QueueHandler> queueHandler(
- new QueueHandler(groups[i]->getEventHandler(), groups[i]->getMulticaster(), settings));
- groups[i]->getEventHandler().add(queueHandler);
- groups[i]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
- new WiringHandler(groups[i]->getEventHandler(), queueHandler, broker)));
- groups[i]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
- new MessageHandler(groups[i]->getEventHandler(), *this)));
+ new QueueHandler(group->getEventHandler(), group->getMulticaster(), settings));
+ group->getEventHandler().add(queueHandler);
+ group->getEventHandler().add(
+ boost::intrusive_ptr<HandlerBase>(
+ new WiringHandler(group->getEventHandler(), queueHandler, broker)));
+ group->getEventHandler().add(
+ boost::intrusive_ptr<HandlerBase>(new MessageHandler(*group, *this)));
std::auto_ptr<BrokerContext> bh(new BrokerContext(*this));
brokerHandler = bh.get();
// BrokerContext belongs to Broker
broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
// FIXME aconway 2011-09-26: multi-group
- groups[i]->getEventHandler().start();
- groups[i]->getEventHandler().getCpg().join(groupName);
+ group->getEventHandler().start();
+ group->getEventHandler().getCpg().join(groupName);
// TODO aconway 2010-11-18: logging standards
// FIXME aconway 2011-09-26: multi-group
- QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< groups[i]->getEventHandler().getSelf());
+ QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< group->getEventHandler().getSelf());
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.cpp b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
index 3a5c1989ae..17615fccc8 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.cpp
@@ -22,6 +22,8 @@
#include "Core.h"
#include "EventHandler.h"
#include "Multicaster.h"
+#include "MessageHolder.h"
+#include "MessageBuilders.h"
#include "qpid/broker/Broker.h"
@@ -40,7 +42,9 @@ Group::Group(Core& core) :
multicaster(
new Multicaster(eventHandler->getCpg(),
core.getBroker().getPoller(),
- boost::bind(&Core::fatal, &core)))
+ boost::bind(&Core::fatal, &core))),
+ messageHolder(new MessageHolder()),
+ messageBuilders(new MessageBuilders(&core.getBroker().getStore()))
{}
Group::~Group() {}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Group.h b/qpid/cpp/src/qpid/cluster/exp/Group.h
index 15579b2665..0bd1fd2277 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Group.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Group.h
@@ -37,6 +37,8 @@ class Cpg;
class Core;
class EventHandler;
class Multicaster;
+class MessageBuilders;
+class MessageHolder;
/**
* A CPG instance with an event handler and a multi-caster,
@@ -50,13 +52,18 @@ class Group : public RefCounted
EventHandler& getEventHandler() { return *eventHandler; }
Multicaster& getMulticaster() { return *multicaster; }
+ MessageHolder& getMessageHolder() { return *messageHolder; }
+ MessageBuilders& getMessageBuilders() { return *messageBuilders; }
void mcast(const framing::AMQBody&);
void mcast(const framing::AMQFrame&);
private:
std::auto_ptr<EventHandler> eventHandler;
std::auto_ptr<Multicaster> multicaster;
+ std::auto_ptr<MessageHolder> messageHolder;
+ std::auto_ptr<MessageBuilders> messageBuilders;
};
+
}} // namespace qpid::cluster::exp
#endif /*!QPID_CLUSTER_EXP_GROUP_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 1608d0e6ec..a06756e87b 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -25,6 +25,7 @@
#include "QueueContext.h"
#include "EventHandler.h"
#include "PrettyId.h"
+#include "Group.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueRegistry.h"
@@ -42,11 +43,12 @@ namespace cluster {
using namespace broker;
using namespace framing;
-MessageHandler::MessageHandler(EventHandler& e, Core& c) :
- HandlerBase(e),
+MessageHandler::MessageHandler(Group& g, Core& c) :
+ HandlerBase(g.getEventHandler()),
broker(c.getBroker()),
core(c),
- messageBuilders(&c.getBroker().getStore())
+ messageBuilders(g.getMessageBuilders()),
+ messageHolder(g.getMessageHolder())
{}
bool MessageHandler::handle(const framing::AMQFrame& frame) {
@@ -59,12 +61,16 @@ bool MessageHandler::handle(const framing::AMQFrame& frame) {
{
boost::shared_ptr<broker::Queue> queue;
boost::intrusive_ptr<broker::Message> message;
- if (messageBuilders.handle(sender(), frame, queue, message)) {
+ if (sender() == self())
+ messageHolder.check(frame, queue, message);
+ else
+ messageBuilders.handle(sender(), frame, queue, message);
+ if (message) {
BrokerContext::ScopedSuppressReplication ssr;
queue->deliver(message);
+ if (sender() == self()) // Async completion
+ message->getIngressCompletion().finishCompleter();
}
- // FIXME aconway 2011-09-29: async completion goes here.
- // For own messages need to release the channel assigned by BrokerContext.
return true;
}
return false;
@@ -79,9 +85,12 @@ boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
}
void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
- // FIXME aconway 2011-09-28: don't re-decode my own messages
- messageBuilders.announce(sender(), channel, queue);
+ // We only need to build message from other brokers, our own messages
+ // are held by the MessageHolder.
+ if (sender() != self()) {
+ boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
+ messageBuilders.announce(sender(), channel, queue);
+ }
}
// FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
index f1d3dc2726..ff5afefe27 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
@@ -26,6 +26,7 @@
#include "HandlerBase.h"
#include "MessageBuilders.h"
+#include "MessageHolder.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include <boost/intrusive_ptr.hpp>
#include <map>
@@ -42,6 +43,8 @@ namespace cluster {
class EventHandler;
class BrokerContext;
class Core;
+class Group;
+class MessageHolder;
// FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue.
// Make this consistent.
@@ -53,7 +56,7 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
public HandlerBase
{
public:
- MessageHandler(EventHandler&, Core&);
+ MessageHandler(Group&, Core&);
bool handle(const framing::AMQFrame&);
@@ -67,8 +70,10 @@ class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
broker::Broker& broker;
Core& core;
- MessageBuilders messageBuilders;
+ MessageBuilders& messageBuilders;
+ MessageHolder& messageHolder;
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
new file mode 100644
index 0000000000..4cadb033f0
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageHolder.h"
+#include "qpid/broker/Message.h"
+
+namespace qpid {
+namespace cluster {
+
+uint16_t MessageHolder::sending(const MessagePtr& msg, const QueuePtr& q) {
+ sys::Mutex::ScopedLock l(lock);
+ Channel channel = getChannel(l);
+ messages[channel] = std::make_pair(msg,q);
+ return channel;
+}
+
+bool MessageHolder::check(const framing::AMQFrame& frame,
+ QueuePtr& queueOut, MessagePtr& msgOut)
+{
+ if (frame.getEof() && frame.getEos()) { //end of frameset
+ sys::Mutex::ScopedLock l(lock);
+ MessageMap::iterator i = messages.find(frame.getChannel());
+ assert(i != messages.end());
+ msgOut = i->second.first;
+ queueOut = i->second.second;
+ messages.erase(frame.getChannel()); // re-use the frame.
+ return true;
+ }
+ return false;
+}
+
+MessageHolder::Channel MessageHolder::getChannel(const sys::Mutex::ScopedLock&) {
+ sys::Mutex::ScopedLock l(lock);
+ Channel old = mark;
+ while (messages.find(++mark) != messages.end())
+ assert(mark != old); // check wrap-around
+ return mark;
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHolder.h b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.h
new file mode 100644
index 0000000000..55d7b96260
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHolder.h
@@ -0,0 +1,77 @@
+#ifndef QPID_CLUSTER_EXP_MESSAGEHOLDER_H
+#define QPID_CLUSTER_EXP_MESSAGEHOLDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include "qpid/framing/AMQFrame.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Queue;
+}
+
+namespace cluster {
+
+/**
+ * Holds locally-received messages until the corresponding transfer
+ * multicast is self delivered.
+ * THREAD SAFE: updated in broker and deliver threads.
+ */
+class MessageHolder
+{
+ public:
+ typedef uint16_t Channel;
+ typedef boost::intrusive_ptr<broker::Message> MessagePtr;
+ typedef boost::shared_ptr<broker::Queue> QueuePtr;
+
+ MessageHolder() : mark(0) {}
+
+ /**
+ * Called in broker thread by BrokerContext just before a local message is enqueud.
+ *@return channel to use to send the message.
+ */
+ uint16_t sending(const MessagePtr& msg, const QueuePtr &q);
+
+ /** Called in deliver thread by MessageHandler as frames are received.
+ * If this is the last frame of the message, return the corresponding local message
+ *@return true if this is the last frame.
+ */
+ bool check(const framing::AMQFrame& frame, QueuePtr& queueOut, MessagePtr& msgOut);
+
+ private:
+ typedef std::pair<MessagePtr, QueuePtr> MessageAndQueue;
+ typedef std::map<Channel, MessageAndQueue> MessageMap;
+
+ Channel getChannel(const sys::Mutex::ScopedLock&);
+
+ sys::Mutex lock;
+ MessageMap messages;
+ Channel mark;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_MESSAGEHOLDER_H*/