diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 89 |
1 files changed, 51 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d31fa07c57..05a7a9efff 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -96,7 +96,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b writeEstimate(writeEstimate_), mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), + deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller), + deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller), state(INIT), lastSize(0), lastBroker(false) @@ -111,7 +112,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); failoverExchange.reset(new FailoverExchange(this)); dispatcher.start(); - deliverQueue.start(); + deliverEventQueue.start(); + deliverFrameQueue.start(); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); if (quorum_) quorum.init(); cpg.join(name); @@ -191,14 +193,14 @@ void Cluster::deliver( void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; QPID_LOG(trace, *this << " PUSH: " << e); - deliverQueue.push(e); + QPID_LATENCY_INIT(e); + deliverEventQueue.push(e); } -// Entry point: called when deliverQueue has events to process. -void Cluster::delivered(PollableEventQueue::Queue& events) { +// Entry point: called when deliverEventQueue has events to process. +void Cluster::deliveredEvents(PollableEventQueue::Queue& events) { try { - for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i) - deliveredEvent(*i, i->getData()); + for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); events.clear(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); @@ -206,41 +208,52 @@ void Cluster::delivered(PollableEventQueue::Queue& events) { } } -void Cluster::deliveredEvent(const EventHeader& e, const char* data) { - QPID_LATENCY_RECORD("deliver queue", e); - Buffer buf(const_cast<char*>(data), e.getSize()); - AMQFrame frame; - if (e.isCluster()) { - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); - Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? - ClusterDispatcher dispatch(*this, e.getMemberId(), l); - if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } +void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) { + try { + for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1)); + frames.clear(); + } catch (const std::exception& e) { + QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); + leave(); } - else { // e.isConnection() +} + +void Cluster::deliveredEvent(const Event& e) { + QPID_LATENCY_RECORD("delivered event queue", e); + Buffer buf(const_cast<char*>(e.getData()), e.getSize()); + boost::intrusive_ptr<Connection> connection; + if (e.isConnection()) { if (state == NEWBIE) { QPID_LOG(trace, *this << " DROP: " << e); + return; } - else { - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); - if (!connection) return; - if (e.getType() == CONTROL) { - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); - connection->delivered(frame); - } - } - else { - QPID_LOG(trace, *this << " DLVR: " << e); - connection->deliverBuffer(buf); - } + connection = getConnection(e.getConnectionId()); + if (!connection) return; + } + if (e.getType() == CONTROL) { + AMQFrame frame; + while (frame.decode(buf)) { + deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame)); } } - QPID_LATENCY_RECORD("decode+execute", e); + else if (e.getType() == DATA) { + connection->deliveredEvent(e, deliverFrameQueue); + } } +void Cluster::deliveredFrame(const EventFrame& e) { + QPID_LOG(trace, *this << " DLVR: " << e.frame); + if (e.connection) { + e.connection->deliveredFrame(e); + } + else { + Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? + ClusterDispatcher dispatch(*this, e.member, l); + if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) + throw Exception(QPID_MSG("Invalid cluster control")); + } +} + struct AddrList { const cpg_address* addrs; int count; @@ -379,7 +392,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& setClusterId(uuid); state = DUMPEE; QPID_LOG(info, *this << " receiving dump from " << dumper); - deliverQueue.stop(); + deliverEventQueue.stop(); checkDumpIn(l); } } @@ -389,7 +402,7 @@ void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { assert(state == OFFER); state = DUMPER; QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url); - deliverQueue.stop(); + deliverEventQueue.stop(); if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread( new DumpClient(myId, dumpee, url, broker, map, connections.values(), @@ -411,7 +424,7 @@ void Cluster::checkDumpIn(Lock& ) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; QPID_LOG(info, *this << " received dump, starting catch-up"); - deliverQueue.start(); + deliverEventQueue.start(); } } @@ -425,7 +438,7 @@ void Cluster::dumpOutDone(Lock& l) { state = READY; mcast.release(); QPID_LOG(info, *this << " sent dump"); - deliverQueue.start(); + deliverEventQueue.start(); tryMakeOffer(map.firstNewbie(), l); // Try another offer } |