summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_test.py
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
committerRobert Gemmell <robbie@apache.org>2015-06-25 10:22:51 +0000
commit32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch)
tree2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/cpp/src/tests/ha_test.py
parent116d91ad7825a98af36a869fc751206fbce0c59f (diff)
parentf7e896076143de4572b4f1f67ef0765125f2498d (diff)
downloadqpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/ha_test.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py403
1 files changed, 403 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
new file mode 100755
index 0000000000..82ca808cb1
--- /dev/null
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -0,0 +1,403 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import traceback
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
+from qpidtoollibs import BrokerAgent
+from qpid.harness import Skipped
+
+log = getLogger(__name__)
+
class LogLevel:
    """
    Override the root logger's level for a while.

    Constructing a LogLevel records the root logger's current effective
    level and installs the requested one; restore() puts the recorded
    level back. Handy for silencing WARN messages that a test expects
    the python client to emit.
    """

    def __init__(self, level):
        root = getLogger()
        # Remember what was in force so restore() can undo the change.
        self.save_level = root.getEffectiveLevel()
        root.setLevel(level)

    def restore(self):
        """Re-install the level that was in effect at construction time."""
        root = getLogger()
        root.setLevel(self.save_level)
+
class QmfAgent(object):
    """Access to a QMF broker agent.

    Wraps a BrokerAgent created over a connection that carries the
    "qpid.ha-admin" client property. Any attribute not defined here is
    looked up on the wrapped BrokerAgent.
    """

    def __init__(self, address, **kwargs):
        """Connect to address and create the broker agent.

        @param address: address passed to qm.Connection.establish.
        @param kwargs: extra keyword arguments for qm.Connection.establish.
        """
        self._connection = qm.Connection.establish(
            address, client_properties={"qpid.ha-admin":1}, **kwargs)
        self._agent = BrokerAgent(self._connection)

    def queues(self):
        """Return the names of all queues reported by the agent."""
        return [q.values['name'] for q in self._agent.getAllQueues()]

    def repsub_queue(self, sub):
        """If QMF subscription sub is a replicating subscription return
        the name of the replicated queue, else return None"""
        session = self.getSession(sub.sessionRef)
        if not session: return None
        # Raw string so that "\." reaches the regex engine unchanged.
        m = re.search(r"qpid.ha-q:(.*)\.", session.name)
        return m and m.group(1)

    def repsub_queues(self):
        """Return a list of queue names for all replicating subscriptions"""
        names = [self.repsub_queue(s) for s in self.getAllSubscriptions()]
        # Drop the None entries produced by non-replicating subscriptions.
        return [n for n in names if n]

    def tx_queues(self):
        """Return names of all tx-queues"""
        return [q for q in self.queues() if q.startswith("qpid.ha-tx")]

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped BrokerAgent.

        @raise AttributeError: if the agent has not been created yet, or
        the agent does not have the attribute.
        """
        # __getattr__ only runs when normal lookup failed. If the missing
        # attribute is _agent itself, looking it up again below would
        # recurse without end, so fail cleanly instead.
        if name == "_agent":
            raise AttributeError(name)
        return getattr(self._agent, name)
+
class Credentials(object):
    """A SASL identity: user name, password and mechanism."""

    def __init__(self, username, password, mechanism):
        self.username = username
        self.password = password
        self.mechanism = mechanism

    def __str__(self):
        """Render as "Credentials" followed by the tuple() value."""
        return "Credentials%s"%(self.tuple(),)

    def tuple(self):
        """Return (username, password, mechanism)."""
        return (self.username, self.password, self.mechanism)

    def add_user(self, url):
        """Return url prefixed with "username/password@"."""
        return "%s/%s@%s"%(self.username, self.password, url)
+
class HaPort:
    """Many HA tests need to allocate a broker port dynamically and then kill
    and restart a broker on that same port multiple times. qpidd --port=0 only
    ensures the port for the initial broker process, subsequent brokers re-using
    the same port may fail with "address already in use".

    HaPort binds and listens to the port and returns a file descriptor to pass
    to qpidd --socket-fd. It holds on to the port until the end of the test so
    the broker can restart multiple times.
    """

    def __init__(self, test, port=0):
        """Bind and listen to port. port=0 allocates a port dynamically.
        self.port is the allocated port, self.fileno is the file descriptor for
        qpidd --socket-fd.

        @param test: the owning test; this object registers itself with
        test.teardown_add() so teardown() is called when the test ends.
        @param port: port to bind, 0 to have one allocated.
        """
        self.test = test
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(("", port))
        self.socket.listen(5)
        self.port = self.socket.getsockname()[1]
        self.fileno = self.socket.fileno()
        self.stopped = False
        test.teardown_add(self) # Stop during test.tearDown

    def teardown(self): # Called in tearDown
        """Shut down and close the listening socket. Safe to call repeatedly;
        only the first call does anything."""
        if not self.stopped:
            self.stopped = True
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            finally:
                # Always release the descriptor, even if shutdown() raised.
                self.socket.close()

    def __str__(self): return "HaPort<port:%s, fileno:%s>"%(self.port, self.fileno)
+
+
class HaBroker(Broker):
    """Start a broker with HA enabled

    @param test: the test that owns the broker.
    @param ha_port: HaPort to listen on; a new HaPort(test) is allocated if not given.
    @param args: extra broker command line arguments (copied, not modified).
    @param brokers_url: value for --ha-brokers-url, omitted if not given.
    @param ha_cluster: value for --ha-cluster.
    @param ha_replicate: value for --ha-replicate, omitted if None.
    @param client_credentials: Credentials (username, password, mechanism)
    for admin clients started by the HaBroker.
    @param kwargs: passed on to Broker.__init__.
    """

    heartbeat=5

    def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True,
                 ha_replicate="all", client_credentials=None, **kwargs):
        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
        ha_port = ha_port or HaPort(test)
        args = copy(args)       # Never modify the caller's (or the default) list.
        args += ["--load-module", BrokerTest.ha_lib,
                 # Non-standard settings for faster tests.
                 "--link-maintenance-interval=0.1",
                 "--ha-cluster=%s"%ha_cluster]
        # Add default --log-enable arguments unless args already has --log arguments.
        if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
            args += ["--log-enable=info+", "--log-enable=debug+:ha::"]
        if not [h for h in args if h.startswith("--link-heartbeat-interval")]:
            args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)]

        if ha_replicate is not None:
            args += [ "--ha-replicate=%s"%ha_replicate ]
        if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
        # Set up default ACL
        acl=os.path.join(os.getcwd(), "unrestricted.acl")
        if not os.path.exists(acl):
            aclf=open(acl,"w")
            try:
                aclf.write("""
acl allow all all
 """)
            finally:
                # Close the handle even if the write fails.
                aclf.close()
        if "--acl-file" not in args:
            args += [ "--acl-file", acl, ]
        # The broker listens on the socket held open by ha_port.
        args += ["--socket-fd=%s"%ha_port.fileno, "--listen-disable=tcp"]
        self._agent = None
        self.client_credentials = client_credentials
        self.ha_port = ha_port
        Broker.__init__(self, test, args, port=ha_port.port, **kwargs)

    # Do some static setup to locate the qpid-config and qpid-ha tools.
    @property
    def qpid_ha_script(self):
        """The qpid-ha tool imported as a script, loaded on first use and cached.

        @raise Skipped: if QPID_HA_EXEC is unset or does not name a file.
        """
        if not hasattr(self, "_qpid_ha_script"):
            qpid_ha_exec = os.getenv("QPID_HA_EXEC")
            if not qpid_ha_exec or not os.path.isfile(qpid_ha_exec):
                raise Skipped("qpid-ha not available")
            self._qpid_ha_script = import_script(qpid_ha_exec)
        return self._qpid_ha_script

    def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port())

    def qpid_ha(self, args):
        """Run the qpid-ha tool against this broker with the given argument list.

        If client credentials are set they are added to the broker URL and
        passed as --sasl-mechanism.

        @raise Skipped: if qpid-ha is not available.
        @raise Exception: wrapping any error raised while running the tool.
        """
        if not self.qpid_ha_script:
            raise Skipped("qpid-ha not available")
        # Bind url before the try block so the error handler can always
        # format its message, even if host_port() itself fails.
        url = None
        try:
            cred = self.client_credentials
            url = self.host_port()
            if cred:
                url = cred.add_user(url)
                args = args + ["--sasl-mechanism", cred.mechanism]
            self.qpid_ha_script.main_except(["", "-b", url]+args)
        except Exception:
            e = sys.exc_info()[1] # Works with every python version.
            raise Exception("Error in qpid_ha -b %s %s: %s"%(url, args,e))

    def promote(self):
        """Wait for the broker to be ready, then promote it with qpid-ha."""
        self.ready()
        self.qpid_ha(["promote", "--cluster-manager"])

    def replicate(self, from_broker, queue):
        """Run qpid-ha replicate for queue from from_broker."""
        self.qpid_ha(["replicate", from_broker, queue])

    @property
    def agent(self):
        """A QmfAgent for this broker, created on first use and cached.
        Uses the client credentials if they are set."""
        if not self._agent:
            cred = self.client_credentials
            if cred:
                self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
            else:
                self._agent = QmfAgent(self.host_port())
        return self._agent

    def qmf(self):
        """Return the HaBroker QMF object from the agent, after calling update() on it."""
        hb = self.agent.getHaBroker()
        hb.update()
        return hb

    def ha_status(self): return self.qmf().status

    def wait_status(self, status, timeout=10):
        """Retry until ha_status() equals status, assert if it does not
        within timeout. The last value seen is kept in self._status."""

        def try_get_status():
            self._status = "<unknown>"
            try:
                self._status = self.ha_status()
            except qm.ConnectionError:
                e = sys.exc_info()[1]
                # Record the error but don't raise, the broker may not be up yet.
                self._status = "%s: %s" % (type(e).__name__, e)
            return self._status == status
        assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
            self, status, self._status)

    def wait_queue(self, queue, timeout=10, msg="wait_queue"):
        """ Wait for queue to be visible via QMF"""
        agent = self.agent
        assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), \
            "%s queue %s not present" % (msg, queue)

    def wait_no_queue(self, queue, timeout=10, msg="wait_no_queue"):
        """ Wait for queue to be invisible via QMF"""
        agent = self.agent
        assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue)

    def qpid_config(self, args):
        """Run the qpid-config tool against this broker, assert that it exits with 0.

        @raise Skipped: if QPID_CONFIG_EXEC is unset or does not name a file.
        """
        qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
        if not qpid_config_exec or not os.path.isfile(qpid_config_exec):
            raise Skipped("qpid-config not available")
        assert subprocess.call(
            [qpid_config_exec, "--broker", self.host_port()]+args, stdout=1, stderr=subprocess.STDOUT
            ) == 0, "qpid-config failed"

    def config_replicate(self, from_broker, queue):
        """Run qpid-config add queue --start-replica from_broker queue."""
        self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])

    def config_declare(self, queue, replication):
        """Run qpid-config add queue queue --replicate replication."""
        self.qpid_config(["add", "queue", queue, "--replicate", replication])

    def connect_admin(self, **kwargs):
        """Connect with the "qpid.ha-admin" client property set, using the
        client credentials if they are set."""
        cred = self.client_credentials
        if cred:
            return Broker.connect(
                self, client_properties={"qpid.ha-admin":1},
                username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
                **kwargs)
        else:
            return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)

    def wait_address(self, address):
        """Wait for address to become valid on the broker."""
        c = self.connect_admin()
        try: wait_address(c, address)
        finally: c.close()

    wait_backup = wait_address

    def browse(self, queue, timeout=0, transform=lambda m: m.content):
        """Browse queue over an admin connection, which is closed afterwards."""
        c = self.connect_admin()
        try:
            return browse(c.session(), queue, timeout, transform)
        finally: c.close()

    def assert_browse_backup(self, queue, expected, **kwargs):
        """Combines wait_backup and assert_browse_retry."""
        c = self.connect_admin()
        try:
            wait_address(c, queue)
            assert_browse_retry(c.session(), queue, expected, **kwargs)
        finally: c.close()

    assert_browse = assert_browse_backup

    def assert_connect_fail(self):
        """Fail the test unless connect() raises qm.ConnectionError."""
        try:
            self.connect()
            self.test.fail("Expected qm.ConnectionError")
        except qm.ConnectionError: pass

    def try_connect(self):
        """Return a connection, or None if connect() raises qm.ConnectionError."""
        try: return self.connect()
        except qm.ConnectionError: return None

    def ready(self, *args, **kwargs):
        """Broker.ready with the "qpid.ha-admin" client property added."""
        if 'client_properties' not in kwargs: kwargs['client_properties'] = {}
        kwargs['client_properties']['qpid.ha-admin'] = True
        return Broker.ready(self, *args, **kwargs)

    def kill(self, final=True):
        """Kill the broker and drop the cached agent.

        @param final: if true also tear down the HaPort.
        """
        if final: self.ha_port.teardown()
        self._agent = None
        return Broker.kill(self)
+
+
class HaCluster(object):
    """A cluster of HaBroker instances that behaves like a list of brokers."""

    _cluster_count = 0

    def __init__(self, test, n, promote=True, wait=True, args=[], s_args=[], **kwargs):
        """Start a cluster of n brokers.

        @test: The test being run
        @n: start n brokers
        @promote: promote self[0] to primary
        @wait: wait for primary active and backups ready. Ignored if promote=False
        @args: args for all brokers in the cluster.
        @s_args: args for specific brokers: s_args[i] for broker i.
        @kwargs: passed on to each HaBroker.
        """
        self.test = test
        self.args = copy(args)
        self.s_args = copy(s_args)
        self.kwargs = kwargs
        self._ports = [HaPort(test) for _ in range(n)]
        self._set_url()
        self._brokers = []
        self.id = HaCluster._cluster_count
        self.broker_id = 0
        HaCluster._cluster_count += 1
        for _ in range(n): self.start()
        if promote:
            self[0].promote()
            if wait:
                self[0].wait_status("active")
                for b in self[1:]: b.wait_status("ready")

    def next_name(self):
        """Return the next unused broker name, "cluster<id>-<n>"."""
        name="cluster%s-%s"%(self.id, self.broker_id)
        self.broker_id += 1
        return name

    def _broker_args(self, i):
        """Return a new argument list for broker i: the cluster-wide args
        followed by s_args[i] if there is one."""
        # Work on a copy: extending self.args in place would leak broker i's
        # specific arguments into every broker started or restarted later.
        args = list(self.args)
        if i < len(self.s_args): args += self.s_args[i]
        return args

    def _ha_broker(self, i, name):
        """Start a HaBroker called name on port i and wait for it to be ready."""
        ha_port = self._ports[i]
        b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
                     args=self._broker_args(i), **self.kwargs)
        b.ready(timeout=10)
        return b

    def start(self):
        """Start a new broker in the cluster"""
        i = len(self)
        assert i <= len(self._ports)
        if i == len(self._ports): # Adding new broker after cluster init
            self._ports.append(HaPort(self.test))
            self._set_url()
        b = self._ha_broker(i, self.next_name())
        self._brokers.append(b)
        return b

    def _set_url(self):
        """Rebuild self.url from the current list of ports."""
        self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports)

    def connect(self, i, **kwargs):
        """Connect with reconnect_urls"""
        c = self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
        self.test.teardown_add(c) # Clean up
        return c

    def kill(self, i, promote_next=True, final=True):
        """Kill broker i, promote broker i+1"""
        self[i].kill(final=final)
        if promote_next: self[(i+1) % len(self)].promote()

    def restart(self, i):
        """Start a broker with the same port, name and data directory. It will get
        a separate log file: foo.n.log"""
        if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self))
        b = self._brokers[i]
        self._brokers[i] = self._ha_broker(i, b.name)
        self._brokers[i].ready()

    def bounce(self, i, promote_next=True):
        """Stop and restart a broker in a cluster."""
        if (len(self) == 1):
            # With a single broker there is no other broker to promote:
            # restart it first, then promote it if requested.
            self.kill(i, promote_next=False, final=False)
            self.restart(i)
            self[i].ready()
            if promote_next: self[i].promote()
        else:
            self.kill(i, promote_next, final=False)
            self.restart(i)

    # Behave like a list of brokers.
    def __len__(self): return len(self._brokers)
    def __getitem__(self,index): return self._brokers[index]
    def __iter__(self): return iter(self._brokers)
+
+
def wait_address(connection, address):
    """Retry valid_address() on connection until address is valid; assert on time out."""
    def address_is_valid():
        return valid_address(connection, address)
    assert retry(address_is_valid), "Timed out waiting for address %s"%(address)
+
def valid_address(connection, address):
    """Return True if a receiver for address can be created on a session
    of connection, False if that raises qm.NotFound. The receiver's
    session is closed again on success."""
    try:
        receiver = connection.session().receiver(address)
        receiver.session.close()
    except qm.NotFound:
        return False
    return True
+
+