From f6db810cf60addcf5ea9cd28b2daf850c4a732e7 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 27 Jan 2012 19:14:07 +0000 Subject: QPID-3603: Get rid of broker_url="primary" hack, promote primaries via management. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1236846 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/ConnectionObservers.h | 17 ++++++++++--- qpid/cpp/src/qpid/ha/Backup.cpp | 17 ++++++++++--- qpid/cpp/src/qpid/ha/Backup.h | 7 ++++++ qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp | 5 ++-- qpid/cpp/src/qpid/ha/ConnectionExcluder.h | 5 +--- qpid/cpp/src/qpid/ha/HaBroker.cpp | 34 +++++++++----------------- qpid/cpp/src/qpid/ha/HaBroker.h | 3 ++- qpid/cpp/src/tests/brokertest.py | 2 +- qpid/cpp/src/tests/ha_tests.py | 23 ++++++++++------- qpid/tools/src/py/qpid-ha-status | 7 ++---- 10 files changed, 68 insertions(+), 52 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h index b36097cdbb..07e515f3c9 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionObservers.h +++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h @@ -23,6 +23,9 @@ */ #include "ConnectionObserver.h" +#include "qpid/sys/Mutex.h" +#include +#include namespace qpid { namespace broker { @@ -30,12 +33,18 @@ namespace broker { /** * A collection of connection observers. * Calling a ConnectionObserver function will call that function on each observer. + * THREAD SAFE. */ class ConnectionObservers : public ConnectionObserver { public: - // functions for managing the collection of observers void add(boost::shared_ptr observer) { - observers.push_back(observer); + sys::Mutex::ScopedLock l(lock); + observers.insert(observer); + } + + void remove(boost::shared_ptr observer) { + sys::Mutex::ScopedLock l(lock); + observers.erase(observer); } void connection(Connection& c) { @@ -55,10 +64,12 @@ class ConnectionObservers : public ConnectionObserver { } private: - typedef std::vector > Observers; + typedef std::set > Observers; + sys::Mutex lock; Observers observers; template void each(F f) { + sys::Mutex::ScopedLock l(lock); std::for_each(observers.begin(), observers.end(), f); } }; diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 3476af3fe3..6e61ed0b46 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -22,6 +22,7 @@ #include "Settings.h" #include "BrokerReplicator.h" #include "ReplicatingSubscription.h" +#include "ConnectionExcluder.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -42,11 +43,12 @@ using namespace broker; using types::Variant; using std::string; -Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { +Backup::Backup(broker::Broker& b, const Settings& s) : + broker(b), settings(s), excluder(new ConnectionExcluder()) +{ Url url(s.brokerUrl); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; - // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. // Declare the link std::pair result = broker.getLinks().declare( url[0].host, url[0].port, protocol, @@ -54,9 +56,16 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { s.mechanism, s.username, s.password); assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; - boost::shared_ptr wr(new BrokerReplicator(link)); - broker.getExchanges().registerExchange(wr); + + replicator.reset(new BrokerReplicator(link)); + broker.getExchanges().registerExchange(replicator); + + broker.getConnectionObservers().add(excluder); } +Backup::~Backup() { + broker.getExchanges().destroy(replicator->getName()); + broker.getConnectionObservers().remove(excluder); // Allows client connections. +} }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index b4183a4dba..135363c714 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -27,6 +27,7 @@ #include namespace qpid { + namespace broker { class Broker; class Link; @@ -34,6 +35,8 @@ class Link; namespace ha { class Settings; +class ConnectionExcluder; +class BrokerReplicator; /** * State associated with a backup broker. Manages connections to primary. @@ -45,12 +48,16 @@ class Backup { public: Backup(broker::Broker&, const Settings&); + ~Backup(); private: broker::Broker& broker; Settings settings; boost::shared_ptr link; + boost::shared_ptr replicator; + boost::shared_ptr excluder; }; + }} // namespace qpid::ha #endif /*!QPID_HA_BACKUP_H*/ diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp index 141e1da2bc..67409803e8 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -27,11 +27,10 @@ namespace qpid { namespace ha { -ConnectionExcluder::ConnectionExcluder(PrimaryTest isPrimary_) : isPrimary(isPrimary_) {} +ConnectionExcluder::ConnectionExcluder() {} void ConnectionExcluder::opened(broker::Connection& connection) { - if (!isPrimary() && !connection.isLink() - && !connection.getClientProperties().isSet(ADMIN_TAG)) + if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG)) throw Exception( QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId())); } diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h index e6c299884e..f8f2843a0c 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h @@ -41,15 +41,12 @@ namespace ha { class ConnectionExcluder : public broker::ConnectionObserver { public: - typedef boost::function PrimaryTest; - - ConnectionExcluder(PrimaryTest isPrimary_); + ConnectionExcluder(); void opened(broker::Connection& connection); private: static const std::string ADMIN_TAG; - PrimaryTest isPrimary; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 60b4b69581..0e342bd17c 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -55,22 +55,16 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) settings(s), clientUrl(url(s.clientUrl, "ha-client-url")), brokerUrl(url(s.brokerUrl, "ha-broker-url")), + backup(new Backup(b, s)), mgmtObject(0) { - // FIXME aconway 2011-11-22: temporary hack to identify primary. - bool primary = (settings.brokerUrl == PRIMARY); - QPID_LOG(notice, "HA: " << (primary ? "Primary" : "Backup") - << " initialized: client-url=" << clientUrl + // Note all HA brokers start out in backup mode. + QPID_LOG(notice, "HA: Backup initialized: client-url=" << clientUrl << " broker-url=" << brokerUrl); - if (!primary) backup.reset(new Backup(broker, s)); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( boost::shared_ptr( new ReplicatingSubscription::Factory())); - // Register a connection excluder - broker.getConnectionObservers().add( - boost::shared_ptr( - new ConnectionExcluder(boost::bind(&HaBroker::isPrimary, this)))); ManagementAgent* ma = broker.getManagementAgent(); if (!ma) @@ -78,8 +72,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) if (ma) { _qmf::Package packageInit(ma); mgmtObject = new _qmf::HaBroker(ma, this); - // FIXME aconway 2011-11-11: Placeholder - initialize cluster role. - mgmtObject->set_status(isPrimary() ? PRIMARY : BACKUP); + mgmtObject->set_status(BACKUP); ma->addObject(mgmtObject); } } @@ -87,21 +80,22 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) HaBroker::~HaBroker() {} Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { + sys::Mutex::ScopedLock l(lock); switch (methodId) { case _qmf::HaBroker::METHOD_SETSTATUS: { std::string status = dynamic_cast<_qmf::ArgsHaBrokerSetStatus&>(args).i_status; if (status == PRIMARY) { - if (!isPrimary()) { + if (backup.get()) { + // FIXME aconway 2012-01-26: create primary state before resetting backup + // as it allows client connections. backup.reset(); - QPID_LOG(notice, "HA Primary: promoted from backup"); + QPID_LOG(notice, "HA: Primary promoted from backup"); } } else if (status == BACKUP) { - if (isPrimary()) { - backup.reset(new Backup(broker, settings)); - QPID_LOG(notice, "HA Backup: demoted from primary."); - } + if (!backup.get()) + throw Exception("HA: Primary cannot be demoted"); } else { - QPID_LOG(error, "Attempt to set invalid HA status: " << status); + throw Exception("Invalid HA status: "+status); } mgmtObject->set_status(status); break; @@ -112,8 +106,4 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, return Manageable::STATUS_OK; } -bool HaBroker::isPrimary() const { - return !backup.get(); // TODO aconway 2012-01-18: temporary test. -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index aff518aa8d..8300de0ea8 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -24,6 +24,7 @@ #include "Settings.h" #include "qpid/Url.h" +#include "qpid/sys/Mutex.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h" #include "qpid/management/Manageable.h" @@ -52,8 +53,8 @@ class HaBroker : public management::Manageable management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); - bool isPrimary() const; private: + sys::Mutex lock; broker::Broker& broker; Settings settings; Url clientUrl, brokerUrl; diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index b2b4d89b0f..f7f446b893 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -513,7 +513,7 @@ class BrokerTest(TestCase): finally: r.close() return contents - def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda d:m.content): + def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 7ccea16a08..bba3bc223a 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -56,8 +56,8 @@ class ShortTests(BrokerTest): self.wait(bs, address) bs.connection.close() - def set_ha_status(self, address, status): - os.system("qpid-ha-status %s %s"%(address, status)) + def promote(self, broker): + os.system("qpid-ha-status %s primary"%(broker.host_port())) def assert_missing(self, session, address): try: @@ -122,7 +122,8 @@ class ShortTests(BrokerTest): b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind. self.assert_browse_retry(b, prefix+"q4", ["6","7"]) - primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + primary = self.ha_broker(name="primary") + self.promote(primary) p = primary.connect().session() # Create config, send messages before starting the backup, to test catch-up replication. @@ -164,7 +165,8 @@ class ShortTests(BrokerTest): def test_sync(self): def queue(name, replicate): return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) - primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + primary = self.ha_broker(name="primary") + self.promote(primary) p = primary.connect().session() s = p.sender(queue("q","messages")) for m in [str(i) for i in range(0,10)]: s.send(m) @@ -186,7 +188,8 @@ class ShortTests(BrokerTest): def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" - primary = self.ha_broker(name="primary", broker_url="primary") + primary = self.ha_broker(name="primary") + self.promote(primary) backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) sender = self.popen( @@ -219,7 +222,8 @@ class ShortTests(BrokerTest): def test_failover(self): """Verify that backups rejects connections and that fail-over works in python client""" getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover - primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary + primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL) + self.promote(primary) backup = self.ha_broker(name="backup", broker_url=primary.host_port()) # Check that backup rejects normal connections try: @@ -237,12 +241,13 @@ class ShortTests(BrokerTest): sender.send("foo") primary.kill() assert retry(lambda: not is_running(primary.pid)) - self.set_ha_status(backup.host_port(), "primary") # Promote the backup + self.promote(backup) self.assert_browse_retry(s, "q", ["foo"]) c.close() def test_failover_cpp(self): - primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary + primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL) + self.promote(primary) backup = self.ha_broker(name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) @@ -257,7 +262,7 @@ class ShortTests(BrokerTest): primary.kill() assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die - self.set_ha_status(backup.host_port(), "primary") + self.promote(backup) n = receiver.received # Make sure we are still running assert retry(lambda: receiver.received > n + 10) sender.stop() diff --git a/qpid/tools/src/py/qpid-ha-status b/qpid/tools/src/py/qpid-ha-status index b4234cc051..20804b678d 100755 --- a/qpid/tools/src/py/qpid-ha-status +++ b/qpid/tools/src/py/qpid-ha-status @@ -38,11 +38,8 @@ def validate_status(value): class HaBroker: def __init__(self, broker, session): self.session = session - try: - self.qmf_broker = self.session.addBroker( - broker, client_properties={"qpid.ha-admin":1}) - except Exception, e: - raise Exception("Can't connect to %s: %s"%(broker,e)) + self.qmf_broker = self.session.addBroker( + broker, client_properties={"qpid.ha-admin":1}) ha_brokers=self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha") if (not ha_brokers): raise Exception("Broker does not have HA enabled.") self.ha_broker = ha_brokers[0]; -- cgit v1.2.1