diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_test.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 41 |
1 files changed, 20 insertions, 21 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 0f92f7dbcc..2bf8677cd1 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -20,8 +20,6 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty -from qpid.datatypes import uuid4, UUID from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO @@ -44,7 +42,7 @@ class LogLevel: class QmfAgent(object): """Access to a QMF broker agent.""" def __init__(self, address, **kwargs): - self._connection = Connection.establish( + self._connection = qm.Connection.establish( address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) @@ -105,9 +103,9 @@ class HaPort: self.port = self.socket.getsockname()[1] self.fileno = self.socket.fileno() self.stopped = False - test.cleanup_stop(self) # Stop during test.tearDown + test.teardown_add(self) # Stop during test.tearDown - def stop(self): # Called in tearDown + def teardown(self): # Called in tearDown if not self.stopped: self.stopped = True self.socket.shutdown(socket.SHUT_RDWR) @@ -180,6 +178,7 @@ acl allow all all def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url]) def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]); def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + @property def agent(self): if not self._agent: cred = self.client_credentials @@ -190,7 +189,7 @@ acl allow all all return self._agent def qmf(self): - hb = self.agent().getHaBroker() + hb = self.agent.getHaBroker() hb.update() return hb @@ -203,19 +202,19 @@ acl allow all all try: self._status = self.ha_status() return self._status == status; - except ConnectionError: return False + except qm.ConnectionError: return False assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%( self, status, self._status) - def wait_queue(self, queue, timeout=1): + def wait_queue(self, queue, timeout=1, msg="wait_queue"): """ Wait for queue to be visible via QMF""" - agent = self.agent() - assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout) + agent = self.agent + assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue - def wait_no_queue(self, queue, timeout=1): + def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"): """ Wait for queue to be invisible via QMF""" - agent = self.agent() - assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout) + agent = self.agent + assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue) # TODO aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -273,12 +272,12 @@ acl allow all all def assert_connect_fail(self): try: self.connect() - self.test.fail("Expected ConnectionError") - except ConnectionError: pass + self.test.fail("Expected qm.ConnectionError") + except qm.ConnectionError: pass def try_connect(self): try: return self.connect() - except ConnectionError: return None + except qm.ConnectionError: return None def ready(self, *args, **kwargs): if not 'client_properties' in kwargs: kwargs['client_properties'] = {} @@ -286,7 +285,7 @@ acl allow all all return Broker.ready(self, *args, **kwargs) def kill(self, final=True): - if final: self.ha_port.stop() + if final: self.ha_port.teardown() self._agent = None return Broker.kill(self) @@ -355,9 +354,9 @@ class HaCluster(object): b.set_brokers_url(self.url) b.set_public_url(self.url) - def connect(self, i): + def connect(self, i, **kwargs): """Connect with reconnect_urls""" - return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) + return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs) def kill(self, i, promote_next=True, final=True): """Kill broker i, promote broker i+1""" @@ -393,7 +392,7 @@ def wait_address(session, address): """Wait for an address to become valid.""" def check(): try: session.sender(address); return True - except NotFound: return False + except qm.NotFound: return False assert retry(check), "Timed out waiting for address %s"%(address) def valid_address(session, address): @@ -401,6 +400,6 @@ def valid_address(session, address): try: session.receiver(address) return True - except NotFound: return False + except qm.NotFound: return False |