summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r--qpid/cpp/src/tests/brokertest.py134
1 files changed, 91 insertions, 43 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 4b6820e4fd..4e5c255a1a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -21,19 +21,47 @@
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
-from qpid import connection, messaging, util
+from qpid import connection, util
from qpid.compat import format_exc
from qpid.harness import Skipped
from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
from logging import getLogger
+from qpidtoollibs import BrokerAgent
-try: import qmf.console
-except: print "Cannot import module qmf.console, skipping tests"; exit(0);
+# NOTE: Always import native client qpid.messaging, import swigged client
+# qpid_messaging if possible. qpid_messaing is set to None if not available.
+#
+# qm is set to qpid_messaging if it is available, qpid.messaging if not.
+# Use qm.X to specify names from the default messaging module.
+#
+# Set environment variable QPID_PY_NO_SWIG=1 to prevent qpid_messaging from loading.
+#
+# BrokerTest can be configured to determine which protocol is used by default:
+#
+# -DPROTOCOL="amqpX": Use protocol "amqpX". Defaults to amqp1.0 if swig client
+# is being used, amqp0-10 if native client is being used.
+#
+# The configured defaults can be over-ridden on BrokerTest.connect and some
+# other methods by specifying native=True|False and protocol="amqpX"
+#
+import qpid.messaging
+qm = qpid.messaging
+qpid_messaging = None
+if not os.environ.get("QPID_PY_NO_SWIG"):
+ try:
+ import qpid_messaging
+ from qpid.datatypes import uuid4
+ qm = qpid_messaging
+ # Silence warnings from swigged messaging library unless enabled in environment.
+ if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+ qm.Logger.configure(["--log-enable=error"])
+ except ImportError:
+ print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
-log = getLogger("qpid.brokertest")
+log = getLogger("brokertest")
# Values for expected outcome of process at end of test
EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
@@ -149,7 +177,7 @@ class Popen(subprocess.Popen):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
- def stop(self): # Clean up at end of test.
+ def teardown(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
try: self.kill() # Just make sure its dead
@@ -253,14 +281,16 @@ class Broker(Popen):
self.test = test
self._port=port
+ args = copy(args)
+ if BrokerTest.amqp_lib: args += ["--load-module", BrokerTest.amqp_lib]
if BrokerTest.store_lib and not test_store:
- args = args + ['--load-module', BrokerTest.store_lib]
+ args += ['--load-module', BrokerTest.store_lib]
if BrokerTest.sql_store_lib:
- args = args + ['--load-module', BrokerTest.sql_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
+ args += ['--load-module', BrokerTest.sql_store_lib]
+ args += ['--catalog', BrokerTest.sql_catalog]
if BrokerTest.sql_clfs_store_lib:
- args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
+ args += ['--load-module', BrokerTest.sql_clfs_store_lib]
+ args += ['--catalog', BrokerTest.sql_catalog]
cmd = [BrokerTest.qpidd_exec, "--port", port, "--interface", "127.0.0.1", "--no-module-dir"] + args
if not "--auth" in args: cmd.append("--auth=no")
if wait != None:
@@ -288,13 +318,11 @@ class Broker(Popen):
cmd += ["--data-dir", self.datadir]
if show_cmd: print cmd
Popen.__init__(self, cmd, expect, stdout=PIPE)
- test.cleanup_stop(self)
+ test.teardown_add(self)
self._host = "127.0.0.1"
- log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+ self._agent = None
- def startQmf(self, handler=None):
- self.qmf_session = qmf.console.Session(handler)
- self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+ log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
def host(self): return self._host
@@ -310,22 +338,25 @@ class Broker(Popen):
def unexpected(self,msg):
raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
- def connect(self, timeout=5, **kwargs):
- """New API connection to the broker."""
- return messaging.Connection.establish(self.host_port(), timeout=timeout, **kwargs)
+ def connect(self, timeout=5, native=False, **kwargs):
+ """New API connection to the broker.
+ @param native if True force use of the native qpid.messaging client
+ even if swig client is available.
+ """
+ if self.test.protocol: kwargs.setdefault("protocol", self.test.protocol)
+ if native: connection_class = qpid.messaging.Connection
+ else: connection_class = qm.Connection
+ return connection_class.establish(self.host_port(), timeout=timeout, **kwargs)
+
+ @property
+ def agent(self, **kwargs):
+ """Return a BrokerAgent for this broker"""
+ if not self._agent: self._agent = BrokerAgent(self.connect(**kwargs))
+ return self._agent
- def connect_old(self):
- """Old API connection to the broker."""
- socket = qpid.util.connect(self.host(),self.port())
- connection = qpid.connection.Connection (sock=socket)
- connection.start()
- return connection;
def declare_queue(self, queue):
- c = self.connect_old()
- s = c.session(str(qpid.datatypes.uuid4()))
- s.queue_declare(queue=queue)
- c.close()
+ self.agent.addQueue(queue)
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
@@ -402,7 +433,7 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content):
contents = []
try:
while True: contents.append(transform(r.fetch(timeout=timeout)))
- except messaging.Empty: pass
+ except qm.Empty: pass
finally: r.close()
return contents
@@ -451,30 +482,42 @@ class BrokerTest(TestCase):
def configure(self, config): self.config=config
def setUp(self):
- outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
+ defs = self.config.defines
+ outdir = defs.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
os.makedirs(self.dir)
os.chdir(self.dir)
- self.stopem = [] # things to stop at end of test
+ self.teardown_list = [] # things to tear down at end of test
+
+ self.protocol = defs.get("PROTOCOL") or ("amqp1.0" if qpid_messaging else "amqp0-10")
+ self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
+
def tearDown(self):
err = []
- for p in self.stopem:
- try: p.stop()
- except Exception, e: err.append(str(e))
- self.stopem = [] # reset in case more processes start
+ self.teardown_list.reverse() # Tear down in reverse order
+ for p in self.teardown_list:
+ log.debug("Tearing down %s", p)
+ try:
+ # Call the first of the methods that is available on p.
+ for m in ["teardown", "close"]:
+ a = getattr(p, m, None)
+ if a: a(); break
+ else: raise Exception("Don't know how to tear down %s", p)
+ except Exception, e: err.append("%s: %s"%(e.__class__.__name__, str(e)))
+ self.teardown_list = [] # reset in case more processes start
os.chdir(self.rootdir)
if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
- def cleanup_stop(self, stopable):
- """Call thing.stop at end of test"""
- self.stopem.append(stopable)
+ def teardown_add(self, thing):
+ """Call thing.teardown() or thing.close() at end of test"""
+ self.teardown_list.append(thing)
def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
- self.cleanup_stop(p)
+ self.teardown_add(p)
return p
def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
@@ -490,6 +533,11 @@ class BrokerTest(TestCase):
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
+ def protocol_option(self, connection_options=""):
+ if "protocol" in connection_options: return connection_options
+ else: return ",".join(filter(None, [connection_options,"protocol:'%s'"%self.protocol]))
+
+
def join(thread, timeout=30):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
@@ -524,7 +572,7 @@ class NumberedSender(Thread):
def __init__(self, broker, max_depth=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None, args=[]):
+ failover_updates=False, url=None, args=[]):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -533,7 +581,7 @@ class NumberedSender(Thread):
cmd = ["qpid-send",
"--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
- "--connection-options", "{%s}"%(connection_options),
+ "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
"--content-stdin"
] + args
if failover_updates: cmd += ["--failover-updates"]
@@ -592,7 +640,7 @@ class NumberedReceiver(Thread):
"""
def __init__(self, broker, sender=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None, args=[]):
+ failover_updates=False, url=None, args=[]):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -601,7 +649,7 @@ class NumberedReceiver(Thread):
cmd = ["qpid-receive",
"--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
- "--connection-options", "{%s}"%(connection_options),
+ "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
"--forever"
]
if failover_updates: cmd += [ "--failover-updates" ]