diff options
author | Alan Conway <aconway@apache.org> | 2012-02-13 16:17:34 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-13 16:17:34 +0000 |
commit | 5477214cd08c5ed7113bd842f848c2b8afa74107 (patch) | |
tree | f046dba6c447b2ec3bba7018302a89cbf42264e6 | |
parent | ab9cf29397c17ed20dff2bdf0f59896874c40fc1 (diff) | |
download | qpid-python-5477214cd08c5ed7113bd842f848c2b8afa74107.tar.gz |
QPID-3603: Reconnect URL in broker::Link
- Flatten known-hosts in Link to a single URL.
- Circular retry on failover URL.
- Allow setting a different retry URL.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243575 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 8 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 80 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/reliable_replication_test | 18 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-ha-tool | 29 |
10 files changed, 113 insertions, 80 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index b975511e59..e0c94853d7 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -64,6 +64,7 @@ Link::Link(LinkRegistry* _links, visitCount(0), currentInterval(1), closing(false), + reconnectNext(0), // Index of next address for reconnecting in url. channelCounter(1), connection(0), agent(0) @@ -146,11 +147,23 @@ void Link::established () } } +void Link::setUrl(const Url& u) { + Mutex::ScopedLock mutex(lock); + url = u; + reconnectNext = 0; +} + void Link::opened() { Mutex::ScopedLock mutex(lock); assert(connection); - urls.reset(connection->getKnownHosts()); - QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); + // Get default URL from known-hosts. + const std::vector<Url>& known = connection->getKnownHosts(); + // Flatten vector of URLs into a single URL listing all addresses. + url.clear(); + for(size_t i = 0; i < known.size(); ++i) + url.insert(url.end(), known[i].begin(), known[i].end()); + reconnectNext = 0; + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } void Link::closed(int, std::string text) @@ -334,17 +347,16 @@ void Link::reconnect(const qpid::Address& a) } } -bool Link::tryFailover() -{ - Address next; - if (urls.next(next) && - (next.host != host || next.port != port || next.protocol != transport)) { +bool Link::tryFailover() { // FIXME aconway 2012-01-30: lock held? + if (reconnectNext >= url.size()) reconnectNext = 0; + if (url.empty()) return false; + Address next = url[reconnectNext++]; + if (next.host != host || next.port != port || next.protocol != transport) { links->changeAddress(Address(transport, host, port), next); QPID_LOG(debug, "Link failing over to " << host << ":" << port); return true; - } else { - return false; } + return false; } // Management updates for a linke are inconsistent in a cluster, so they are diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index a11f99e91e..a9d045b0af 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -23,10 +23,10 @@ */ #include <boost/shared_ptr.hpp> +#include "qpid/Url.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableConfig.h" #include "qpid/broker/Bridge.h" -#include "qpid/broker/RetryList.h" #include "qpid/sys/Mutex.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" @@ -60,7 +60,8 @@ namespace qpid { uint32_t visitCount; uint32_t currentInterval; bool closing; - RetryList urls; + Url url; // URL can contain many addresses. + size_t reconnectNext; // Index for next re-connect attempt typedef std::vector<Bridge::shared_ptr> Bridges; Bridges created; // Bridges pending creation @@ -111,6 +112,7 @@ namespace qpid { uint nextChannel(); void add(Bridge::shared_ptr); void cancel(Bridge::shared_ptr); + void setUrl(const Url&); // Set URL for reconnection. void established(); // Called when connection is create void opened(); // Called when connection is open (after create) diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 6e61ed0b46..e5bd0ed4dc 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -56,6 +56,7 @@ Backup::Backup(broker::Broker& b, const Settings& s) : s.mechanism, s.username, s.password); assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; + link->setUrl(Url(s.brokerUrl)); replicator.reset(new BrokerReplicator(link)); broker.getExchanges().registerExchange(replicator); @@ -63,6 +64,11 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker.getConnectionObservers().add(excluder); } +void Backup::setUrl(const Url& url) { + // FIXME aconway 2012-01-30: locking? + link->setUrl(url); +} + Backup::~Backup() { broker.getExchanges().destroy(replicator->getName()); broker.getConnectionObservers().remove(excluder); // Allows client connections. diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 135363c714..00ec55a6ff 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -49,6 +49,7 @@ class Backup public: Backup(broker::Broker&, const Settings&); ~Backup(); + void setUrl(const Url&); private: broker::Broker& broker; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index a53afa82fe..ad97f87a62 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -55,14 +55,15 @@ const std::string BACKUP="backup"; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : broker(b), settings(s), - clientUrl(url(s.clientUrl, "ha-client-url")), brokerUrl(url(s.brokerUrl, "ha-broker-url")), + clientUrl(s.clientUrl.empty() ? brokerUrl : url(s.clientUrl, "ha-client-url")), backup(new Backup(b, s)), mgmtObject(0) { // Note all HA brokers start out in backup mode. - QPID_LOG(notice, "HA: Backup initialized: client-url=" << clientUrl - << " broker-url=" << brokerUrl); + QPID_LOG(notice, "HA: Backup initialized: " + << " broker-url=" << brokerUrl + << " client-url=" << clientUrl); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( boost::shared_ptr<ReplicatingSubscription::Factory>( @@ -95,17 +96,15 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, break; } case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: { - QPID_LOG(critical, "FIXME" << "before " << clientUrl) clientUrl = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).i_clientAddresses; - QPID_LOG(critical, "FIXME" << "after " << clientUrl) - // FIXME aconway 2012-01-30: upate status for new URL mgmtObject->set_clientAddresses(clientUrl.str()); + // FIXME aconway 2012-01-30: upate status for new URL break; } case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: { brokerUrl = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args).i_brokerAddresses; - // FIXME aconway 2012-01-30: upate status for new URL mgmtObject->set_brokerAddresses(brokerUrl.str()); + if (backup.get()) backup->setUrl(brokerUrl); break; } default: diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 8300de0ea8..affaa7486f 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -57,7 +57,7 @@ class HaBroker : public management::Manageable sys::Mutex lock; broker::Broker& broker; Settings settings; - Url clientUrl, brokerUrl; + Url brokerUrl, clientUrl; std::auto_ptr<Backup> backup; qmf::org::apache::qpid::ha::HaBroker* mgmtObject; }; diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index f7f446b893..5b18e58dee 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -76,7 +76,7 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=10, delay=.01): +def retry(function, timeout=1, delay=.01): """Call function until it returns True or timeout expires. Double the delay for each retry. Return True if function returns true, False if timeout expires.""" @@ -277,8 +277,8 @@ class Broker(Popen): self.find_log() cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] - if log_level != None: - cmd += ["--log-enable=%s" % log_level] + cmd += ["--log-enable=%s"%(log_level or "info+") ] + self.datadir = self.name cmd += ["--data-dir", self.datadir] if show_cmd: print cmd @@ -526,7 +526,7 @@ class BrokerTest(TestCase): retry(test, timeout, delay) self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform)) -def join(thread, timeout=10): +def join(thread, timeout=1): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index c213e6a4ff..e782b57f7f 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -27,18 +27,30 @@ from logging import getLogger, WARN, ERROR, DEBUG log = getLogger("qpid.ha-tests") +class HaBroker(Broker): + def __init__(self, test, args=[], broker_url=None, **kwargs): + assert BrokerTest.ha_lib, "Cannot locate HA plug-in" + Broker.__init__(self, test, + args=["--load-module", BrokerTest.ha_lib, + "--ha-enable=yes", + "--ha-broker-url", broker_url ], + **kwargs) + + def promote(self): + assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0 + + def set_client_url(self, url): + assert os.system( + "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0 + + def set_broker_url(self, url): + assert os.system( + "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0 + + class ShortTests(BrokerTest): """Short HA functionality tests.""" - def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs): - assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - return Broker(self, args=["--load-module", BrokerTest.ha_lib, - "--ha-enable=yes", - "--ha-client-url", client_url, - "--ha-broker-url", broker_url, - ] + args, - **kwargs) - # FIXME aconway 2011-11-15: work around async configuration replication. # Wait for an address to become valid. def wait(self, session, address): @@ -49,15 +61,19 @@ class ShortTests(BrokerTest): except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) - # FIXME aconway 2012-01-23: workaround: we need to give the - # backup a chance to attach to the queue. + # FIXME aconway 2012-01-23: work around async configuration replication. + # Wait for address to become valid on a backup broker. def wait_backup(self, backup, address): bs = self.connect_admin(backup).session() self.wait(bs, address) bs.connection.close() - def promote(self, broker): - os.system("qpid-ha-tool --promote %s"%(broker.host_port())) + # Combines wait_backup and assert_browse_retry + def assert_browse_backup(self, backup, queue, expected, **kwargs): + bs = self.connect_admin(backup).session() + self.wait(bs, queue) + self.assert_browse_retry(bs, queue, expected, **kwargs) + bs.connection.close() def assert_missing(self, session, address): try: @@ -122,13 +138,13 @@ class ShortTests(BrokerTest): b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind. self.assert_browse_retry(b, prefix+"q4", ["6","7"]) - primary = self.ha_broker(name="primary") - self.promote(primary) + primary = HaBroker(self, name="primary") + primary.promote() p = primary.connect().session() # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1", primary) - backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) # Create config, send messages after starting the backup, to test steady-state replication. setup(p, "2", primary) @@ -165,16 +181,16 @@ class ShortTests(BrokerTest): def test_sync(self): def queue(name, replicate): return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) - primary = self.ha_broker(name="primary") - self.promote(primary) + primary = HaBroker(self, name="primary") + primary.promote() p = primary.connect().session() s = p.sender(queue("q","messages")) for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() - backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) for m in [str(i) for i in range(10,20)]: s.send(m) s.sync() - backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) for m in [str(i) for i in range(20,30)]: s.send(m) s.sync() @@ -188,10 +204,10 @@ class ShortTests(BrokerTest): def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" - primary = self.ha_broker(name="primary") - self.promote(primary) - backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) - backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) sender = self.popen( ["qpid-send", "--broker", primary.host_port(), @@ -222,9 +238,9 @@ class ShortTests(BrokerTest): def test_failover(self): """Verify that backups rejects connections and that fail-over works in python client""" getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover - primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL) - self.promote(primary) - backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) # Check that backup rejects normal connections try: backup.connect() @@ -241,14 +257,14 @@ class ShortTests(BrokerTest): sender.send("foo") primary.kill() assert retry(lambda: not is_running(primary.pid)) - self.promote(backup) + backup.promote() self.assert_browse_retry(s, "q", ["foo"]) c.close() def test_failover_cpp(self): - primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL) - self.promote(primary) - backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) self.wait_backup(backup, "q") @@ -262,7 +278,7 @@ class ShortTests(BrokerTest): primary.kill() assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die - self.promote(backup) + backup.promote() n = receiver.received # Make sure we are still running assert retry(lambda: receiver.received > n + 10) sender.stop() diff --git a/qpid/cpp/src/tests/reliable_replication_test b/qpid/cpp/src/tests/reliable_replication_test index 6f1d5882a5..273e482da0 100755 --- a/qpid/cpp/src/tests/reliable_replication_test +++ b/qpid/cpp/src/tests/reliable_replication_test @@ -65,12 +65,9 @@ receive() { } bounce_link() { - echo "Destroying link..." $PYTHON_COMMANDS/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A" - echo "Link destroyed; recreating route..." - sleep 2 +# sleep 2 $PYTHON_COMMANDS/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication - echo "Route re-established" } if test -d ${PYTHON_DIR} && test -e $REPLICATING_LISTENER_LIB && test -e $REPLICATION_EXCHANGE_LIB ; then @@ -78,16 +75,11 @@ if test -d ${PYTHON_DIR} && test -e $REPLICATING_LISTENER_LIB && test -e $REPLIC for i in `seq 1 100000`; do echo Message $i; done > replicated.expected send & receive & - for i in `seq 1 5`; do sleep 10; bounce_link; done; + for i in `seq 1 3`; do sleep 1; bounce_link; done; wait #check that received list is identical to sent list - diff replicated.actual replicated.expected || FAIL=1 - if [[ $FAIL ]]; then - echo reliable replication test failed: expectations not met! - exit 1 - else - echo replication reliable in the face of link failures - rm -f replication.actual replication.expected replication-source.log replication-dest.log qpidd-repl.port - fi + diff replicated.actual replicated.expected || exit 1 + rm -f replication.actual replication.expected replication-source.log replication-dest.log qpidd-repl.port + true fi diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool index 5b6d85c7bd..97cbd617d9 100755 --- a/qpid/tools/src/py/qpid-ha-tool +++ b/qpid/tools/src/py/qpid-ha-tool @@ -34,38 +34,43 @@ op.add_option("-q", "--query", action="store_true", help="Show the current HA settings on the broker.") class HaBroker: - def __init__(self, broker): - self.session = qmf.console.Session() - self.qmf_broker = self.session.addBroker(broker, client_properties={"qpid.ha-admin":1}) - ha_brokers = self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha") + def __init__(self, session, broker): + self.session = session + self.qmf_broker = self.session.addBroker( + broker, client_properties={"qpid.ha-admin":1}) + ha_brokers = self.session.getObjects( + _class="habroker", _package="org.apache.qpid.ha") if (not ha_brokers): raise Exception("Broker does not have HA enabled.") self.ha_broker = ha_brokers[0] def query(self): self.ha_broker.update() print "status=", self.ha_broker.status - print "client-addresses=", self.ha_broker.clientAddresses print "broker-addresses=", self.ha_broker.brokerAddresses + print "client-addresses=", self.ha_broker.clientAddresses def main(argv): try: opts, args = op.parse_args(argv) if len(args) >1: broker = args[1] else: broker = "localhost:5672" - hb = HaBroker(broker) + session = qmf.console.Session() try: + hb = HaBroker(session, broker) action=False - if opts.promote: hb.ha_broker.promote(); action=True - if opts.client_addresses: hb.ha_broker.setClientAddresses(opts.client_addresses); action=True - if opts.broker_addresses: hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True + if opts.promote: + hb.ha_broker.promote(); action=True + if opts.broker_addresses: + hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True + if opts.client_addresses: + hb.ha_broker.setClientAddresses(opts.client_addresses); action=True if opts.query or not action: hb.query() return 0 finally: - hb.session.close() # Avoid errors shutting down threads. + session.close() # Avoid errors shutting down threads. except Exception, e: - raise # FIXME aconway 2012-01-30: print e - return -1 + return 1 if __name__ == "__main__": sys.exit(main(sys.argv)) |