diff options
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 79 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/LockedConnectionMap.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 3 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 1 |
9 files changed, 81 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 78f7bf13fc..467c960674 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -144,6 +144,13 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; +/** NOTE: increment this number whenever any incompatible changes in + * cluster protocol/behavior are made. It allows early detection and + * sensible reporting of an attempt to mix different versions in a + * cluster. + */ +const uint32_t Cluster::CLUSTER_VERSION = 1; + struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; MemberId member; @@ -153,7 +160,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } - void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } @@ -233,6 +240,7 @@ void Cluster::initialize() { // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { + QPID_LOG(debug, *this << " add local connection " << c->getId()); localConnections.insert(c); assert(c->getId().getMember() == self); // Announce the connection to the cluster. @@ -242,11 +250,14 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { + QPID_LOG(debug, *this << " add shadow connection " << c->getId()); // Safe to use connections here because we're pre-catchup, either // discarding or stalled, so deliveredFrame is not processing any // connection events. assert(discarding); - connections.insert(ConnectionMap::value_type(c->getId(), c)); + pair<ConnectionMap::iterator, bool> ib + = connections.insert(ConnectionMap::value_type(c->getId(), c)); + assert(ib.second); } void Cluster::erase(const ConnectionId& id) { @@ -317,11 +328,11 @@ void Cluster::deliver( } LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) -LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) + LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) -void Cluster::deliverEvent(const Event& e) { + void Cluster::deliverEvent(const Event& e) { LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());) - deliverEventQueue.push(e); + deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { @@ -339,16 +350,21 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData())); - QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { + QPID_LOG(trace, *this << " DLVR: " << e); EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - if (castUpdateOffer(ef.frame.getBody())) + const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); + if (offer) { + QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId() + << " to " << MemberId(offer->getUpdatee())); deliverEventQueue.stop(); + } deliverFrame(ef); } else if(!discarding) { + QPID_LOG(trace, *this << " DLVR: " << e); if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { @@ -403,9 +419,8 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } - else { + else processFrame(e, l); - } } @@ -447,7 +462,7 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { mgmtId << id; cp = new Connection(*this, shadowOut, mgmtId.str(), id); } - connections.insert(ConnectionMap::value_type(id, cp)); + connections.insert(ConnectionMap::value_type(id, cp)); } return cp; } @@ -556,7 +571,8 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self); + mcast.mcastControl( + ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self); } } @@ -587,26 +603,29 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -// Go back to normal processing after an offer that did not result in an update. -void Cluster::cancelOffer(const MemberId& updatee, Lock& l) { - QPID_LOG(info, *this << " cancelled offer to " << updatee); - deliverEventQueue.start(); // Go back to normal processing - setReady(l); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. -} - -void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { +void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, + uint32_t version, Lock& l) { // NOTE: deliverEventQueue has been stopped at the update offer by // deliveredEvent in case an update is required. if (state == LEFT) return; + if (version != CLUSTER_VERSION) { + QPID_LOG(critical, *this << " incompatible cluster versions " << + version << " != " << CLUSTER_VERSION); + leave(l); + return; + } MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); if (updater == self) { assert(state == OFFER); if (url) // My offer was first. updateStart(updatee, *url, l); - else // Another offer was first. - cancelOffer(updatee, l); + else { // Another offer was first. + QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall"); + setReady(l); + makeOffer(map.firstJoiner(), l); // Maybe make another offer. + deliverEventQueue.start(); // Go back to normal processing + } } else if (updatee == self && url) { assert(state == JOINER); @@ -615,8 +634,11 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu QPID_LOG(info, *this << " receiving update from " << updater); checkUpdateIn(l); } - else + else { + QPID_LOG(debug,*this << " unstall, ignore update " << updater + << " to " << updatee); deliverEventQueue.start(); // Not involved in update. + } } static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { @@ -629,21 +651,23 @@ static client::ConnectionSettings connectionSettings(const ClusterSettings& sett void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { // An offer was received while handling an error, and converted to a retract. + // Behavior is very similar to updateOffer. if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); if (updater == self) { assert(state == OFFER); if (url) { // My offer was first. - QPID_LOG(info, *this << " retracted offer to " << updatee); + QPID_LOG(info, *this << " retracting offer to " << updatee); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); } - cancelOffer(updatee, l); + setReady(l); + makeOffer(map.firstJoiner(), l); // Maybe make another offer. + // Don't unstall the event queue, that was already done in deliveredFrame } - else - deliverEventQueue.start(); // Not involved in update. + QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee); } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { @@ -672,6 +696,7 @@ void Cluster::updateInDone(const ClusterMap& m) { void Cluster::updateInRetracted() { Lock l(lock); updateRetracted = true; + map.clearStatus(); checkUpdateIn(l); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index a89bd83ac0..170b465ff3 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -108,7 +108,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Called in deliverFrame thread to indicate an error from the broker. void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg); - void connectionError(); // Called only during update by Connection::shadowReady Decoder& getDecoder() { return decoder; } @@ -122,6 +121,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef PollableQueue<EventFrame> PollableFrameQueue; typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap; + /** Version number of the cluster protocol, to avoid mixed versions. */ + static const uint32_t CLUSTER_VERSION; + // NB: A dummy Lock& parameter marks functions that must only be // called with Cluster::lock locked. @@ -141,7 +143,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); - void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); + void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, + uint32_t version, Lock&); void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); @@ -159,7 +162,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); void erase(const ConnectionId&, Lock&); - void cancelOffer(const MemberId&, Lock&); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index 0395ff6382..e8c421d4eb 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -158,8 +158,6 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { bool ClusterMap::configChange(const std::string& addresses) { bool memberChange = false; Set update = decode(addresses); - for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8) - update.insert(MemberId(std::string(i, i+8))); Set removed; std::set_difference(alive.begin(), alive.end(), update.begin(), update.end(), diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 3359c7c1f3..7e42ed1c19 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -95,6 +95,9 @@ class ClusterMap { uint64_t getFrameSeq() { return frameSeq; } uint64_t incrementFrameSeq() { return ++frameSeq; } + /** Clear out all knowledge of joiners & members, just keep alive set */ + void clearStatus() { joiners.clear(); members.clear(); } + private: Url getUrl(const Map& map, const MemberId& id); diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 52ea84b02b..ae62994e88 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -113,11 +113,12 @@ Event::operator Buffer() const { return Buffer(const_cast<char*>(getData()), getSize()); } -AMQFrame Event::getFrame() const { +const AMQFrame& Event::getFrame() const { assert(type == CONTROL); + if (!frame.getBody()) { Buffer buf(*this); - AMQFrame frame; QPID_ASSERT(frame.decode(buf)); + } return frame; } @@ -128,8 +129,17 @@ std::ostream& operator << (std::ostream& o, EventType t) { } std::ostream& operator << (std::ostream& o, const EventHeader& e) { - o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]"; - return o; + return o << "Event[" << e.getConnectionId() << " " << e.getType() + << " " << e.getSize() << " bytes]"; +} + +std::ostream& operator << (std::ostream& o, const Event& e) { + o << "Event[" << e.getConnectionId() << " "; + if (e.getType() == CONTROL) + o << e.getFrame(); + else + o << " data " << e.getSize() << " bytes"; + return o << "]"; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 76ba88a87f..3175dd9ed2 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -95,7 +95,7 @@ class Event : public EventHeader { char* getStore() { return store; } const char* getStore() const { return store; } - framing::AMQFrame getFrame() const; + const framing::AMQFrame& getFrame() const; operator framing::Buffer() const; @@ -105,8 +105,10 @@ class Event : public EventHeader { void encodeHeader() const; RefCountedBuffer::pointer store; + mutable framing::AMQFrame frame; }; +std::ostream& operator << (std::ostream&, const Event&); std::ostream& operator << (std::ostream&, const EventHeader&); }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/LockedConnectionMap.h b/cpp/src/qpid/cluster/LockedConnectionMap.h index 4df742d6c2..f4f1d7e832 100644 --- a/cpp/src/qpid/cluster/LockedConnectionMap.h +++ b/cpp/src/qpid/cluster/LockedConnectionMap.h @@ -37,6 +37,7 @@ class LockedConnectionMap public: void insert(const ConnectionPtr& c) { sys::Mutex::ScopedLock l(lock); + assert(map.find(c->getId()) == map.end()); map[c->getId()] = c; } diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index bad56de826..a8ab105395 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -146,7 +146,8 @@ void UpdateClient::update() { client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); - QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); + QPID_LOG(debug, updaterId << " update completed to " << updateeId + << " at " << updateeUrl << ": " << membership); } namespace { diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 474c191b0b..ab66179a05 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -36,6 +36,7 @@ <control name = "update-offer" code="0x2"> <field name="updatee" type="uint64"/> <field name="cluster-id" type="uuid"/> + <field name="version" type="uint32"/> </control> <!-- Sender retracts an offer to a new joiner. --> |