diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 138 |
1 files changed, 78 insertions, 60 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 312d1e90e3..bea336644f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -22,6 +22,7 @@ #include "UpdateClient.h" #include "FailoverExchange.h" +#include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" @@ -91,7 +92,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : cpg(*this), name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), - myId(cpg.self()), + self(cpg.self()), readMax(settings.readMax), writeEstimate(settings.writeEstimate), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), @@ -104,8 +105,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), - decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections), - expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())), frameId(0), initialized(false), state(INIT), @@ -213,7 +213,7 @@ void Cluster::deliver( framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); e.setSequence(sequence++); - if (from == myId) // Record self-deliveries for flow control. + if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e); } @@ -227,42 +227,33 @@ void Cluster::deliver(const Event& e) { // Handler for deliverEventQueue void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); - Buffer buf(const_cast<char*>(e.getData()), e.getSize()); - if (e.getType() == CONTROL) { - AMQFrame frame; - while (frame.decode(buf)) { - // Check for deliver close here so we can erase the - // connection decoder safely in this thread. - if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>()) - decoder.erase(e.getConnectionId()); - deliverFrameQueue.push(EventFrame(e, frame)); - } + Mutex::ScopedLock l(lock); + if (e.isCluster()) { // Cluster control, process in this thread. + AMQFrame frame(e.getFrame()); + ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l); + if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) + throw Exception(QPID_MSG("Invalid cluster control")); } - else if (e.getType() == DATA) - decoder.decode(e, e.getData()); + else if (state >= CATCHUP) { // Connection frame, push onto deliver queue. + if (e.getType() == CONTROL) + connectionFrame(EventFrame(e, e.getFrame())); + else + connections.decode(e, e.getData()); + } + else // connection frame && state < CATCHUP. Drop. + QPID_LOG(trace, *this << " DROP: " << e); } // Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); - const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock? + assert(!e.isCluster()); // Only connection frames on this queue. QPID_LOG(trace, *this << " DLVR: " << e); - QPID_LATENCY_RECORD("delivered frame queue", e.frame); - if (e.isCluster()) { // Cluster control frame - ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); - if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } - else { // Connection frame. - if (state <= UPDATEE) { - QPID_LOG(trace, *this << " DROP: " << e); - return; - } - boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. - connection->deliveredFrame(e); - } - QPID_LATENCY_RECORD("processed", e.frame); + if (e.type == DATA) // Sequence number to identify data frames. + const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); + if (connection) // Ignore frames to closed local connections. + connection->deliveredFrame(e); } struct AddrList { @@ -310,7 +301,7 @@ void Cluster::configChange ( std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); - deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId)); + deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); } void Cluster::setReady(Lock&) { @@ -323,7 +314,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& bool memberChange = map.configChange(addresses); if (state == LEFT) return; - if (!map.isAlive(myId)) { // Final config change. + if (!map.isAlive(self)) { // Final config change. leave(l); return; } @@ -332,16 +323,16 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (map.aliveCount() == 1) { setClusterId(true); setReady(l); - map = ClusterMap(myId, myUrl, true); + map = ClusterMap(self, myUrl, true); memberUpdate(l); QPID_LOG(notice, *this << " first in cluster"); } else { // Joining established group. state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); elders = map.getAlive(); - elders.erase(myId); + elders.erase(self); broker.getLinks().setPassive(true); } } @@ -361,7 +352,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self); } } @@ -388,17 +379,29 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); - if (state == CATCHUP && id == myId) { + if (state == CATCHUP && id == self) { setReady(l); QPID_LOG(notice, *this << " caught up, active cluster member"); } } +void Cluster::stall(Lock&) { + // Stop processing the deliveredEventQueue in order to send or + // recieve an update. + deliverEventQueue.stop(); +} + +void Cluster::unstall(Lock&) { + // Stop processing the deliveredEventQueue in order to send or + // recieve an update. + deliverEventQueue.start(); +} + void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == myId) { + if (updater == self) { assert(state == OFFER); if (url) { // My offer was first. updateStart(updatee, *url, l); @@ -409,29 +412,29 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu makeOffer(map.firstJoiner(), l); // Maybe make another offer. } } - else if (updatee == myId && url) { + else if (updatee == self && url) { assert(state == JOINER); setClusterId(uuid); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); - deliverFrameQueue.stop(); + stall(l); checkUpdateIn(l); } } -void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { +void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { if (state == LEFT) return; assert(state == OFFER); state = UPDATER; QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); - deliverFrameQueue.stop(); + stall(l); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. client::ConnectionSettings cs; cs.username = settings.username; cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); @@ -445,13 +448,13 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { checkUpdateIn(l); } -void Cluster::checkUpdateIn(Lock& ) { +void Cluster::checkUpdateIn(Lock& l) { if (state == UPDATEE && updatedMap) { map = *updatedMap; - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; QPID_LOG(info, *this << " received update, starting catch-up"); - deliverFrameQueue.start(); + unstall(l); } } @@ -465,7 +468,7 @@ void Cluster::updateOutDone(Lock& l) { assert(state == UPDATER); state = READY; mcast.release(); - deliverFrameQueue.start(); + unstall(l); makeOffer(map.firstJoiner(), l); // Try another offer } @@ -490,7 +493,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; - stream << myId; + stream << self; if (iargs.i_brokerId == stream.str()) stopClusterNode(l); } @@ -511,7 +514,7 @@ void Cluster::stopClusterNode(Lock& l) { void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), myId); + mcast.mcastControl(ClusterShutdownBody(), self); } void Cluster::memberUpdate(Lock& l) { @@ -522,12 +525,12 @@ void Cluster::memberUpdate(Lock& l) { failoverExchange->setUrls(urls); if (size == 1 && lastSize > 1 && state >= CATCHUP) { - QPID_LOG(info, *this << " last broker standing, update queue policies"); + QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); } else if (size > 1 && lastBroker) { - QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); + QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); lastBroker = false; broker.getQueues().updateQueueClusterState(false); } @@ -549,17 +552,25 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_memberIDs(idstr); } - // Close connections belonging to members that have now been excluded - connections.update(myId, map); + // Generate a deliver-close control frame for connections + // belonging to defunct members, so they will be erased in the + // deliverFrameQueue thread. + ConnectionMap::Vector c = connections.values(); + for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) { + ConnectionId cid = (*i)->getId(); + MemberId mid = cid.getMember(); + if (mid != self && !map.isMember(mid)) + connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody()))); + } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.myId << "(" << STATE[cluster.state] << ")"; + return o << cluster.self << "(" << STATE[cluster.state] << ")"; } MemberId Cluster::getId() const { - return myId; // Immutable, no need to lock. + return self; // Immutable, no need to lock. } broker::Broker& Cluster::getBroker() const { @@ -578,7 +589,7 @@ void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; if (mgmtObject) { stringstream stream; - stream << myId; + stream << self; mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } @@ -589,4 +600,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::connectionFrame(const EventFrame& frame) { + // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition. + // Measure performance impact, restore with better locking. + // deliverFrameQueue.push(frame); + deliveredFrame(frame); +} + }} // namespace qpid::cluster |