diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 7e97963318..4123e11c92 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -24,10 +24,14 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" +#include <boost/bind.hpp> +#include <algorithm> namespace qpid { namespace cluster { +static const int MCAST_IOV_MAX=63; // Limit imposed by CPG + Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : @@ -36,7 +40,8 @@ Multicaster::Multicaster(Cpg& cpg_, #endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + holding(true), + ioVector(MCAST_IOV_MAX) { queue.start(); } @@ -70,26 +75,29 @@ void Multicaster::mcast(const Event& e) { queue.push(e); } - -Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast( + const PollableEventQueue::Batch& events) +{ + PollableEventQueue::Batch::const_iterator i = events.begin(); try { - PollableEventQueue::Batch::const_iterator i = values.begin(); - while( i != values.end()) { - iovec iov = i->toIovec(); - if (!cpg.mcast(&iov, 1)) { - // cpg didn't send because of CPG flow control. - break; + while (i < events.end()) { + size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i)); + std::transform(i, i+count, ioVector.begin(), + boost::bind(&Event::toIovec, _1)); + if (!cpg.mcast(&ioVector.front(), count)) { + QPID_LOG(trace, "CPG flow control, will resend " + << events.end() - i << " events"); + break; } - ++i; + i += count; } - return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); - return values.end(); } + return i; } void Multicaster::release() { |