diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 32 |
1 files changed, 28 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index a106ec128b..b02fa16ae9 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -28,10 +28,14 @@ namespace qpid { namespace cluster { -Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : - onError(onError_), cpg(cpg_), +Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_, + const boost::shared_ptr<sys::Poller>& poller, + boost::function<void()> onError_) : + onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + holding(true), + mcastMax(mcastMax_), + pending(0) { queue.start(); } @@ -56,6 +60,7 @@ void Multicaster::mcast(const Event& e) { } } queue.push(e); + } @@ -64,9 +69,18 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; - if (!cpg.mcast(&iov, 1)) break; // returns false for flow control + if (!cpg.mcast(&iov, 1)) + break; // cpg.mcast returns false for flow control QPID_LOG(trace, " MCAST " << *i); ++i; + if (mcastMax) { + sys::Mutex::ScopedLock l(lock); + assert(pending < mcastMax); + if (++pending == mcastMax) { + queue.stop(); + break ; + } + } } values.erase(values.begin(), i); } @@ -84,4 +98,14 @@ void Multicaster::release() { holdingQueue.clear(); } +void Multicaster::delivered(const Event&) { + sys::Mutex::ScopedLock l(lock); + if (mcastMax) { + assert(pending <= mcastMax); + if (pending == mcastMax) + queue.start(); + --pending; + } +} + }} // namespace qpid::cluster |