diff options
-rw-r--r-- | cpp/src/qpid/ha/BrokerInfo.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerInfo.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Membership.cpp | 81 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Membership.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 3 | ||||
-rwxr-xr-x | cpp/src/tests/ha_test.py | 2 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 9 |
9 files changed, 67 insertions, 66 deletions
diff --git a/cpp/src/qpid/ha/BrokerInfo.cpp b/cpp/src/qpid/ha/BrokerInfo.cpp index 80e023c11c..a13451e179 100644 --- a/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/cpp/src/qpid/ha/BrokerInfo.cpp @@ -91,13 +91,16 @@ void BrokerInfo::assign(const Variant::Map& m) { status = BrokerStatus(get(m, STATUS).asUint8()); } -std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { - o << b.getSystemId().str().substr(0,8); - if (b.getAddress() != empty) o << "@" << b.getAddress(); - o << "(" << printable(b.getStatus()) << ")"; +std::ostream& BrokerInfo::printId(std::ostream& o) const { + o << getSystemId().str().substr(0,8); + if (getAddress() != empty) o << "@" << getAddress(); return o; } +std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { + return b.printId(o) << "(" << printable(b.getStatus()) << ")"; +} + std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) { std::ostream_iterator<BrokerInfo> out(o, " "); copy(infos.begin(), infos.end(), out); diff --git a/cpp/src/qpid/ha/BrokerInfo.h b/cpp/src/qpid/ha/BrokerInfo.h index c9324ce0ca..bd1ad86392 100644 --- a/cpp/src/qpid/ha/BrokerInfo.h +++ b/cpp/src/qpid/ha/BrokerInfo.h @@ -64,6 +64,9 @@ class BrokerInfo // So it can be put in a set. bool operator<(const BrokerInfo x) const { return systemId < x.systemId; } + // Print just the identifying information, not the status. + std::ostream& printId(std::ostream& o) const; + private: Address address; types::Uuid systemId; diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 81b0cd6413..bb706e53b1 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -902,24 +902,7 @@ void BrokerReplicator::disconnected() { } void BrokerReplicator::setMembership(const Variant::List& brokers) { - Membership& membership(haBroker.getMembership()); - membership.assign(brokers); - // Check if the primary has signalled a change in my status: - // from CATCHUP to READY when we are caught up. - // from READY TO CATCHUP if we are timed out during fail-over. - BrokerInfo info; - if (membership.get(membership.getSelf(), info)) { - BrokerStatus oldStatus = haBroker.getStatus(); - BrokerStatus newStatus = info.getStatus(); - if (oldStatus == CATCHUP && newStatus == READY) { - QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready"); - haBroker.getMembership().setStatus(READY); - } - else if (oldStatus == READY && newStatus == CATCHUP) { - QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up"); - haBroker.getMembership().setStatus(CATCHUP); - } - } + haBroker.getMembership().assign(brokers); } }} // namespace broker diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp index 6af9b6e6d8..411bad3841 100644 --- a/cpp/src/qpid/ha/Membership.cpp +++ b/cpp/src/qpid/ha/Membership.cpp @@ -43,6 +43,7 @@ Membership::Membership(const BrokerInfo& info, HaBroker& b) : haBroker(b), self(info.getSystemId()) { brokers[self] = info; + oldStatus = info.getStatus(); } void Membership::clear() { @@ -54,6 +55,7 @@ void Membership::clear() { void Membership::add(const BrokerInfo& b) { Mutex::ScopedLock l(lock); + assert(b.getSystemId() != self); brokers[b.getSystemId()] = b; update(l); } @@ -86,6 +88,10 @@ void Membership::assign(const types::Variant::List& list) { types::Variant::List Membership::asList() const { Mutex::ScopedLock l(lock); + return asList(l); +} + +types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const { types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); @@ -109,18 +115,43 @@ bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { return true; } +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { STANDALONE, JOINING }, // Initialization of backup broker + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, RECOVERING }, // Chosen as new primary + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + + void Membership::update(Mutex::ScopedLock& l) { QPID_LOG(info, "Membership: " << brokers); // Update managment and send update event. - Variant::List brokerList = asList(); - if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str()); - if (mgmtObject) mgmtObject->set_members(brokerList); + BrokerStatus newStatus = getStatus(l); + Variant::List brokerList = asList(l); + if (mgmtObject) { + mgmtObject->set_status(printable(newStatus).str()); + mgmtObject->set_members(brokerList); + } haBroker.getBroker().getManagementAgent()->raiseEvent( _qmf::EventMembersUpdate(brokerList)); // Update link client properties framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties(); - if (isBackup(getStatus(l))) { + if (isBackup(newStatus)) { // Set backup tag on outgoing link properties. linkProperties.setTable( ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable()); @@ -130,6 +161,17 @@ void Membership::update(Mutex::ScopedLock& l) { linkProperties.erase(ConnectionObserver::BACKUP_TAG); haBroker.getBroker().setLinkClientProperties(linkProperties); } + + // Check status transitions + if (oldStatus != newStatus) { + QPID_LOG(info, "Status change: " + << printable(oldStatus) << " -> " << printable(newStatus)); + if (!checkTransition(oldStatus, newStatus)) { + haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus) + << " -> " << printable(newStatus))); + } + oldStatus = newStatus; + } } void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { @@ -139,40 +181,9 @@ void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { } -namespace { -bool checkTransition(BrokerStatus from, BrokerStatus to) { - // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. - static const BrokerStatus TRANSITIONS[][2] = { - { STANDALONE, JOINING }, // Initialization of backup broker - { JOINING, CATCHUP }, // Connected to primary - { JOINING, RECOVERING }, // Chosen as initial primary. - { CATCHUP, READY }, // Caught up all queues, ready to take over. - { READY, RECOVERING }, // Chosen as new primary - { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. - { RECOVERING, ACTIVE } // All expected backups are ready - }; - static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); - for (size_t i = 0; i < N; ++i) { - if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) - return true; - } - return false; -} -} // namespace - void Membership::setStatus(BrokerStatus newStatus) { - BrokerStatus status = getStatus(); - QPID_LOG(info, "Status change: " - << printable(status) << " -> " << printable(newStatus)); - bool legal = checkTransition(status, newStatus); - if (!legal) { - haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status) - << " -> " << printable(newStatus))); - } - Mutex::ScopedLock l(lock); brokers[self].setStatus(newStatus); - if (mgmtObject) mgmtObject->set_status(printable(newStatus).str()); update(l); } diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h index 7069e79b7f..f442586a71 100644 --- a/cpp/src/qpid/ha/Membership.h +++ b/cpp/src/qpid/ha/Membership.h @@ -83,12 +83,14 @@ class Membership private: void update(sys::Mutex::ScopedLock&); BrokerStatus getStatus(sys::Mutex::ScopedLock&) const; + types::Variant::List asList(sys::Mutex::ScopedLock&) const; mutable sys::Mutex lock; HaBroker& haBroker; boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject; const types::Uuid self; BrokerInfo::Map brokers; + BrokerStatus oldStatus; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index ac9df05937..2108bc4077 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -51,7 +51,8 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : cancelled(false), queue(q) { std::ostringstream os; - os << "Guard of " << queue.getName() << " at " << info << ": "; + os << "Guard of " << queue.getName() << " at "; + info.printId(os) << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); queue.addObserver(observer); diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index f36da6c1e1..976619397e 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -121,7 +121,8 @@ ReplicatingSubscription::ReplicatingSubscription( // Set a log prefix message that identifies the remote broker. ostringstream os; - os << "Subscription to " << queue->getName() << " at " << info << ": "; + os << "Subscription to " << queue->getName() << " at "; + info.printId(os) << ": "; logPrefix = os.str(); // If this is a non-cluster standalone replication then we need to diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py index f3c1d3a957..f2fc50054f 100755 --- a/cpp/src/tests/ha_test.py +++ b/cpp/src/tests/ha_test.py @@ -107,7 +107,7 @@ class HaBroker(Broker): ha_port = ha_port or HaPort(test) args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=trace+:ha::", # FIXME aconway 2013-06-14: debug+ + "--log-enable=debug+:ha::", # Non-standard settings for faster tests. "--link-maintenance-interval=0.1", # Heartbeat and negotiate time are needed so that a broker wont diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 60e3444c45..368ac02506 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -1121,13 +1121,10 @@ class RecoveryTests(HaBrokerTest): but can still rejoin. """ cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]); - cluster[0].wait_status("active") # Primary ready - for b in cluster[1:3]: b.wait_status("ready") # Backups ready for i in [0,1]: cluster.kill(i, False) - cluster[2].promote() # New primary, expected backup will 1 - cluster[2].wait_status("recovering") + cluster[2].promote() # New primary, expected backup will be 1 # Should not go active till the expected backup connects or times out. - self.assertEqual(cluster[2].ha_status(), "recovering") + cluster[2].wait_status("recovering") # Messages should be held till expected backup times out s = cluster[2].connect().session().sender("q;{create:always}") s.send("foo", sync=False) @@ -1135,7 +1132,7 @@ class RecoveryTests(HaBrokerTest): try: s.sync(timeout=.01); self.fail("Expected Timeout exception") except Timeout: pass s.sync(timeout=1) # And released after the timeout. - self.assertEqual(cluster[2].ha_status(), "active") + cluster[2].wait_status("active") def test_join_ready_cluster(self): """If we join a cluster where the primary is dead, the new primary is |