diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 124 |
1 files changed, 60 insertions, 64 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6a19b8e4ea..eaa4a720b1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -20,7 +20,6 @@ #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" -#include "ClusterQueueHandler.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -92,8 +91,16 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b writeEstimate(writeEstimate_), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller), - deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller), + deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), + boost::bind(&Cluster::leave, this), + "Error decoding events", + poller), + deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1), + boost::bind(&Cluster::leave, this), + "Error delivering frames", + poller), + connections(*this), + decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), state(INIT), lastSize(0), lastBroker(false), @@ -121,12 +128,23 @@ Cluster::~Cluster() { if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } -void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - connections.insert(c->getId(), c); +// Called in connection thread to insert a client connection. +void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { + Lock l(lock); + connections.insert(c); } -void Cluster::erase(ConnectionId id) { +// Called in connection thread to insert an updated shadow connection. +void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { + Lock l(lock); + assert(state <= UPDATEE); // Only during update. + connections.insert(c); +} + +void Cluster::erase(const ConnectionId& id) { + // Called only by Connection::deliverClose in deliver thread, no need to lock. connections.erase(id); + decoder.erase(id); } std::vector<string> Cluster::getIds() const { @@ -168,17 +186,7 @@ void Cluster::leave(Lock&) { } } -boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId) { - boost::intrusive_ptr<Connection> cp = connections.find(connectionId); - if (!cp && connectionId.getMember() != myId) { // New shadow connection - std::ostringstream mgmtId; - mgmtId << name << ":" << connectionId; - cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId); - connections.insert(connectionId, cp); - } - return cp; -} - +// Deliver CPG message. void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, @@ -187,58 +195,52 @@ void Cluster::deliver( void* msg, int msg_len) { - Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); e.setSequence(sequence++); if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); - deliver(e, l); + deliver(e); } -void Cluster::deliver(const Event& e, Lock&) { +void Cluster::deliver(const Event& e) { if (state == LEFT) return; QPID_LATENCY_INIT(e); deliverEventQueue.push(e); } -// Entry point: called when deliverEventQueue has events to process. +// Handler for deliverEventQueue void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); Buffer buf(const_cast<char*>(e.getData()), e.getSize()); - boost::intrusive_ptr<Connection> connection; - if (e.isConnection()) { - if (state <= UPDATEE) { - QPID_LOG(trace, *this << " DROP: " << e); - return; - } - connection = getConnection(e.getConnectionId()); - if (!connection) return; - } if (e.getType() == CONTROL) { AMQFrame frame; - while (frame.decode(buf)) { - deliverFrameQueue.push(EventFrame(connection, e, frame)); - } - } - else if (e.getType() == DATA) { - connection->deliveredEvent(e, deliverFrameQueue); + while (frame.decode(buf)) + deliverFrameQueue.push(EventFrame(e, frame)); } + else if (e.getType() == DATA) + decoder.decode(e, e.getData()); } +// Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { + Mutex::ScopedLock l(lock); QPID_LOG(trace, *this << " DLVR: " << e); QPID_LATENCY_RECORD("delivered frame queue", e.frame); - if (e.connection) { - e.connection->deliveredFrame(e); - } - else { - Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? - ClusterDispatcher dispatch(*this, e.member, l); + if (e.isCluster()) { // Cluster control frame + ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } + else { // Connection frame. + if (state <= UPDATEE) { + QPID_LOG(trace, *this << " DROP: " << e); + return; + } + boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); + connection->deliveredFrame(e); + } QPID_LATENCY_RECORD("processed", e.frame); } @@ -282,7 +284,13 @@ void Cluster::configChange ( std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); - deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l); + deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId)); +} + +void Cluster::setReady(Lock&) { + state = READY; + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + mcast.release(); } void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { @@ -296,12 +304,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { - setClusterId(true); - // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release() - state = READY; - mcast.release(); QPID_LOG(notice, *this << " first in cluster"); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + setClusterId(true); + setReady(l); map = ClusterMap(myId, myUrl, true); memberUpdate(l); } @@ -325,9 +330,6 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& } } - - - void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -361,11 +363,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); if (state == CATCHUP && id == myId) { - state = READY; - mcast.release(); QPID_LOG(notice, *this << " caught up, active cluster member"); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - mcast.release(); + setReady(l); } } @@ -379,8 +378,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu updateStart(updatee, *url, l); } else { // Another offer was first. - state = READY; - mcast.release(); + setReady(l); QPID_LOG(info, *this << " cancelled update offer to " << updatee); tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. } @@ -390,7 +388,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu setClusterId(uuid); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); - deliverEventQueue.stop(); + deliverFrameQueue.stop(); checkUpdateIn(l); } } @@ -400,7 +398,7 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { assert(state == OFFER); state = UPDATER; QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); - deliverEventQueue.stop(); + deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. updateThread = Thread( new UpdateClient(myId, updatee, url, broker, map, connections.values(), @@ -422,7 +420,7 @@ void Cluster::checkUpdateIn(Lock& ) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; QPID_LOG(info, *this << " received update, starting catch-up"); - deliverEventQueue.start(); + deliverFrameQueue.start(); } } @@ -432,11 +430,11 @@ void Cluster::updateOutDone() { } void Cluster::updateOutDone(Lock& l) { + QPID_LOG(info, *this << " sent update"); assert(state == UPDATER); state = READY; mcast.release(); - QPID_LOG(info, *this << " sent update"); - deliverEventQueue.start(); + deliverFrameQueue.start(); tryMakeOffer(map.firstJoiner(), l); // Try another offer } @@ -504,8 +502,6 @@ void Cluster::memberUpdate(Lock& l) { } lastSize = size; - // - if (mgmtObject) { mgmtObject->set_clusterSize(size); string urlstr; |