summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Multicaster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp21
1 files changed, 15 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 4a8195438f..d57ff76941 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- ready(false)
-{
- queue.start();
-}
+ ready(false), bypass(true)
+{}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
mcast(Event::control(body, id));
@@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) {
}
}
QPID_LOG(trace, "MCAST " << e);
- queue.push(e);
+ if (bypass) { // direct, don't queue
+ iovec iov = e.toIovec();
+ // FIXME aconway 2010-03-10: should do limited retry.
+ while (!cpg.mcast(&iov, 1))
+ ;
+ }
+ else
+ queue.push(e);
}
-
Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
PollableEventQueue::Batch::const_iterator i = values.begin();
@@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co
}
}
+void Multicaster::start() {
+ queue.start();
+ bypass = false;
+}
+
void Multicaster::setReady() {
sys::Mutex::ScopedLock l(lock);
ready = true;