diff options
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 224 |
1 files changed, 3 insertions, 221 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 31142de293..de87c49d21 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -23,230 +23,12 @@ import traceback from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4 from brokertest import * +from ha_test import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent from uuid import UUID -log = getLogger(__name__) - -class QmfAgent(object): - """Access to a QMF broker agent.""" - def __init__(self, address, **kwargs): - self._connection = Connection.establish( - address, client_properties={"qpid.ha-admin":1}, **kwargs) - self._agent = BrokerAgent(self._connection) - - def __getattr__(self, name): - a = getattr(self._agent, name) - return a - -class Credentials(object): - """SASL credentials: username, password, and mechanism""" - def __init__(self, username, password, mechanism): - (self.username, self.password, self.mechanism) = (username, password, mechanism) - - def __str__(self): return "Credentials%s"%(self.tuple(),) - - def tuple(self): return (self.username, self.password, self.mechanism) - - def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) - -class HaBroker(Broker): - """Start a broker with HA enabled - @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. - """ - def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", - client_credentials=None, **kwargs): - assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - args = copy(args) - args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=debug+:ha::", - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-cluster=%s"%ha_cluster] - if ha_replicate is not None: - args += [ "--ha-replicate=%s"%ha_replicate ] - if brokers_url: args += [ "--ha-brokers-url", brokers_url ] - Broker.__init__(self, test, args, **kwargs) - self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") - assert os.path.exists(self.qpid_ha_path) - self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") - assert os.path.exists(self.qpid_config_path) - getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - self.qpid_ha_script=import_script(self.qpid_ha_path) - self._agent = None - self.client_credentials = client_credentials - - def __str__(self): return Broker.__str__(self) - - def qpid_ha(self, args): - cred = self.client_credentials - url = self.host_port() - if cred: - url =cred.add_user(url) - args = args + ["--sasl-mechanism", cred.mechanism] - self.qpid_ha_script.main_except(["", "-b", url]+args) - - def promote(self): self.qpid_ha(["promote"]) - def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) - def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) - def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - - def agent(self): - if not self._agent: - cred = self.client_credentials - if cred: - self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) - else: - self._agent = QmfAgent(self.host_port()) - return self._agent - - def ha_status(self): - hb = self.agent().getHaBroker() - hb.update() - return hb.status - - def wait_status(self, status): - def try_get_status(): - self._status = self.ha_status() - # Ignore ConnectionError, the broker may not be up yet. - try: - self._status = self.ha_status() - return self._status == status; - except ConnectionError: return False - assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( - self, status, self._status) - - # FIXME aconway 2012-05-01: do direct python call to qpid-config code. - def qpid_config(self, args): - assert subprocess.call( - [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 - - def config_replicate(self, from_broker, queue): - self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) - - def config_declare(self, queue, replication): - self.qpid_config(["add", "queue", queue, "--replicate", replication]) - - def connect_admin(self, **kwargs): - cred = self.client_credentials - if cred: - return Broker.connect( - self, client_properties={"qpid.ha-admin":1}, - username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, - **kwargs) - else: - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) - - def wait_backup(self, address): - """Wait for address to become valid on a backup broker.""" - bs = self.connect_admin().session() - try: wait_address(bs, address) - finally: bs.connection.close() - - def assert_browse(self, queue, expected, **kwargs): - """Verify queue contents by browsing.""" - bs = self.connect().session() - try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() - - def assert_browse_backup(self, queue, expected, **kwargs): - """Combines wait_backup and assert_browse_retry.""" - bs = self.connect_admin().session() - try: - wait_address(bs, queue) - assert_browse_retry(bs, queue, expected, **kwargs) - finally: bs.connection.close() - - def assert_connect_fail(self): - try: - self.connect() - self.test.fail("Expected ConnectionError") - except ConnectionError: pass - - def try_connect(self): - try: return self.connect() - except ConnectionError: return None - -class HaCluster(object): - _cluster_count = 0 - - def __init__(self, test, n, promote=True, **kwargs): - """Start a cluster of n brokers""" - self.test = test - self.kwargs = kwargs - self._brokers = [] - self.id = HaCluster._cluster_count - self.broker_id = 0 - HaCluster._cluster_count += 1 - for i in xrange(n): self.start(False) - self.update_urls() - self[0].promote() - - def next_name(self): - name="cluster%s-%s"%(self.id, self.broker_id) - self.broker_id += 1 - return name - - def start(self, update_urls=True, args=[]): - """Start a new broker in the cluster""" - b = HaBroker(self.test, name=self.next_name(), **self.kwargs) - self._brokers.append(b) - if update_urls: self.update_urls() - return b - - def update_urls(self): - self.url = ",".join([b.host_port() for b in self]) - if len(self) > 1: # No failover addresses on a 1 cluster. - for b in self: b.set_brokers_url(self.url) - - def connect(self, i): - """Connect with reconnect_urls""" - return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) - - def kill(self, i, promote_next=True): - """Kill broker i, promote broker i+1""" - self[i].expect = EXPECT_EXIT_FAIL - self[i].kill() - if promote_next: self[(i+1) % len(self)].promote() - - def restart(self, i): - """Start a broker with the same port, name and data directory. It will get - a separate log file: foo.n.log""" - b = self._brokers[i] - self._brokers[i] = HaBroker( - self.test, name=b.name, port=b.port(), brokers_url=self.url, - **self.kwargs) - - def bounce(self, i, promote_next=True): - """Stop and restart a broker in a cluster.""" - self.kill(i, promote_next) - self.restart(i) - - # Behave like a list of brokers. - def __len__(self): return len(self._brokers) - def __getitem__(self,index): return self._brokers[index] - def __iter__(self): return self._brokers.__iter__() - -def wait_address(session, address): - """Wait for an address to become valid.""" - def check(): - try: - session.sender(address) - return True - except NotFound: return False - assert retry(check), "Timed out waiting for address %s"%(address) - -def valid_address(session, address): - """Test if an address is valid""" - try: - session.receiver(address) - return True - except NotFound: return False - class ReplicationTests(BrokerTest): """Correctness tests for HA replication.""" @@ -927,7 +709,7 @@ class LongTests(BrokerTest): if dead is not None: brokers.restart(dead) # Restart backup - brokers[dead].ready(client_properties={"qpid.ha-admin":1}) + brokers[dead].ready() dead = None i += 1 except: @@ -1031,5 +813,5 @@ if __name__ == "__main__": os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) else: - print "Skipping ha_tests, qpid_ha not available" + print "Skipping ha_tests, %s not available"%(qpid_ha) |