From 1b390a851f996d058123e7709839455c8b761dab Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 17 Feb 2012 14:13:22 +0000 Subject: QPID-3603: Test to verify C++ client failover is working. - TcpConnector: set identifier early so it is available in error messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245528 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/TCPConnector.cpp | 8 ++--- qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp | 2 -- qpid/cpp/src/tests/brokertest.py | 52 ++++++++++++++-------------- qpid/cpp/src/tests/cluster_tests.py | 4 +-- qpid/cpp/src/tests/ha_tests.py | 53 +++++++++++++++++++++-------- 5 files changed, 71 insertions(+), 48 deletions(-) diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp index 4660a41c07..51eacf77e8 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.cpp +++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp @@ -97,7 +97,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { boost::bind(&TCPConnector::connected, this, _1), boost::bind(&TCPConnector::connectFailed, this, _3)); closed = false; - + identifier = str(format("[%1%]") % socket.getFullAddress()); connector->start(poller); } @@ -120,8 +120,6 @@ void TCPConnector::start(sys::AsynchIO* aio_) { for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - - identifier = str(format("[%1%]") % socket.getFullAddress()); } void TCPConnector::initAmqp() { @@ -131,7 +129,7 @@ void TCPConnector::initAmqp() { void TCPConnector::connectFailed(const std::string& msg) { connector = 0; - QPID_LOG(warning, "Connect failed: " << msg); + QPID_LOG(warning, "Connect failed: " << msg << " " << identifier); socket.close(); if (!closed) closed = true; @@ -185,7 +183,7 @@ sys::ShutdownHandler* TCPConnector::getShutdownHandler() const { return shutdownHandler; } -const std::string& TCPConnector::getIdentifier() const { +const std::string& TCPConnector::getIdentifier() const { return identifier; } diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp index d8c2e8e34e..df50f1da19 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -34,8 +34,6 @@ void ConnectionExcluder::opened(broker::Connection& connection) { && !connection.getClientProperties().isSet(ADMIN_TAG)) throw Exception( QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId())); - else - QPID_LOG(debug, "HA: Backup broker accepted connection" << connection.getMgmtId()); } const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin"; diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index f4a20eda5a..b2b4d89b0f 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -198,16 +198,17 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - self._cleanup() + self.wait() def kill(self): - try: subprocess.Popen.kill(self) + try: + subprocess.Popen.kill(self) except AttributeError: # No terminate method try: os.kill( self.pid , signal.SIGKILL) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - self._cleanup() + self.wait() def _cleanup(self): """Clean up after a dead process""" @@ -555,22 +556,22 @@ class NumberedSender(Thread): """ def __init__(self, broker, max_depth=None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS): + connection_options=Cluster.CONNECTION_OPTIONS, + failover_updates=True, url=None): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. """ Thread.__init__(self) + cmd = ["qpid-send", + "--broker", url or broker.host_port(), + "--address", "%s;{create:always}"%queue, + "--connection-options", "{%s}"%(connection_options), + "--content-stdin" + ] + if failover_updates: cmd += ["--failover-updates"] self.sender = broker.test.popen( - ["qpid-send", - "--broker", "localhost:%s"%broker.port(), - "--address", "%s;{create:always}"%queue, - "--failover-updates", - "--connection-options", "{%s}"%(connection_options), - "--content-stdin" - ], - expect=EXPECT_RUNNING, - stdin=PIPE) + cmd, expect=EXPECT_RUNNING, stdin=PIPE) self.condition = Condition() self.max = max_depth self.received = 0 @@ -617,30 +618,31 @@ class NumberedReceiver(Thread): Thread to run a receiver client and verify it receives sequentially numbered messages. """ - def __init__(self, broker, sender = None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS): + def __init__(self, broker, sender=None, queue="test-queue", + connection_options=Cluster.CONNECTION_OPTIONS, + failover_updates=True, url=None): """ sender: enable flow control. Call sender.received(n) for each message received. """ Thread.__init__(self) self.test = broker.test + cmd = ["qpid-receive", + "--broker", url or broker.host_port(), + "--address", "%s;{create:always}"%queue, + "--connection-options", "{%s}"%(connection_options), + "--forever" + ] + if failover_updates: cmd += [ "--failover-updates" ] self.receiver = self.test.popen( - ["qpid-receive", - "--broker", "localhost:%s"%broker.port(), - "--address", "%s;{create:always}"%queue, - "--failover-updates", - "--connection-options", "{%s}"%(connection_options), - "--forever" - ], - expect=EXPECT_RUNNING, - stdout=PIPE) + cmd, expect=EXPECT_RUNNING, stdout=PIPE) self.lock = Lock() self.error = None self.sender = sender self.received = 0 def read_message(self): - return int(self.receiver.stdout.readline()) + n = int(self.receiver.stdout.readline()) + return n def run(self): try: diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 2db2cdd433..d2de384f08 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -1046,8 +1046,8 @@ class LongTests(BrokerTest): # Start sender and receiver threads cluster[0].declare_queue("test-queue") - sender = NumberedSender(cluster[0], 1000) # Max queue depth - receiver = NumberedReceiver(cluster[0], sender) + sender = NumberedSender(cluster[0], max_depth=1000) + receiver = NumberedReceiver(cluster[0], sender=sender) receiver.start() sender.start() # Wait for sender & receiver to get up and running diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f3c45ba7a3..6d5dc19aa6 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil from qpid.messaging import Message, NotFound, ConnectionError, Connection from brokertest import * from threading import Thread, Lock, Condition -from logging import getLogger +from logging import getLogger, WARN, ERROR, DEBUG log = getLogger("qpid.ha-tests") @@ -39,7 +39,8 @@ class ShortTests(BrokerTest): ] + args, **kwargs) - # FIXME aconway 2011-11-15: work around async replication. + # FIXME aconway 2011-11-15: work around async wiring replication. + # Wait for an address to become valid. def wait(self, session, address): def check(): try: @@ -48,6 +49,13 @@ class ShortTests(BrokerTest): except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) + # FIXME aconway 2012-01-23: workaround: we need to give the + # backup a chance to attach to the queue. + def wait_backup(self, backup, address): + bs = self.connect_admin(backup).session() + self.wait(bs, address) + bs.connection.close() + def set_ha_status(self, address, status): os.system("qpid-ha-status %s %s"%(address, status)) @@ -209,7 +217,8 @@ class ShortTests(BrokerTest): raise def test_failover(self): - """Verify that backups rejects connections and that fail-over works""" + """Verify that backups rejects connections and that fail-over works in python client""" + getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary backup = self.ha_broker(name="backup", broker_url=primary.host_port()) # Check that backup rejects normal connections @@ -223,20 +232,36 @@ class ShortTests(BrokerTest): # Test discovery: should connect to primary after reject by backup c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() - s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True) - # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here, - # send(sync=True) shouldn't return till the backup acknowledges. - bs = self.connect_admin(backup).session() - self.wait(bs, "q") - self.assert_browse_retry(bs, "q", ["foo"]) - bs.connection.close() + sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate())) + self.wait_backup(backup, "q") + sender.send("foo") + primary.kill() + assert retry(lambda: not is_running(primary.pid)) + self.set_ha_status(backup.host_port(), "primary") # Promote the backup + self.assert_browse_retry(s, "q", ["foo"]) + c.close() + + def test_failover_cpp(self): + primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary + backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + url="%s,%s"%(primary.host_port(), backup.host_port()) + primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) + self.wait_backup(backup, "q") + + sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) + receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False) + receiver.start() + sender.start() + self.wait_backup(backup, "q") + assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru primary.kill() - # Promote the backup + assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die self.set_ha_status(backup.host_port(), "primary") - # FIXME aconway 2012-01-23: should re-use session s below - self.assert_browse_retry(c.session(), "q", ["foo"]) - c.close() + n = receiver.received # Make sure we are still running + assert retry(lambda: receiver.received > n + 10) + sender.stop() + receiver.stop() if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) -- cgit v1.2.1