summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_test.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py41
1 files changed, 20 insertions, 21 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 0f92f7dbcc..2bf8677cd1 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -20,8 +20,6 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
-from qpid.datatypes import uuid4, UUID
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
@@ -44,7 +42,7 @@ class LogLevel:
class QmfAgent(object):
"""Access to a QMF broker agent."""
def __init__(self, address, **kwargs):
- self._connection = Connection.establish(
+ self._connection = qm.Connection.establish(
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
@@ -105,9 +103,9 @@ class HaPort:
self.port = self.socket.getsockname()[1]
self.fileno = self.socket.fileno()
self.stopped = False
- test.cleanup_stop(self) # Stop during test.tearDown
+ test.teardown_add(self) # Stop during test.tearDown
- def stop(self): # Called in tearDown
+ def teardown(self): # Called in tearDown
if not self.stopped:
self.stopped = True
self.socket.shutdown(socket.SHUT_RDWR)
@@ -180,6 +178,7 @@ acl allow all all
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]);
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+ @property
def agent(self):
if not self._agent:
cred = self.client_credentials
@@ -190,7 +189,7 @@ acl allow all all
return self._agent
def qmf(self):
- hb = self.agent().getHaBroker()
+ hb = self.agent.getHaBroker()
hb.update()
return hb
@@ -203,19 +202,19 @@ acl allow all all
try:
self._status = self.ha_status()
return self._status == status;
- except ConnectionError: return False
+ except qm.ConnectionError: return False
assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
self, status, self._status)
- def wait_queue(self, queue, timeout=1):
+ def wait_queue(self, queue, timeout=1, msg="wait_queue"):
""" Wait for queue to be visible via QMF"""
- agent = self.agent()
- assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout)
+ agent = self.agent
+ assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue
- def wait_no_queue(self, queue, timeout=1):
+ def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"):
""" Wait for queue to be invisible via QMF"""
- agent = self.agent()
- assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
+ agent = self.agent
+ assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue)
# TODO aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -273,12 +272,12 @@ acl allow all all
def assert_connect_fail(self):
try:
self.connect()
- self.test.fail("Expected ConnectionError")
- except ConnectionError: pass
+ self.test.fail("Expected qm.ConnectionError")
+ except qm.ConnectionError: pass
def try_connect(self):
try: return self.connect()
- except ConnectionError: return None
+ except qm.ConnectionError: return None
def ready(self, *args, **kwargs):
if not 'client_properties' in kwargs: kwargs['client_properties'] = {}
@@ -286,7 +285,7 @@ acl allow all all
return Broker.ready(self, *args, **kwargs)
def kill(self, final=True):
- if final: self.ha_port.stop()
+ if final: self.ha_port.teardown()
self._agent = None
return Broker.kill(self)
@@ -355,9 +354,9 @@ class HaCluster(object):
b.set_brokers_url(self.url)
b.set_public_url(self.url)
- def connect(self, i):
+ def connect(self, i, **kwargs):
"""Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
def kill(self, i, promote_next=True, final=True):
"""Kill broker i, promote broker i+1"""
@@ -393,7 +392,7 @@ def wait_address(session, address):
"""Wait for an address to become valid."""
def check():
try: session.sender(address); return True
- except NotFound: return False
+ except qm.NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
def valid_address(session, address):
@@ -401,6 +400,6 @@ def valid_address(session, address):
try:
session.receiver(address)
return True
- except NotFound: return False
+ except qm.NotFound: return False