diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 4a8195438f..d57ff76941 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_, boost::function<void()> onError_) : onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - ready(false) -{ - queue.start(); -} + ready(false), bypass(true) +{} void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { mcast(Event::control(body, id)); @@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) { } } QPID_LOG(trace, "MCAST " << e); - queue.push(e); + if (bypass) { // direct, don't queue + iovec iov = e.toIovec(); + // FIXME aconway 2010-03-10: should do limited retry. + while (!cpg.mcast(&iov, 1)) + ; + } + else + queue.push(e); } - Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { try { PollableEventQueue::Batch::const_iterator i = values.begin(); @@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co } } +void Multicaster::start() { + queue.start(); + bypass = false; +} + void Multicaster::setReady() { sys::Mutex::ScopedLock l(lock); ready = true; |