diff options
Diffstat (limited to 'qpid/cpp/src/tests/interop_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/interop_tests.py | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/interop_tests.py b/qpid/cpp/src/tests/interop_tests.py new file mode 100755 index 0000000000..f76b9f634b --- /dev/null +++ b/qpid/cpp/src/tests/interop_tests.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A set of tests that can be run against a foreign AMQP 1.0 broker. + +RUNNING WITH A FOREIGN BROKER: + +1. Start the broker +2. Create persistent queues named: interop-a interop-b interop-q tx-1 tx-2 +3. Export the environment variable QPID_INTEROP_URL with the URL to connect to your broker + in the form [user[:password]@]host[:port] +4. From the build directory run this test: + ctest -VV -R interop_tests + +If QPID_INTEROP_URL is not set, a qpidd broker will be started for the test. +""" + +import os, sys, shutil, subprocess +import qpid_messaging as qm +from brokertest import * + +URL='QPID_INTEROP_URL' + +class InteropTest(BrokerTest): + + def setUp(self): + super(InteropTest, self).setUp() + self.url = os.environ[URL] + self.connect_opts = ['--broker', self.url, '--connection-options', '{protocol:amqp1.0}'] + + def connect(self, **kwargs): + """Python connection to interop URL""" + c = qm.Connection.establish(self.url, protocol='amqp1.0', **kwargs) + self.teardown_add(c) + return c + + def drain(self, queue, connection=None): + """ + Drain a queue to make sure it is empty. Throw away the messages. + """ + c = connection or self.connect() + r = c.session().receiver(queue) + try: + while True: + r.fetch(timeout=0) + r.session.acknowledge() + except qm.Empty: + pass + r.close() + + def clear_queue(self, queue, connection=None, properties=None, durable=False): + """ + Make empty queue, prefix with self.id(). Create if needed, drain if needed + @return queue name. + """ + queue = "interop-%s" % queue + c = connection or self.connect() + props = {'create':'always'} + if durable: props['node'] = {'durable':True} + if properties: props.update(properties) + self.drain("%s;%s" % (queue, props), c) + return queue + + +class SimpleTest(InteropTest): + """Simple test to check the broker is responding.""" + + def test_send_receive_python(self): + c = self.connect() + q = self.clear_queue('q', c) + s = c.session() + s.sender(q).send('foo') + self.assertEqual('foo', s.receiver(q).fetch().content) + + def test_send_receive_cpp(self): + q = self.clear_queue('q') + args = ['-b', self.url, '-a', q] + self.check_output(['qpid-send', '--content-string=cpp_foo'] + args) + self.assertEqual('cpp_foo', self.check_output(['qpid-receive'] + args).strip()) + + +class PythonTxTest(InteropTest): + + def tx_simple_setup(self): + """Start a transaction, remove messages from queue a, add messages to queue b""" + c = self.connect() + qa, qb = self.clear_queue('a', c, durable=True), self.clear_queue('b', c, durable=True) + + # Send messages to a, no transaction. + sa = c.session().sender(qa+";{create:always,node:{durable:true}}") + tx_msgs = ['x', 'y', 'z'] + for m in tx_msgs: sa.send(qm.Message(content=m, durable=True)) + + # Receive messages from a, in transaction. + tx = c.session(transactional=True) + txr = tx.receiver(qa) + self.assertEqual(tx_msgs, [txr.fetch(1).content for i in xrange(3)]) + tx.acknowledge() + + # Send messages to b, transactional, mixed with non-transactional. + sb = c.session().sender(qb+";{create:always,node:{durable:true}}") + txs = tx.sender(qb) + msgs = [str(i) for i in xrange(3)] + for tx_m, m in zip(tx_msgs, msgs): + txs.send(tx_m); + sb.send(m) + tx.sync() + return tx, qa, qb + + def test_tx_simple_commit(self): + tx, qa, qb = self.tx_simple_setup() + s = self.connect().session() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2']) + tx.commit() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2', 'x', 'y', 'z']) + + def test_tx_simple_rollback(self): + tx, qa, qb = self.tx_simple_setup() + s = self.connect().session() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2']) + tx.rollback() + assert_browse(s, qa, ['x', 'y', 'z']) + assert_browse(s, qb, ['0', '1', '2']) + + def test_tx_sequence(self): + tx = self.connect().session(transactional=True) + notx = self.connect().session() + q = self.clear_queue('q', tx.connection, durable=True) + s = tx.sender(q) + r = tx.receiver(q) + s.send('a') + tx.commit() + assert_browse(notx, q, ['a']) + s.send('b') + tx.commit() + assert_browse(notx, q, ['a', 'b']) + self.assertEqual('a', r.fetch().content) + tx.acknowledge(); + tx.commit() + assert_browse(notx, q, ['b']) + s.send('z') + tx.rollback() + assert_browse(notx, q, ['b']) + self.assertEqual('b', r.fetch().content) + tx.acknowledge(); + tx.rollback() + assert_browse(notx, q, ['b']) + + +class CppTxTest(InteropTest): + + def test_txtest2(self): + self.popen(["qpid-txtest2"] + self.connect_opts).assert_exit_ok() + + def test_send_receive(self): + q = self.clear_queue('q', durable=True) + sender = self.popen(["qpid-send", + "--address", q, + "--messages=100", + "--tx=10", + "--durable=yes"] + self.connect_opts) + receiver = self.popen(["qpid-receive", + "--address", q, + "--messages=90", + "--timeout=10", + "--tx=10"] + self.connect_opts) + sender.assert_exit_ok() + receiver.assert_exit_ok() + expect = [long(i) for i in range(91, 101)] + sn = lambda m: m.properties["sn"] + assert_browse(self.connect().session(), q, expect, transform=sn) + + +if __name__ == "__main__": + if not BrokerTest.amqp_tx_supported: + BrokerTest.amqp_tx_warning() + print "Skipping interop_tests" + sys.exit(0) + outdir = "interop_tests.tmp" + shutil.rmtree(outdir, True) + cmd = ["qpid-python-test", "-m", "interop_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:] + if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"] + if os.environ.get(URL): + os.execvp(cmd[0], cmd) + else: + dir = os.getcwd() + class StartBroker(BrokerTest): + def start_qpidd(self): pass + test = StartBroker('start_qpidd') + class Config: + def __init__(self): + self.defines = { 'OUTDIR': outdir } + test.configure(Config()) + test.setUp() + os.environ[URL] = test.broker().host_port() + os.chdir(dir) + p = subprocess.Popen(cmd) + status = p.wait() + test.tearDown() + sys.exit(status) |