diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 3 |
4 files changed, 28 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e35d3e4175..7bdc066767 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -323,9 +323,11 @@ void Cluster::deliver( { MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); - Event e(Event::decodeCopy(from, buf)); - LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); - deliverEvent(e); + while (buf.available()) { + Event e(Event::decodeCopy(from, buf)); + LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); + deliverEvent(e); + } } LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 30866d3154..9fcf9f5ce1 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -72,7 +72,7 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { if (buf.available() < e.size) throw Exception("Not enough data for multicast event"); e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); - memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); + buf.getRawData((uint8_t*)(e.getData()), e.size); return e; } diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 7e97963318..4123e11c92 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -24,10 +24,14 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" +#include <boost/bind.hpp> +#include <algorithm> namespace qpid { namespace cluster { +static const int MCAST_IOV_MAX=63; // Limit imposed by CPG + Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : @@ -36,7 +40,8 @@ Multicaster::Multicaster(Cpg& cpg_, #endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + holding(true), + ioVector(MCAST_IOV_MAX) { queue.start(); } @@ -70,26 +75,29 @@ void Multicaster::mcast(const Event& e) { queue.push(e); } - -Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast( + const PollableEventQueue::Batch& events) +{ + PollableEventQueue::Batch::const_iterator i = events.begin(); try { - PollableEventQueue::Batch::const_iterator i = values.begin(); - while( i != values.end()) { - iovec iov = i->toIovec(); - if (!cpg.mcast(&iov, 1)) { - // cpg didn't send because of CPG flow control. - break; + while (i < events.end()) { + size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i)); + std::transform(i, i+count, ioVector.begin(), + boost::bind(&Event::toIovec, _1)); + if (!cpg.mcast(&ioVector.front(), count)) { + QPID_LOG(trace, "CPG flow control, will resend " + << events.end() - i << " events"); + break; } - ++i; + i += count; } - return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); - return values.end(); } + return i; } void Multicaster::release() { diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index f2ee5099bb..652900aa0f 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -29,6 +29,7 @@ #include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> #include <deque> +#include <vector> namespace qpid { @@ -72,7 +73,7 @@ class Multicaster PollableEventQueue queue; bool holding; PlainEventQueue holdingQueue; - std::vector<struct ::iovec> ioVector; + std::vector< ::iovec> ioVector; }; }} // namespace qpid::cluster |