summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-12 21:03:43 +0000
committerAlan Conway <aconway@apache.org>2008-08-12 21:03:43 +0000
commit6884ded02594404ac07a590b0677738baf851672 (patch)
tree12b5680470befb344bb0f490fb47c512d5043eb3
parente72b261cb7f9c24cff62cd256c2aab4ce56e4a46 (diff)
downloadqpid-python-6884ded02594404ac07a590b0677738baf851672.tar.gz
Queue cluster send frames, do cpg_mcast in separate thread, batching if possible.
5x thruput improvement :) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@685317 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp52
-rw-r--r--cpp/src/qpid/cluster/Cluster.h20
2 files changed, 50 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 37b126f5a9..84edfa201d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -68,7 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2))
+ deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)),
+ mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
@@ -83,6 +84,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
deliverQueue.start(poller);
+ mcastQueue.start(poller);
}
Cluster::~Cluster() {}
@@ -124,12 +126,26 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) {
void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
- Buffer buf(buffer);
- frame.encode(buf);
- encodePtr(buf, connection);
- iovec iov = { buffer, buf.getPosition() };
- cpg.mcast(name, &iov, 1);
+ mcastQueue.push(Message(frame, self, connection));
+}
+
+void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end)
+{
+ // Static is OK because there is only one cluster allowed per
+ // process and only one thread in mcastQueueCb at a time.
+ static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
+ MessageQueue::iterator i = begin;
+ while (i != end) {
+ Buffer buf(buffer, sizeof(buffer));
+ while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) {
+ i->frame.encode(buf);
+ encodePtr(buf, i->connection);
+ ++i;
+ }
+ iovec iov = { buffer, buf.getPosition() };
+ cpg.mcast(name, &iov, 1);
+ }
}
void Cluster::notify() {
@@ -181,12 +197,14 @@ void Cluster::deliver(
Id from(nodeid, pid);
try {
Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
- if (!frame.decode(buf)) // Not enough data.
- throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling.
- void* connection;
- decodePtr(buf, connection);
- deliverQueue.push(DeliveredFrame(frame, from, connection));
+ while (buf.available() > 0) {
+ AMQFrame frame;
+ if (!frame.decode(buf)) // Not enough data.
+ throw Exception("Received incomplete cluster event.");
+ void* connection;
+ decodePtr(buf, connection);
+ deliverQueue.push(Message(frame, from, connection));
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -196,10 +214,10 @@ void Cluster::deliver(
}
}
-void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
- const PollableQueue<DeliveredFrame>::iterator& end)
+void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end)
{
- for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) {
+ for (MessageQueue::iterator i = begin; i != end; ++i) {
AMQFrame& frame(i->frame);
Id from(i->from);
ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
@@ -220,7 +238,7 @@ void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what());
+ QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what());
assert(0);
throw;
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 6f5e6d9cfb..1c43bdac43 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -97,11 +97,13 @@ class Cluster : private Cpg::Handler, public RefCounted
typedef std::map<Id, Member> MemberMap;
typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
- struct DeliveredFrame {
+ /** Message sent over the cluster. */
+ struct Message {
framing::AMQFrame frame; Id from; void* connection;
- DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c)
+ Message(const framing::AMQFrame& f, const Id i, void* c)
: frame(f), from(i), connection(c) {}
};
+ typedef PollableQueue<Message> MessageQueue;
boost::function<void()> shutdownNext;
@@ -126,10 +128,17 @@ class Cluster : private Cpg::Handler, public RefCounted
);
/** Callback to handle delivered frames from the deliverQueue. */
- void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
- const PollableQueue<DeliveredFrame>::iterator& end);
+ void deliverQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end);
+ /** Callback to multi-cast frames from mcastQueue */
+ void mcastQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end);
+
+
+ /** Callback to dispatch CPG events. */
void dispatch(sys::DispatchHandle&);
+ /** Callback if CPG fd is disconnected. */
void disconnect(sys::DispatchHandle&);
void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
@@ -147,7 +156,8 @@ class Cluster : private Cpg::Handler, public RefCounted
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableQueue<DeliveredFrame> deliverQueue;
+ MessageQueue deliverQueue;
+ MessageQueue mcastQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);