summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp89
1 files changed, 51 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d31fa07c57..05a7a9efff 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -96,7 +96,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
writeEstimate(writeEstimate_),
mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
- deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
+ deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller),
+ deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller),
state(INIT),
lastSize(0),
lastBroker(false)
@@ -111,7 +112,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
failoverExchange.reset(new FailoverExchange(this));
dispatcher.start();
- deliverQueue.start();
+ deliverEventQueue.start();
+ deliverFrameQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (quorum_) quorum.init();
cpg.join(name);
@@ -191,14 +193,14 @@ void Cluster::deliver(
void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
QPID_LOG(trace, *this << " PUSH: " << e);
- deliverQueue.push(e);
+ QPID_LATENCY_INIT(e);
+ deliverEventQueue.push(e);
}
-// Entry point: called when deliverQueue has events to process.
-void Cluster::delivered(PollableEventQueue::Queue& events) {
+// Entry point: called when deliverEventQueue has events to process.
+void Cluster::deliveredEvents(PollableEventQueue::Queue& events) {
try {
- for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i)
- deliveredEvent(*i, i->getData());
+ for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
events.clear();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
@@ -206,41 +208,52 @@ void Cluster::delivered(PollableEventQueue::Queue& events) {
}
}
-void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
- QPID_LATENCY_RECORD("deliver queue", e);
- Buffer buf(const_cast<char*>(data), e.getSize());
- AMQFrame frame;
- if (e.isCluster()) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
- ClusterDispatcher dispatch(*this, e.getMemberId(), l);
- if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
+void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) {
+ try {
+ for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1));
+ frames.clear();
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
+ leave();
}
- else { // e.isConnection()
+}
+
+void Cluster::deliveredEvent(const Event& e) {
+ QPID_LATENCY_RECORD("delivered event queue", e);
+ Buffer buf(const_cast<char*>(e.getData()), e.getSize());
+ boost::intrusive_ptr<Connection> connection;
+ if (e.isConnection()) {
if (state == NEWBIE) {
QPID_LOG(trace, *this << " DROP: " << e);
+ return;
}
- else {
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
- if (!connection) return;
- if (e.getType() == CONTROL) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- connection->delivered(frame);
- }
- }
- else {
- QPID_LOG(trace, *this << " DLVR: " << e);
- connection->deliverBuffer(buf);
- }
+ connection = getConnection(e.getConnectionId());
+ if (!connection) return;
+ }
+ if (e.getType() == CONTROL) {
+ AMQFrame frame;
+ while (frame.decode(buf)) {
+ deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame));
}
}
- QPID_LATENCY_RECORD("decode+execute", e);
+ else if (e.getType() == DATA) {
+ connection->deliveredEvent(e, deliverFrameQueue);
+ }
}
+void Cluster::deliveredFrame(const EventFrame& e) {
+ QPID_LOG(trace, *this << " DLVR: " << e.frame);
+ if (e.connection) {
+ e.connection->deliveredFrame(e);
+ }
+ else {
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
+ ClusterDispatcher dispatch(*this, e.member, l);
+ if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
+ }
+}
+
struct AddrList {
const cpg_address* addrs;
int count;
@@ -379,7 +392,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
setClusterId(uuid);
state = DUMPEE;
QPID_LOG(info, *this << " receiving dump from " << dumper);
- deliverQueue.stop();
+ deliverEventQueue.stop();
checkDumpIn(l);
}
}
@@ -389,7 +402,7 @@ void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
assert(state == OFFER);
state = DUMPER;
QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
- deliverQueue.stop();
+ deliverEventQueue.stop();
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
new DumpClient(myId, dumpee, url, broker, map, connections.values(),
@@ -411,7 +424,7 @@ void Cluster::checkDumpIn(Lock& ) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
QPID_LOG(info, *this << " received dump, starting catch-up");
- deliverQueue.start();
+ deliverEventQueue.start();
}
}
@@ -425,7 +438,7 @@ void Cluster::dumpOutDone(Lock& l) {
state = READY;
mcast.release();
QPID_LOG(info, *this << " sent dump");
- deliverQueue.start();
+ deliverEventQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
}