summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Multicaster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp32
1 files changed, 28 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index a106ec128b..b02fa16ae9 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -28,10 +28,14 @@
namespace qpid {
namespace cluster {
-Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) :
- onError(onError_), cpg(cpg_),
+Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_,
+ const boost::shared_ptr<sys::Poller>& poller,
+ boost::function<void()> onError_) :
+ onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- holding(true)
+ holding(true),
+ mcastMax(mcastMax_),
+ pending(0)
{
queue.start();
}
@@ -56,6 +60,7 @@ void Multicaster::mcast(const Event& e) {
}
}
queue.push(e);
+
}
@@ -64,9 +69,18 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
PollableEventQueue::Queue::iterator i = values.begin();
while( i != values.end()) {
iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() };
- if (!cpg.mcast(&iov, 1)) break; // returns false for flow control
+ if (!cpg.mcast(&iov, 1))
+ break; // cpg.mcast returns false for flow control
QPID_LOG(trace, " MCAST " << *i);
++i;
+ if (mcastMax) {
+ sys::Mutex::ScopedLock l(lock);
+ assert(pending < mcastMax);
+ if (++pending == mcastMax) {
+ queue.stop();
+ break ;
+ }
+ }
}
values.erase(values.begin(), i);
}
@@ -84,4 +98,14 @@ void Multicaster::release() {
holdingQueue.clear();
}
+void Multicaster::delivered(const Event&) {
+ sys::Mutex::ScopedLock l(lock);
+ if (mcastMax) {
+ assert(pending <= mcastMax);
+ if (pending == mcastMax)
+ queue.start();
+ --pending;
+ }
+}
+
}} // namespace qpid::cluster