summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-03-03 20:23:43 +0000
committerAlan Conway <aconway@apache.org>2010-03-03 20:23:43 +0000
commite855d055a9c136828dd45336e372cd2d8726b6e7 (patch)
treefcf9355d10b6cb5a2f8e39626ef2e43bff31b5e6 /cpp/src/tests
parentddd5b1a3e0c37ef3f02c7ba4c948b1be94ff2d59 (diff)
downloadqpid-python-e855d055a9c136828dd45336e372cd2d8726b6e7.tar.gz
Extended cluster_tests.test_management exposes a bug.
- kill and start brokers with clients running. - added qpid-stat -b and testagent clients - disabled in src/tests/cluster_tests.fail till bug is fixed. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@918674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/cluster_tests.fail1
-rwxr-xr-xcpp/src/tests/cluster_tests.py97
2 files changed, 75 insertions, 23 deletions
diff --git a/cpp/src/tests/cluster_tests.fail b/cpp/src/tests/cluster_tests.fail
index 139597f9cb..268795642d 100644
--- a/cpp/src/tests/cluster_tests.fail
+++ b/cpp/src/tests/cluster_tests.fail
@@ -1,2 +1,3 @@
+cluster_tests.LongTests.test_management
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 22b7c8f5b8..276ad1af2d 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -25,6 +25,7 @@ from qpid.harness import Skipped
from qpid.messaging import Message
from threading import Thread
from logging import getLogger
+from itertools import chain
log = getLogger("qpid.cluster_tests")
@@ -149,21 +150,17 @@ class LongTests(BrokerTest):
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self):
- """Run management in conjunction with other traffic."""
- # Publish often to provoke errors
- args=["--mgmt-pub-interval", 1]
- # Use store if present
- if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
+ """Run management clients and other clients concurrently."""
+ # FIXME aconway 2010-03-03: move to framework
class ClientLoop(StoppableThread):
- """Run an infinite client loop."""
+ """Run a client executable in a loop."""
def __init__(self, broker, cmd):
StoppableThread.__init__(self)
self.broker=broker
- self.cmd = cmd
+ self.cmd = cmd # Client command.
self.lock = Lock()
- self.process = None
- self.stopped = False
+ self.process = None # Client process.
self.start()
def run(self):
@@ -172,37 +169,91 @@ class LongTests(BrokerTest):
self.lock.acquire()
try:
if self.stopped: break
- self.process = self.broker.test.popen(self.cmd,
- expect=EXPECT_UNKNOWN)
+ self.process = self.broker.test.popen(
+ self.cmd, expect=EXPECT_UNKNOWN)
finally: self.lock.release()
try: exit = self.process.wait()
except: exit = 1
self.lock.acquire()
try:
- if exit != 0 and not self.stopped:
- self.process.unexpected("bad exit status in client loop")
+ # Quit and ignore errors if stopped or expecting failure.
+ if self.stopped: break
+ if exit != 0:
+ self.process.unexpected("client of %s exit status %s" %
+ (self.broker.name, exit))
finally: self.lock.release()
except Exception, e:
- error=e
+ self.error = RethrownException("Error in ClientLoop.run")
+ def expect_fail(self):
+ """Ignore exit status of the currently running client."""
+ self.lock.acquire()
+ stopped = True
+ self.lock.release()
+
def stop(self):
+ """Stop the running client and wait for it to exit"""
self.lock.acquire()
try:
self.stopped = True
- try: self.process.terminate()
- except: pass
+ if self.process:
+ try: self.process.kill() # Kill the client.
+ except OSError: pass # The client might not be running.
finally: self.lock.release()
StoppableThread.stop(self)
+
+ # def test_management
+ args=["--mgmt-pub-interval", 1] # Publish management information every second.
+ # Use store if present.
+ if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
cluster = self.cluster(3, args)
- clients = []
- for b in cluster:
- clients.append(ClientLoop(b, ["perftest", "--count", "100", "--port", b.port()]))
- clients.append(ClientLoop(b, ["qpid-queue-stats", "-a", "localhost:%s" %(b.port())]))
+
+ clients = [] # Ordinary clients that only connect to one broker.
+ mclients = [] # Management clients that connect to every broker in the cluster.
+
+ def start_clients(broker):
+ """Start ordinary clients for a broker"""
+ batch = []
+ for cmd in [
+ ["perftest", "--count", 1000,
+ "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
+ ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
+ [os.path.join(self.rootdir, "testagent/testagent"), "localhost",
+ str(broker.port())]
+ ]:
+ batch.append(ClientLoop(broker, cmd))
+ clients.append(batch)
+
+ def start_mclients(broker):
+ """Start management clients that make multiple connections"""
+ for cmd in [
+ ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
+ ]:
+ mclients.append(ClientLoop(broker, cmd))
+
endtime = time.time() + self.duration()
+ alive = 0 # First live cluster member
+ for b in cluster:
+ start_clients(b)
+ start_mclients(b)
+
while time.time() < endtime:
- for b in cluster: b.ready() # Will raise if broker crashed.
- time.sleep(1)
- for c in clients:
+ time.sleep(min(5,self.duration()))
+ for b in cluster[alive:]: b.ready() # Check if a broker crashed.
+ # Kill the first broker. Ignore errors on its clients and all the mclients
+ for c in clients[alive] + mclients: c.expect_fail()
+ clients[alive] = []
+ mclients = []
+ b = cluster[alive]
+ b.expect = EXPECT_EXIT_FAIL
+ b.kill()
+ # Start another broker and clients
+ alive += 1
+ b = cluster.start()
+ start_clients(b)
+ for b in cluster[alive:]: start_mclients(b)
+
+ for c in chain(mclients, *clients):
c.stop()
class StoreTests(BrokerTest):