summaryrefslogtreecommitdiff
path: root/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-xcpp/src/tests/ha_tests.py224
1 files changed, 3 insertions, 221 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 31142de293..de87c49d21 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -23,230 +23,12 @@ import traceback
from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4
from brokertest import *
+from ha_test import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
from uuid import UUID
-log = getLogger(__name__)
-
-class QmfAgent(object):
- """Access to a QMF broker agent."""
- def __init__(self, address, **kwargs):
- self._connection = Connection.establish(
- address, client_properties={"qpid.ha-admin":1}, **kwargs)
- self._agent = BrokerAgent(self._connection)
-
- def __getattr__(self, name):
- a = getattr(self._agent, name)
- return a
-
-class Credentials(object):
- """SASL credentials: username, password, and mechanism"""
- def __init__(self, username, password, mechanism):
- (self.username, self.password, self.mechanism) = (username, password, mechanism)
-
- def __str__(self): return "Credentials%s"%(self.tuple(),)
-
- def tuple(self): return (self.username, self.password, self.mechanism)
-
- def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
-
-class HaBroker(Broker):
- """Start a broker with HA enabled
- @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
- """
- def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
- client_credentials=None, **kwargs):
- assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- args = copy(args)
- args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::",
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=%s"%ha_cluster]
- if ha_replicate is not None:
- args += [ "--ha-replicate=%s"%ha_replicate ]
- if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
- Broker.__init__(self, test, args, **kwargs)
- self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
- assert os.path.exists(self.qpid_ha_path)
- self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
- assert os.path.exists(self.qpid_config_path)
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- self.qpid_ha_script=import_script(self.qpid_ha_path)
- self._agent = None
- self.client_credentials = client_credentials
-
- def __str__(self): return Broker.__str__(self)
-
- def qpid_ha(self, args):
- cred = self.client_credentials
- url = self.host_port()
- if cred:
- url =cred.add_user(url)
- args = args + ["--sasl-mechanism", cred.mechanism]
- self.qpid_ha_script.main_except(["", "-b", url]+args)
-
- def promote(self): self.qpid_ha(["promote"])
- def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
- def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
- def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
- def agent(self):
- if not self._agent:
- cred = self.client_credentials
- if cred:
- self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
- else:
- self._agent = QmfAgent(self.host_port())
- return self._agent
-
- def ha_status(self):
- hb = self.agent().getHaBroker()
- hb.update()
- return hb.status
-
- def wait_status(self, status):
- def try_get_status():
- self._status = self.ha_status()
- # Ignore ConnectionError, the broker may not be up yet.
- try:
- self._status = self.ha_status()
- return self._status == status;
- except ConnectionError: return False
- assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
- self, status, self._status)
-
- # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
- def qpid_config(self, args):
- assert subprocess.call(
- [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
-
- def config_replicate(self, from_broker, queue):
- self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
-
- def config_declare(self, queue, replication):
- self.qpid_config(["add", "queue", queue, "--replicate", replication])
-
- def connect_admin(self, **kwargs):
- cred = self.client_credentials
- if cred:
- return Broker.connect(
- self, client_properties={"qpid.ha-admin":1},
- username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
- **kwargs)
- else:
- return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
-
- def wait_backup(self, address):
- """Wait for address to become valid on a backup broker."""
- bs = self.connect_admin().session()
- try: wait_address(bs, address)
- finally: bs.connection.close()
-
- def assert_browse(self, queue, expected, **kwargs):
- """Verify queue contents by browsing."""
- bs = self.connect().session()
- try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
-
- def assert_browse_backup(self, queue, expected, **kwargs):
- """Combines wait_backup and assert_browse_retry."""
- bs = self.connect_admin().session()
- try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
-
- def assert_connect_fail(self):
- try:
- self.connect()
- self.test.fail("Expected ConnectionError")
- except ConnectionError: pass
-
- def try_connect(self):
- try: return self.connect()
- except ConnectionError: return None
-
-class HaCluster(object):
- _cluster_count = 0
-
- def __init__(self, test, n, promote=True, **kwargs):
- """Start a cluster of n brokers"""
- self.test = test
- self.kwargs = kwargs
- self._brokers = []
- self.id = HaCluster._cluster_count
- self.broker_id = 0
- HaCluster._cluster_count += 1
- for i in xrange(n): self.start(False)
- self.update_urls()
- self[0].promote()
-
- def next_name(self):
- name="cluster%s-%s"%(self.id, self.broker_id)
- self.broker_id += 1
- return name
-
- def start(self, update_urls=True, args=[]):
- """Start a new broker in the cluster"""
- b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
- self._brokers.append(b)
- if update_urls: self.update_urls()
- return b
-
- def update_urls(self):
- self.url = ",".join([b.host_port() for b in self])
- if len(self) > 1: # No failover addresses on a 1 cluster.
- for b in self: b.set_brokers_url(self.url)
-
- def connect(self, i):
- """Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-
- def kill(self, i, promote_next=True):
- """Kill broker i, promote broker i+1"""
- self[i].expect = EXPECT_EXIT_FAIL
- self[i].kill()
- if promote_next: self[(i+1) % len(self)].promote()
-
- def restart(self, i):
- """Start a broker with the same port, name and data directory. It will get
- a separate log file: foo.n.log"""
- b = self._brokers[i]
- self._brokers[i] = HaBroker(
- self.test, name=b.name, port=b.port(), brokers_url=self.url,
- **self.kwargs)
-
- def bounce(self, i, promote_next=True):
- """Stop and restart a broker in a cluster."""
- self.kill(i, promote_next)
- self.restart(i)
-
- # Behave like a list of brokers.
- def __len__(self): return len(self._brokers)
- def __getitem__(self,index): return self._brokers[index]
- def __iter__(self): return self._brokers.__iter__()
-
-def wait_address(session, address):
- """Wait for an address to become valid."""
- def check():
- try:
- session.sender(address)
- return True
- except NotFound: return False
- assert retry(check), "Timed out waiting for address %s"%(address)
-
-def valid_address(session, address):
- """Test if an address is valid"""
- try:
- session.receiver(address)
- return True
- except NotFound: return False
-
class ReplicationTests(BrokerTest):
"""Correctness tests for HA replication."""
@@ -927,7 +709,7 @@ class LongTests(BrokerTest):
if dead is not None:
brokers.restart(dead) # Restart backup
- brokers[dead].ready(client_properties={"qpid.ha-admin":1})
+ brokers[dead].ready()
dead = None
i += 1
except:
@@ -1031,5 +813,5 @@ if __name__ == "__main__":
os.execvp("qpid-python-test",
["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
else:
- print "Skipping ha_tests, qpid_ha not available"
+ print "Skipping ha_tests, %s not available"%(qpid_ha)