diff options
author | Alan Conway <aconway@apache.org> | 2009-08-14 12:56:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-08-14 12:56:31 +0000 |
commit | 7cd4c643f79a1dac745d0049bcc9febda641c548 (patch) | |
tree | af9a299cb6ec7a234a2c993b1bfdc6d91807d194 | |
parent | e998b312406f35cd49eee45c0701ade51953b748 (diff) | |
download | qpid-python-7cd4c643f79a1dac745d0049bcc9febda641c548.tar.gz |
Revert "Batch multiple events into a single CPG multicast."
This reverts svn revision 803713: Batch multiple events into a single CPG multicast.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@804206 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Multicaster.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Multicaster.h | 3 |
4 files changed, 17 insertions, 28 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 7bdc066767..e35d3e4175 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -323,11 +323,9 @@ void Cluster::deliver( { MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); - while (buf.available()) { - Event e(Event::decodeCopy(from, buf)); - LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); - deliverEvent(e); - } + Event e(Event::decodeCopy(from, buf)); + LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); + deliverEvent(e); } LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index 9fcf9f5ce1..30866d3154 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -72,7 +72,7 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { if (buf.available() < e.size) throw Exception("Not enough data for multicast event"); e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); - buf.getRawData((uint8_t*)(e.getData()), e.size); + memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); return e; } diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index 4123e11c92..7e97963318 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -24,14 +24,10 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" -#include <boost/bind.hpp> -#include <algorithm> namespace qpid { namespace cluster { -static const int MCAST_IOV_MAX=63; // Limit imposed by CPG - Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : @@ -40,8 +36,7 @@ Multicaster::Multicaster(Cpg& cpg_, #endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true), - ioVector(MCAST_IOV_MAX) + holding(true) { queue.start(); } @@ -75,29 +70,26 @@ void Multicaster::mcast(const Event& e) { queue.push(e); } -Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast( - const PollableEventQueue::Batch& events) -{ - PollableEventQueue::Batch::const_iterator i = events.begin(); + +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { try { - while (i < events.end()) { - size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i)); - std::transform(i, i+count, ioVector.begin(), - boost::bind(&Event::toIovec, _1)); - if (!cpg.mcast(&ioVector.front(), count)) { - QPID_LOG(trace, "CPG flow control, will resend " - << events.end() - i << " events"); - break; + PollableEventQueue::Batch::const_iterator i = values.begin(); + while( i != values.end()) { + iovec iov = i->toIovec(); + if (!cpg.mcast(&iov, 1)) { + // cpg didn't send because of CPG flow control. + break; } - i += count; + ++i; } + return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); + return values.end(); } - return i; } void Multicaster::release() { diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h index 652900aa0f..f2ee5099bb 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/Multicaster.h @@ -29,7 +29,6 @@ #include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> #include <deque> -#include <vector> namespace qpid { @@ -73,7 +72,7 @@ class Multicaster PollableEventQueue queue; bool holding; PlainEventQueue holdingQueue; - std::vector< ::iovec> ioVector; + std::vector<struct ::iovec> ioVector; }; }} // namespace qpid::cluster |