diff options
author | Alan Conway <aconway@apache.org> | 2010-03-03 20:23:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-03-03 20:23:43 +0000 |
commit | e855d055a9c136828dd45336e372cd2d8726b6e7 (patch) | |
tree | fcf9355d10b6cb5a2f8e39626ef2e43bff31b5e6 /cpp/src/tests | |
parent | ddd5b1a3e0c37ef3f02c7ba4c948b1be94ff2d59 (diff) | |
download | qpid-python-e855d055a9c136828dd45336e372cd2d8726b6e7.tar.gz |
Extended cluster_tests.test_management exposes a bug.
- kill and start brokers with clients running.
- added qpid-stat -b and testagent clients
- disabled in src/tests/cluster_tests.fail till bug is fixed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@918674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/cluster_tests.fail | 1 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 97 |
2 files changed, 75 insertions, 23 deletions
diff --git a/cpp/src/tests/cluster_tests.fail b/cpp/src/tests/cluster_tests.fail index 139597f9cb..268795642d 100644 --- a/cpp/src/tests/cluster_tests.fail +++ b/cpp/src/tests/cluster_tests.fail @@ -1,2 +1,3 @@ +cluster_tests.LongTests.test_management diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 22b7c8f5b8..276ad1af2d 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -25,6 +25,7 @@ from qpid.harness import Skipped from qpid.messaging import Message from threading import Thread from logging import getLogger +from itertools import chain log = getLogger("qpid.cluster_tests") @@ -149,21 +150,17 @@ class LongTests(BrokerTest): for i in range(i, len(cluster)): cluster[i].kill() def test_management(self): - """Run management in conjunction with other traffic.""" - # Publish often to provoke errors - args=["--mgmt-pub-interval", 1] - # Use store if present - if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] + """Run management clients and other clients concurrently.""" + # FIXME aconway 2010-03-03: move to framework class ClientLoop(StoppableThread): - """Run an infinite client loop.""" + """Run a client executable in a loop.""" def __init__(self, broker, cmd): StoppableThread.__init__(self) self.broker=broker - self.cmd = cmd + self.cmd = cmd # Client command. self.lock = Lock() - self.process = None - self.stopped = False + self.process = None # Client process. self.start() def run(self): @@ -172,37 +169,91 @@ class LongTests(BrokerTest): self.lock.acquire() try: if self.stopped: break - self.process = self.broker.test.popen(self.cmd, - expect=EXPECT_UNKNOWN) + self.process = self.broker.test.popen( + self.cmd, expect=EXPECT_UNKNOWN) finally: self.lock.release() try: exit = self.process.wait() except: exit = 1 self.lock.acquire() try: - if exit != 0 and not self.stopped: - self.process.unexpected("bad exit status in client loop") + # Quit and ignore errors if stopped or expecting failure. + if self.stopped: break + if exit != 0: + self.process.unexpected("client of %s exit status %s" % + (self.broker.name, exit)) finally: self.lock.release() except Exception, e: - error=e + self.error = RethrownException("Error in ClientLoop.run") + def expect_fail(self): + """Ignore exit status of the currently running client.""" + self.lock.acquire() + stopped = True + self.lock.release() + def stop(self): + """Stop the running client and wait for it to exit""" self.lock.acquire() try: self.stopped = True - try: self.process.terminate() - except: pass + if self.process: + try: self.process.kill() # Kill the client. + except OSError: pass # The client might not be running. finally: self.lock.release() StoppableThread.stop(self) + + # def test_management + args=["--mgmt-pub-interval", 1] # Publish management information every second. + # Use store if present. + if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] cluster = self.cluster(3, args) - clients = [] - for b in cluster: - clients.append(ClientLoop(b, ["perftest", "--count", "100", "--port", b.port()])) - clients.append(ClientLoop(b, ["qpid-queue-stats", "-a", "localhost:%s" %(b.port())])) + + clients = [] # Ordinary clients that only connect to one broker. + mclients = [] # Management clients that connect to every broker in the cluster. + + def start_clients(broker): + """Start ordinary clients for a broker""" + batch = [] + for cmd in [ + ["perftest", "--count", 1000, + "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], + ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], + [os.path.join(self.rootdir, "testagent/testagent"), "localhost", + str(broker.port())] + ]: + batch.append(ClientLoop(broker, cmd)) + clients.append(batch) + + def start_mclients(broker): + """Start management clients that make multiple connections""" + for cmd in [ + ["qpid-stat", "-b", "localhost:%s" %(broker.port())] + ]: + mclients.append(ClientLoop(broker, cmd)) + endtime = time.time() + self.duration() + alive = 0 # First live cluster member + for b in cluster: + start_clients(b) + start_mclients(b) + while time.time() < endtime: - for b in cluster: b.ready() # Will raise if broker crashed. - time.sleep(1) - for c in clients: + time.sleep(min(5,self.duration())) + for b in cluster[alive:]: b.ready() # Check if a broker crashed. + # Kill the first broker. Ignore errors on its clients and all the mclients + for c in clients[alive] + mclients: c.expect_fail() + clients[alive] = [] + mclients = [] + b = cluster[alive] + b.expect = EXPECT_EXIT_FAIL + b.kill() + # Start another broker and clients + alive += 1 + b = cluster.start() + start_clients(b) + for b in cluster[alive:]: start_mclients(b) + + for c in chain(mclients, *clients): c.stop() class StoreTests(BrokerTest): |