diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 32 |
1 files changed, 12 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 4123e11c92..7e97963318 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -24,14 +24,10 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" -#include <boost/bind.hpp> -#include <algorithm> namespace qpid { namespace cluster { -static const int MCAST_IOV_MAX=63; // Limit imposed by CPG - Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : @@ -40,8 +36,7 @@ Multicaster::Multicaster(Cpg& cpg_, #endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true), - ioVector(MCAST_IOV_MAX) + holding(true) { queue.start(); } @@ -75,29 +70,26 @@ void Multicaster::mcast(const Event& e) { queue.push(e); } -Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast( - const PollableEventQueue::Batch& events) -{ - PollableEventQueue::Batch::const_iterator i = events.begin(); + +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { try { - while (i < events.end()) { - size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i)); - std::transform(i, i+count, ioVector.begin(), - boost::bind(&Event::toIovec, _1)); - if (!cpg.mcast(&ioVector.front(), count)) { - QPID_LOG(trace, "CPG flow control, will resend " - << events.end() - i << " events"); - break; + PollableEventQueue::Batch::const_iterator i = values.begin(); + while( i != values.end()) { + iovec iov = i->toIovec(); + if (!cpg.mcast(&iov, 1)) { + // cpg didn't send because of CPG flow control. + break; } - i += count; + ++i; } + return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); + return values.end(); } - return i; } void Multicaster::release() { |