summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Multicaster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Multicaster.cpp21
1 files changed, 13 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
index 72fc1533f8..229d7edb1e 100644
--- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp
@@ -33,36 +33,41 @@ Multicaster::Multicaster(Cpg& cpg_,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- holding(true)
+ ready(false)
{
queue.start();
}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
- QPID_LOG(trace, "MCAST " << id << ": " << body);
mcast(Event::control(body, id));
}
void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) {
- QPID_LOG(trace, "MCAST " << id << ": " << frame);
mcast(Event::control(frame, id));
}
void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
Event e(DATA, id, size);
memcpy(e.getData(), data, size);
- QPID_LOG(trace, "MCAST " << e);
mcast(e);
}
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
- if (e.isConnection() && holding) {
- holdingQueue.push_back(e);
+ if (!ready) {
+ if (e.isConnection()) holdingQueue.push_back(e);
+ else {
+ iovec iov = e.toIovec();
+ // FIXME aconway 2009-11-23: configurable retry --cluster-retry
+ if (!cpg.mcast(&iov, 1))
+ throw Exception("CPG flow control error during initialization");
+ QPID_LOG(trace, "MCAST (direct) " << e);
+ }
return;
}
}
+ QPID_LOG(trace, "MCAST " << e);
queue.push(e);
}
@@ -88,9 +93,9 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co
}
}
-void Multicaster::release() {
+void Multicaster::setReady() {
sys::Mutex::ScopedLock l(lock);
- holding = false;
+ ready = true;
std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1));
holdingQueue.clear();
}