diff options
author | Alan Conway <aconway@apache.org> | 2008-08-12 21:03:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-12 21:03:43 +0000 |
commit | 6a4322594d0fb830085b992065250d43b6440884 (patch) | |
tree | d2f145663438b877ea4cc679ced5481694c0838f | |
parent | 15e3e5e2eaa03992972c825d4512883755c628ca (diff) | |
download | qpid-python-6a4322594d0fb830085b992065250d43b6440884.tar.gz |
Queue cluster send frames, do cpg_mcast in separate thread, batching if possible.
5x thruput improvement :)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@685317 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 20 |
2 files changed, 50 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 37b126f5a9..84edfa201d 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -68,7 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2)) + deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)), + mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2)) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); @@ -83,6 +84,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); deliverQueue.start(poller); + mcastQueue.start(poller); } Cluster::~Cluster() {} @@ -124,12 +126,26 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. - Buffer buf(buffer); - frame.encode(buf); - encodePtr(buf, connection); - iovec iov = { buffer, buf.getPosition() }; - cpg.mcast(name, &iov, 1); + mcastQueue.push(Message(frame, self, connection)); +} + +void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end) +{ + // Static is OK because there is only one cluster allowed per + // process and only one thread in mcastQueueCb at a time. + static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. + MessageQueue::iterator i = begin; + while (i != end) { + Buffer buf(buffer, sizeof(buffer)); + while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) { + i->frame.encode(buf); + encodePtr(buf, i->connection); + ++i; + } + iovec iov = { buffer, buf.getPosition() }; + cpg.mcast(name, &iov, 1); + } } void Cluster::notify() { @@ -181,12 +197,14 @@ void Cluster::deliver( Id from(nodeid, pid); try { Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; - if (!frame.decode(buf)) // Not enough data. - throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling. - void* connection; - decodePtr(buf, connection); - deliverQueue.push(DeliveredFrame(frame, from, connection)); + while (buf.available() > 0) { + AMQFrame frame; + if (!frame.decode(buf)) // Not enough data. + throw Exception("Received incomplete cluster event."); + void* connection; + decodePtr(buf, connection); + deliverQueue.push(Message(frame, from, connection)); + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -196,10 +214,10 @@ void Cluster::deliver( } } -void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin, - const PollableQueue<DeliveredFrame>::iterator& end) +void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end) { - for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) { + for (MessageQueue::iterator i = begin; i != end; ++i) { AMQFrame& frame(i->frame); Id from(i->from); ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection); @@ -220,7 +238,7 @@ void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what()); + QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what()); assert(0); throw; } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 6f5e6d9cfb..1c43bdac43 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -97,11 +97,13 @@ class Cluster : private Cpg::Handler, public RefCounted typedef std::map<Id, Member> MemberMap; typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap; - struct DeliveredFrame { + /** Message sent over the cluster. */ + struct Message { framing::AMQFrame frame; Id from; void* connection; - DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c) + Message(const framing::AMQFrame& f, const Id i, void* c) : frame(f), from(i), connection(c) {} }; + typedef PollableQueue<Message> MessageQueue; boost::function<void()> shutdownNext; @@ -126,10 +128,17 @@ class Cluster : private Cpg::Handler, public RefCounted ); /** Callback to handle delivered frames from the deliverQueue. */ - void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin, - const PollableQueue<DeliveredFrame>::iterator& end); + void deliverQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end); + /** Callback to multi-cast frames from mcastQueue */ + void mcastQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end); + + + /** Callback to dispatch CPG events. */ void dispatch(sys::DispatchHandle&); + /** Callback if CPG fd is disconnected. */ void disconnect(sys::DispatchHandle&); void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method); @@ -147,7 +156,8 @@ class Cluster : private Cpg::Handler, public RefCounted ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableQueue<DeliveredFrame> deliverQueue; + MessageQueue deliverQueue; + MessageQueue mcastQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); |