summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-13 16:17:34 +0000
committerAlan Conway <aconway@apache.org>2012-02-13 16:17:34 +0000
commit5477214cd08c5ed7113bd842f848c2b8afa74107 (patch)
treef046dba6c447b2ec3bba7018302a89cbf42264e6
parentab9cf29397c17ed20dff2bdf0f59896874c40fc1 (diff)
downloadqpid-python-5477214cd08c5ed7113bd842f848c2b8afa74107.tar.gz
QPID-3603: Reconnect URL in broker::Link
- Flatten known-hosts in Link to a single URL. - Circular retry on failover URL. - Allow setting a different retry URL. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243575 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp30
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h6
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h1
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp13
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/tests/brokertest.py8
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py80
-rwxr-xr-xqpid/cpp/src/tests/reliable_replication_test18
-rwxr-xr-xqpid/tools/src/py/qpid-ha-tool29
10 files changed, 113 insertions, 80 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index b975511e59..e0c94853d7 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -64,6 +64,7 @@ Link::Link(LinkRegistry* _links,
visitCount(0),
currentInterval(1),
closing(false),
+ reconnectNext(0), // Index of next address for reconnecting in url.
channelCounter(1),
connection(0),
agent(0)
@@ -146,11 +147,23 @@ void Link::established ()
}
}
+void Link::setUrl(const Url& u) {
+ Mutex::ScopedLock mutex(lock);
+ url = u;
+ reconnectNext = 0;
+}
+
void Link::opened() {
Mutex::ScopedLock mutex(lock);
assert(connection);
- urls.reset(connection->getKnownHosts());
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+ // Get default URL from known-hosts.
+ const std::vector<Url>& known = connection->getKnownHosts();
+ // Flatten vector of URLs into a single URL listing all addresses.
+ url.clear();
+ for(size_t i = 0; i < known.size(); ++i)
+ url.insert(url.end(), known[i].begin(), known[i].end());
+ reconnectNext = 0;
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
void Link::closed(int, std::string text)
@@ -334,17 +347,16 @@ void Link::reconnect(const qpid::Address& a)
}
}
-bool Link::tryFailover()
-{
- Address next;
- if (urls.next(next) &&
- (next.host != host || next.port != port || next.protocol != transport)) {
+bool Link::tryFailover() { // FIXME aconway 2012-01-30: lock held?
+ if (reconnectNext >= url.size()) reconnectNext = 0;
+ if (url.empty()) return false;
+ Address next = url[reconnectNext++];
+ if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Link failing over to " << host << ":" << port);
return true;
- } else {
- return false;
}
+ return false;
}
// Management updates for a linke are inconsistent in a cluster, so they are
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index a11f99e91e..a9d045b0af 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -23,10 +23,10 @@
*/
#include <boost/shared_ptr.hpp>
+#include "qpid/Url.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
-#include "qpid/broker/RetryList.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
@@ -60,7 +60,8 @@ namespace qpid {
uint32_t visitCount;
uint32_t currentInterval;
bool closing;
- RetryList urls;
+ Url url; // URL can contain many addresses.
+ size_t reconnectNext; // Index for next re-connect attempt
typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
@@ -111,6 +112,7 @@ namespace qpid {
uint nextChannel();
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
+ void setUrl(const Url&); // Set URL for reconnection.
void established(); // Called when connection is create
void opened(); // Called when connection is open (after create)
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 6e61ed0b46..e5bd0ed4dc 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -56,6 +56,7 @@ Backup::Backup(broker::Broker& b, const Settings& s) :
s.mechanism, s.username, s.password);
assert(result.second); // FIXME aconway 2011-11-23: error handling
link = result.first;
+ link->setUrl(Url(s.brokerUrl));
replicator.reset(new BrokerReplicator(link));
broker.getExchanges().registerExchange(replicator);
@@ -63,6 +64,11 @@ Backup::Backup(broker::Broker& b, const Settings& s) :
broker.getConnectionObservers().add(excluder);
}
+void Backup::setUrl(const Url& url) {
+ // FIXME aconway 2012-01-30: locking?
+ link->setUrl(url);
+}
+
Backup::~Backup() {
broker.getExchanges().destroy(replicator->getName());
broker.getConnectionObservers().remove(excluder); // Allows client connections.
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 135363c714..00ec55a6ff 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -49,6 +49,7 @@ class Backup
public:
Backup(broker::Broker&, const Settings&);
~Backup();
+ void setUrl(const Url&);
private:
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index a53afa82fe..ad97f87a62 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -55,14 +55,15 @@ const std::string BACKUP="backup";
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
- clientUrl(url(s.clientUrl, "ha-client-url")),
brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+ clientUrl(s.clientUrl.empty() ? brokerUrl : url(s.clientUrl, "ha-client-url")),
backup(new Backup(b, s)),
mgmtObject(0)
{
// Note all HA brokers start out in backup mode.
- QPID_LOG(notice, "HA: Backup initialized: client-url=" << clientUrl
- << " broker-url=" << brokerUrl);
+ QPID_LOG(notice, "HA: Backup initialized: "
+ << " broker-url=" << brokerUrl
+ << " client-url=" << clientUrl);
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
@@ -95,17 +96,15 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
break;
}
case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
- QPID_LOG(critical, "FIXME" << "before " << clientUrl)
clientUrl = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).i_clientAddresses;
- QPID_LOG(critical, "FIXME" << "after " << clientUrl)
- // FIXME aconway 2012-01-30: upate status for new URL
mgmtObject->set_clientAddresses(clientUrl.str());
+ // FIXME aconway 2012-01-30: upate status for new URL
break;
}
case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
brokerUrl = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args).i_brokerAddresses;
- // FIXME aconway 2012-01-30: upate status for new URL
mgmtObject->set_brokerAddresses(brokerUrl.str());
+ if (backup.get()) backup->setUrl(brokerUrl);
break;
}
default:
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 8300de0ea8..affaa7486f 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -57,7 +57,7 @@ class HaBroker : public management::Manageable
sys::Mutex lock;
broker::Broker& broker;
Settings settings;
- Url clientUrl, brokerUrl;
+ Url brokerUrl, clientUrl;
std::auto_ptr<Backup> backup;
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
};
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index f7f446b893..5b18e58dee 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -76,7 +76,7 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=10, delay=.01):
+def retry(function, timeout=1, delay=.01):
"""Call function until it returns True or timeout expires.
Double the delay for each retry. Return True if function
returns true, False if timeout expires."""
@@ -277,8 +277,8 @@ class Broker(Popen):
self.find_log()
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
- if log_level != None:
- cmd += ["--log-enable=%s" % log_level]
+ cmd += ["--log-enable=%s"%(log_level or "info+") ]
+
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
if show_cmd: print cmd
@@ -526,7 +526,7 @@ class BrokerTest(TestCase):
retry(test, timeout, delay)
self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
-def join(thread, timeout=10):
+def join(thread, timeout=1):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index c213e6a4ff..e782b57f7f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -27,18 +27,30 @@ from logging import getLogger, WARN, ERROR, DEBUG
log = getLogger("qpid.ha-tests")
+class HaBroker(Broker):
+ def __init__(self, test, args=[], broker_url=None, **kwargs):
+ assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ Broker.__init__(self, test,
+ args=["--load-module", BrokerTest.ha_lib,
+ "--ha-enable=yes",
+ "--ha-broker-url", broker_url ],
+ **kwargs)
+
+ def promote(self):
+ assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0
+
+ def set_client_url(self, url):
+ assert os.system(
+ "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0
+
+ def set_broker_url(self, url):
+ assert os.system(
+ "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0
+
+
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
- def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs):
- assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- return Broker(self, args=["--load-module", BrokerTest.ha_lib,
- "--ha-enable=yes",
- "--ha-client-url", client_url,
- "--ha-broker-url", broker_url,
- ] + args,
- **kwargs)
-
# FIXME aconway 2011-11-15: work around async configuration replication.
# Wait for an address to become valid.
def wait(self, session, address):
@@ -49,15 +61,19 @@ class ShortTests(BrokerTest):
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
- # FIXME aconway 2012-01-23: workaround: we need to give the
- # backup a chance to attach to the queue.
+ # FIXME aconway 2012-01-23: work around async configuration replication.
+ # Wait for address to become valid on a backup broker.
def wait_backup(self, backup, address):
bs = self.connect_admin(backup).session()
self.wait(bs, address)
bs.connection.close()
- def promote(self, broker):
- os.system("qpid-ha-tool --promote %s"%(broker.host_port()))
+ # Combines wait_backup and assert_browse_retry
+ def assert_browse_backup(self, backup, queue, expected, **kwargs):
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, queue)
+ self.assert_browse_retry(bs, queue, expected, **kwargs)
+ bs.connection.close()
def assert_missing(self, session, address):
try:
@@ -122,13 +138,13 @@ class ShortTests(BrokerTest):
b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
- primary = self.ha_broker(name="primary")
- self.promote(primary)
+ primary = HaBroker(self, name="primary")
+ primary.promote()
p = primary.connect().session()
# Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1", primary)
- backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
# Create config, send messages after starting the backup, to test steady-state replication.
setup(p, "2", primary)
@@ -165,16 +181,16 @@ class ShortTests(BrokerTest):
def test_sync(self):
def queue(name, replicate):
return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
- primary = self.ha_broker(name="primary")
- self.promote(primary)
+ primary = HaBroker(self, name="primary")
+ primary.promote()
p = primary.connect().session()
s = p.sender(queue("q","messages"))
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
- backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
for m in [str(i) for i in range(10,20)]: s.send(m)
s.sync()
- backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
for m in [str(i) for i in range(20,30)]: s.send(m)
s.sync()
@@ -188,10 +204,10 @@ class ShortTests(BrokerTest):
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
- primary = self.ha_broker(name="primary")
- self.promote(primary)
- backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
- backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
sender = self.popen(
["qpid-send",
"--broker", primary.host_port(),
@@ -222,9 +238,9 @@ class ShortTests(BrokerTest):
def test_failover(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
- primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
- self.promote(primary)
- backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
# Check that backup rejects normal connections
try:
backup.connect()
@@ -241,14 +257,14 @@ class ShortTests(BrokerTest):
sender.send("foo")
primary.kill()
assert retry(lambda: not is_running(primary.pid))
- self.promote(backup)
+ backup.promote()
self.assert_browse_retry(s, "q", ["foo"])
c.close()
def test_failover_cpp(self):
- primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
- self.promote(primary)
- backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
self.wait_backup(backup, "q")
@@ -262,7 +278,7 @@ class ShortTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
- self.promote(backup)
+ backup.promote()
n = receiver.received # Make sure we are still running
assert retry(lambda: receiver.received > n + 10)
sender.stop()
diff --git a/qpid/cpp/src/tests/reliable_replication_test b/qpid/cpp/src/tests/reliable_replication_test
index 6f1d5882a5..273e482da0 100755
--- a/qpid/cpp/src/tests/reliable_replication_test
+++ b/qpid/cpp/src/tests/reliable_replication_test
@@ -65,12 +65,9 @@ receive() {
}
bounce_link() {
- echo "Destroying link..."
$PYTHON_COMMANDS/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A"
- echo "Link destroyed; recreating route..."
- sleep 2
+# sleep 2
$PYTHON_COMMANDS/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
- echo "Route re-established"
}
if test -d ${PYTHON_DIR} && test -e $REPLICATING_LISTENER_LIB && test -e $REPLICATION_EXCHANGE_LIB ; then
@@ -78,16 +75,11 @@ if test -d ${PYTHON_DIR} && test -e $REPLICATING_LISTENER_LIB && test -e $REPLIC
for i in `seq 1 100000`; do echo Message $i; done > replicated.expected
send &
receive &
- for i in `seq 1 5`; do sleep 10; bounce_link; done;
+ for i in `seq 1 3`; do sleep 1; bounce_link; done;
wait
#check that received list is identical to sent list
- diff replicated.actual replicated.expected || FAIL=1
- if [[ $FAIL ]]; then
- echo reliable replication test failed: expectations not met!
- exit 1
- else
- echo replication reliable in the face of link failures
- rm -f replication.actual replication.expected replication-source.log replication-dest.log qpidd-repl.port
- fi
+ diff replicated.actual replicated.expected || exit 1
+ rm -f replication.actual replication.expected replication-source.log replication-dest.log qpidd-repl.port
+ true
fi
diff --git a/qpid/tools/src/py/qpid-ha-tool b/qpid/tools/src/py/qpid-ha-tool
index 5b6d85c7bd..97cbd617d9 100755
--- a/qpid/tools/src/py/qpid-ha-tool
+++ b/qpid/tools/src/py/qpid-ha-tool
@@ -34,38 +34,43 @@ op.add_option("-q", "--query", action="store_true",
help="Show the current HA settings on the broker.")
class HaBroker:
- def __init__(self, broker):
- self.session = qmf.console.Session()
- self.qmf_broker = self.session.addBroker(broker, client_properties={"qpid.ha-admin":1})
- ha_brokers = self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")
+ def __init__(self, session, broker):
+ self.session = session
+ self.qmf_broker = self.session.addBroker(
+ broker, client_properties={"qpid.ha-admin":1})
+ ha_brokers = self.session.getObjects(
+ _class="habroker", _package="org.apache.qpid.ha")
if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
self.ha_broker = ha_brokers[0]
def query(self):
self.ha_broker.update()
print "status=", self.ha_broker.status
- print "client-addresses=", self.ha_broker.clientAddresses
print "broker-addresses=", self.ha_broker.brokerAddresses
+ print "client-addresses=", self.ha_broker.clientAddresses
def main(argv):
try:
opts, args = op.parse_args(argv)
if len(args) >1: broker = args[1]
else: broker = "localhost:5672"
- hb = HaBroker(broker)
+ session = qmf.console.Session()
try:
+ hb = HaBroker(session, broker)
action=False
- if opts.promote: hb.ha_broker.promote(); action=True
- if opts.client_addresses: hb.ha_broker.setClientAddresses(opts.client_addresses); action=True
- if opts.broker_addresses: hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True
+ if opts.promote:
+ hb.ha_broker.promote(); action=True
+ if opts.broker_addresses:
+ hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True
+ if opts.client_addresses:
+ hb.ha_broker.setClientAddresses(opts.client_addresses); action=True
if opts.query or not action: hb.query()
return 0
finally:
- hb.session.close() # Avoid errors shutting down threads.
+ session.close() # Avoid errors shutting down threads.
except Exception, e:
- raise # FIXME aconway 2012-01-30:
print e
- return -1
+ return 1
if __name__ == "__main__":
sys.exit(main(sys.argv))