diff options
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 134 |
1 files changed, 91 insertions, 43 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 4b6820e4fd..4e5c255a1a 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -21,19 +21,47 @@ import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re import qpid, traceback, signal -from qpid import connection, messaging, util +from qpid import connection, util from qpid.compat import format_exc from qpid.harness import Skipped from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition from logging import getLogger +from qpidtoollibs import BrokerAgent -try: import qmf.console -except: print "Cannot import module qmf.console, skipping tests"; exit(0); +# NOTE: Always import native client qpid.messaging, import swigged client +# qpid_messaging if possible. qpid_messaing is set to None if not available. +# +# qm is set to qpid_messaging if it is available, qpid.messaging if not. +# Use qm.X to specify names from the default messaging module. +# +# Set environment variable QPID_PY_NO_SWIG=1 to prevent qpid_messaging from loading. +# +# BrokerTest can be configured to determine which protocol is used by default: +# +# -DPROTOCOL="amqpX": Use protocol "amqpX". Defaults to amqp1.0 if swig client +# is being used, amqp0-10 if native client is being used. +# +# The configured defaults can be over-ridden on BrokerTest.connect and some +# other methods by specifying native=True|False and protocol="amqpX" +# +import qpid.messaging +qm = qpid.messaging +qpid_messaging = None +if not os.environ.get("QPID_PY_NO_SWIG"): + try: + import qpid_messaging + from qpid.datatypes import uuid4 + qm = qpid_messaging + # Silence warnings from swigged messaging library unless enabled in environment. + if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ: + qm.Logger.configure(["--log-enable=error"]) + except ImportError: + print "Cannot load python SWIG bindings, falling back to native qpid.messaging." -log = getLogger("qpid.brokertest") +log = getLogger("brokertest") # Values for expected outcome of process at end of test EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. @@ -149,7 +177,7 @@ class Popen(subprocess.Popen): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) - def stop(self): # Clean up at end of test. + def teardown(self): # Clean up at end of test. try: if self.expect == EXPECT_UNKNOWN: try: self.kill() # Just make sure its dead @@ -253,14 +281,16 @@ class Broker(Popen): self.test = test self._port=port + args = copy(args) + if BrokerTest.amqp_lib: args += ["--load-module", BrokerTest.amqp_lib] if BrokerTest.store_lib and not test_store: - args = args + ['--load-module', BrokerTest.store_lib] + args += ['--load-module', BrokerTest.store_lib] if BrokerTest.sql_store_lib: - args = args + ['--load-module', BrokerTest.sql_store_lib] - args = args + ['--catalog', BrokerTest.sql_catalog] + args += ['--load-module', BrokerTest.sql_store_lib] + args += ['--catalog', BrokerTest.sql_catalog] if BrokerTest.sql_clfs_store_lib: - args = args + ['--load-module', BrokerTest.sql_clfs_store_lib] - args = args + ['--catalog', BrokerTest.sql_catalog] + args += ['--load-module', BrokerTest.sql_clfs_store_lib] + args += ['--catalog', BrokerTest.sql_catalog] cmd = [BrokerTest.qpidd_exec, "--port", port, "--interface", "127.0.0.1", "--no-module-dir"] + args if not "--auth" in args: cmd.append("--auth=no") if wait != None: @@ -288,13 +318,11 @@ class Broker(Popen): cmd += ["--data-dir", self.datadir] if show_cmd: print cmd Popen.__init__(self, cmd, expect, stdout=PIPE) - test.cleanup_stop(self) + test.teardown_add(self) self._host = "127.0.0.1" - log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) + self._agent = None - def startQmf(self, handler=None): - self.qmf_session = qmf.console.Session(handler) - self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) + log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) def host(self): return self._host @@ -310,22 +338,25 @@ class Broker(Popen): def unexpected(self,msg): raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) - def connect(self, timeout=5, **kwargs): - """New API connection to the broker.""" - return messaging.Connection.establish(self.host_port(), timeout=timeout, **kwargs) + def connect(self, timeout=5, native=False, **kwargs): + """New API connection to the broker. + @param native if True force use of the native qpid.messaging client + even if swig client is available. + """ + if self.test.protocol: kwargs.setdefault("protocol", self.test.protocol) + if native: connection_class = qpid.messaging.Connection + else: connection_class = qm.Connection + return connection_class.establish(self.host_port(), timeout=timeout, **kwargs) + + @property + def agent(self, **kwargs): + """Return a BrokerAgent for this broker""" + if not self._agent: self._agent = BrokerAgent(self.connect(**kwargs)) + return self._agent - def connect_old(self): - """Old API connection to the broker.""" - socket = qpid.util.connect(self.host(),self.port()) - connection = qpid.connection.Connection (sock=socket) - connection.start() - return connection; def declare_queue(self, queue): - c = self.connect_old() - s = c.session(str(qpid.datatypes.uuid4())) - s.queue_declare(queue=queue) - c.close() + self.agent.addQueue(queue) def _prep_sender(self, queue, durable, xprops): s = queue + "; {create:always, node:{durable:" + str(durable) @@ -402,7 +433,7 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content): contents = [] try: while True: contents.append(transform(r.fetch(timeout=timeout))) - except messaging.Empty: pass + except qm.Empty: pass finally: r.close() return contents @@ -451,30 +482,42 @@ class BrokerTest(TestCase): def configure(self, config): self.config=config def setUp(self): - outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" + defs = self.config.defines + outdir = defs.get("OUTDIR") or "brokertest.tmp" self.dir = os.path.join(self.rootdir, outdir, self.id()) os.makedirs(self.dir) os.chdir(self.dir) - self.stopem = [] # things to stop at end of test + self.teardown_list = [] # things to tear down at end of test + + self.protocol = defs.get("PROTOCOL") or ("amqp1.0" if qpid_messaging else "amqp0-10") + self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0 + def tearDown(self): err = [] - for p in self.stopem: - try: p.stop() - except Exception, e: err.append(str(e)) - self.stopem = [] # reset in case more processes start + self.teardown_list.reverse() # Tear down in reverse order + for p in self.teardown_list: + log.debug("Tearing down %s", p) + try: + # Call the first of the methods that is available on p. + for m in ["teardown", "close"]: + a = getattr(p, m, None) + if a: a(); break + else: raise Exception("Don't know how to tear down %s", p) + except Exception, e: err.append("%s: %s"%(e.__class__.__name__, str(e))) + self.teardown_list = [] # reset in case more processes start os.chdir(self.rootdir) if err: raise Exception("Unexpected process status:\n "+"\n ".join(err)) - def cleanup_stop(self, stopable): - """Call thing.stop at end of test""" - self.stopem.append(stopable) + def teardown_add(self, thing): + """Call thing.teardown() or thing.close() at end of test""" + self.teardown_list.append(thing) def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): """Start a process that will be killed at end of test, in the test dir.""" os.chdir(self.dir) p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr) - self.cleanup_stop(p) + self.teardown_add(p) return p def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False): @@ -490,6 +533,11 @@ class BrokerTest(TestCase): def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) + def protocol_option(self, connection_options=""): + if "protocol" in connection_options: return connection_options + else: return ",".join(filter(None, [connection_options,"protocol:'%s'"%self.protocol])) + + def join(thread, timeout=30): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) @@ -524,7 +572,7 @@ class NumberedSender(Thread): def __init__(self, broker, max_depth=None, queue="test-queue", connection_options=RECONNECT_OPTIONS, - failover_updates=True, url=None, args=[]): + failover_updates=False, url=None, args=[]): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. @@ -533,7 +581,7 @@ class NumberedSender(Thread): cmd = ["qpid-send", "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, - "--connection-options", "{%s}"%(connection_options), + "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)), "--content-stdin" ] + args if failover_updates: cmd += ["--failover-updates"] @@ -592,7 +640,7 @@ class NumberedReceiver(Thread): """ def __init__(self, broker, sender=None, queue="test-queue", connection_options=RECONNECT_OPTIONS, - failover_updates=True, url=None, args=[]): + failover_updates=False, url=None, args=[]): """ sender: enable flow control. Call sender.received(n) for each message received. """ @@ -601,7 +649,7 @@ class NumberedReceiver(Thread): cmd = ["qpid-receive", "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, - "--connection-options", "{%s}"%(connection_options), + "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)), "--forever" ] if failover_updates: cmd += [ "--failover-updates" ] |