diff options
author | Alan Conway <aconway@apache.org> | 2015-02-27 16:37:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2015-02-27 16:37:06 +0000 |
commit | 3aaa53e9103b6019c9e31d15186b12a95a1993be (patch) | |
tree | f5950c063ff08f574c808023ece7745739ca7027 /qpid/cpp/src/tests/interop_tests.py | |
parent | 9c9f0e2c935d11c0f8d1ebddf1bbb78c3c22c606 (diff) | |
download | qpid-python-3aaa53e9103b6019c9e31d15186b12a95a1993be.tar.gz |
QPID-4710: [AMQP 1.0] Support for transactions in qpid::messaging C++ client.
Implements the "transactional retire and settle immediately" option for
transactions as specified in AMQP 1.0 in the qpid::messaging C++ client.
NOTE: Transactions over AMQP 1.0 require proton 0.9 or greater. With older
versions, attempting a transactions over AMQP 1.0 will raise a link-detached
exception "Node not found: tx-transaction"
1. Added descriptor list to Variant with support in Encoder and PnData.
Required to support transactions, need to be able to create described lists.
Variant changes are source and binary compatible.
A Variant now has a Variant::List of descripors which can be numeric or string.
Nested descriptors are implemented by putting multiple descriptors in the list.
Other minor changes:
- Variant refactor: don't delete impl on every assignment.
- Add Variant constructors that take a string encoding.
(new constructors, not defaulted arguments, so the change is binary and source compatible.)
- Growable buffer support for Encoder.
- Printing described Variant prints descriptors in form @descriptor value
2. Added transaction support to AMQP 1.0 client code
Added messaging/amqp/Transaction.h,cpp: transaction logic
- communicate with coordinator, send declare/dischange messages.
- add tx state info to transfers and acknowledgements.
- Sync session after discharge.
- A transactional session automatically acks any message retrieved by fetch/get
to bring them into the transaction. This is consistent the 0-10 client.
Minor fixes to existing client code:
- Fix use of pn_drain API in C++ client to work with C++ and Java brokers.
- Make amqp::Exception derive from qpid::Exception
3. Fixes to existing broker code:
- Incoming.cpp fix: start async completion before processing message.
- Delay accept of dischage message till commit is complete.
- newSession - handle failover during session creation.
4. Added tests
interop_tests.py: transaction tests that can run against an external broker, see comments.
ha_tests.py: Enable transaction tests over AMQP 1.0.
Minor test fixes:
- brokertest.py don't set default logging if QPID_LOG env vars set.
- brokertest.py Pass kwargs to broker() create function.
- qpid-receive: capacity should never be larger than message count.
- Accept user:pass as well as user/pass in Url.
- brokertest.py: Always do a ready() check on all brokers.
If proton < 0.9 is used, transaction tests will be skipped or will downgrade to
the amqp0-10 protocol with a printed warning.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1662743 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/interop_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/interop_tests.py | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/interop_tests.py b/qpid/cpp/src/tests/interop_tests.py new file mode 100755 index 0000000000..d5533ead21 --- /dev/null +++ b/qpid/cpp/src/tests/interop_tests.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A set of tests that can be run against a foreign AMQP 1.0 broker. + +RUNNING WITH A FOREIGN BROKER: + +1. Start the broker +2. Create persistent queues named: interop-a interop-b interop-q tx-1 tx-2 +3. Export the environment variable QPID_INTEROP_URL with the URL to connect to your broker + in the form [user[:password]@]host[:port] +4. From the build directory run this test: + ctest -VV -R interop_tests + +If QPID_INTEROP_URL is not set, a qpidd broker will be started for the test. +""" + +import os, sys, shutil, subprocess +import qpid_messaging as qm +from brokertest import * + +URL='QPID_INTEROP_URL' + +class InteropTest(BrokerTest): + + def setUp(self): + super(InteropTest, self).setUp() + self.url = os.environ[URL] + self.connect_opts = ['--broker', self.url, '--connection-options', '{protocol:amqp1.0}'] + + def connect(self, **kwargs): + """Python connection to interop URL""" + c = qm.Connection.establish(self.url, protocol='amqp1.0', **kwargs) + self.teardown_add(c) + return c + + def drain(self, queue, connection=None): + """ + Drain a queue to make sure it is empty. Throw away the messages. + """ + c = connection or self.connect() + r = c.session().receiver(queue) + try: + while True: + r.fetch(timeout=0) + r.session.acknowledge() + except qm.Empty: + pass + r.close() + + def clear_queue(self, queue, connection=None, properties=None, durable=False): + """ + Make empty queue, prefix with self.id(). Create if needed, drain if needed + @return queue name. + """ + queue = "interop-%s" % queue + c = connection or self.connect() + props = {'create':'always'} + if durable: props['node'] = {'durable':True} + if properties: props.update(properties) + self.drain("%s;%s" % (queue, props), c) + return queue + + +class SimpleTest(InteropTest): + """Simple test to check the broker is responding.""" + + def test_send_receive_python(self): + c = self.connect() + q = self.clear_queue('q', c) + s = c.session() + s.sender(q).send('foo') + self.assertEqual('foo', s.receiver(q).fetch().content) + + def test_send_receive_cpp(self): + q = self.clear_queue('q') + args = ['-b', self.url, '-a', q] + self.check_output(['qpid-send', '--content-string=cpp_foo'] + args) + self.assertEqual('cpp_foo', self.check_output(['qpid-receive'] + args).strip()) + + +class PythonTxTest(InteropTest): + + def tx_simple_setup(self): + """Start a transaction, remove messages from queue a, add messages to queue b""" + c = self.connect() + qa, qb = self.clear_queue('a', c, durable=True), self.clear_queue('b', c, durable=True) + + # Send messages to a, no transaction. + sa = c.session().sender(qa+";{create:always,node:{durable:true}}") + tx_msgs = ['x', 'y', 'z'] + for m in tx_msgs: sa.send(qm.Message(content=m, durable=True)) + + # Receive messages from a, in transaction. + tx = c.session(transactional=True) + txr = tx.receiver(qa) + self.assertEqual(tx_msgs, [txr.fetch(1).content for i in xrange(3)]) + tx.acknowledge() + + # Send messages to b, transactional, mixed with non-transactional. + sb = c.session().sender(qb+";{create:always,node:{durable:true}}") + txs = tx.sender(qb) + msgs = [str(i) for i in xrange(3)] + for tx_m, m in zip(tx_msgs, msgs): + txs.send(tx_m); + sb.send(m) + tx.sync() + return tx, qa, qb + + def test_tx_simple_commit(self): + tx, qa, qb = self.tx_simple_setup() + s = self.connect().session() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2']) + tx.commit() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2', 'x', 'y', 'z']) + + def test_tx_simple_rollback(self): + tx, qa, qb = self.tx_simple_setup() + s = self.connect().session() + assert_browse(s, qa, []) + assert_browse(s, qb, ['0', '1', '2']) + tx.rollback() + assert_browse(s, qa, ['x', 'y', 'z']) + assert_browse(s, qb, ['0', '1', '2']) + + def test_tx_sequence(self): + tx = self.connect().session(transactional=True) + notx = self.connect().session() + q = self.clear_queue('q', tx.connection, durable=True) + s = tx.sender(q) + r = tx.receiver(q) + s.send('a') + tx.commit() + assert_browse(notx, q, ['a']) + s.send('b') + tx.commit() + assert_browse(notx, q, ['a', 'b']) + self.assertEqual('a', r.fetch().content) + tx.acknowledge(); + tx.commit() + assert_browse(notx, q, ['b']) + s.send('z') + tx.rollback() + assert_browse(notx, q, ['b']) + self.assertEqual('b', r.fetch().content) + tx.acknowledge(); + tx.rollback() + assert_browse(notx, q, ['b']) + + +class CppTxTest(InteropTest): + + def test_txtest2(self): + self.popen(["qpid-txtest2"] + self.connect_opts).assert_exit_ok() + + def test_send_receive(self): + q = self.clear_queue('q', durable=True) + sender = self.popen(["qpid-send", + "--address", q, + "--messages=100", + "--tx=10", + "--durable=yes"] + self.connect_opts) + receiver = self.popen(["qpid-receive", + "--address", q, + "--messages=90", + "--timeout=10", + "--tx=10"] + self.connect_opts) + sender.assert_exit_ok() + receiver.assert_exit_ok() + expect = [long(i) for i in range(91, 101)] + sn = lambda m: m.properties["sn"] + assert_browse(self.connect().session(), q, expect, transform=sn) + + +if __name__ == "__main__": + if not BrokerTest.amqp_tx_supported: + BrokerTest.amqp_tx_warning() + print "Skipping interop_tests" + exit(0) + outdir = "interop_tests.tmp" + shutil.rmtree(outdir, True) + cmd = ["qpid-python-test", "-m", "interop_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:] + if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"] + if os.environ.get(URL): + os.execvp(cmd[0], cmd) + else: + dir = os.getcwd() + class StartBroker(BrokerTest): + def start_qpidd(self): pass + test = StartBroker('start_qpidd') + class Config: + def __init__(self): + self.defines = { 'OUTDIR': outdir } + test.configure(Config()) + test.setUp() + os.environ[URL] = test.broker().host_port() + os.chdir(dir) + p = subprocess.Popen(cmd) + status = p.wait() + test.tearDown() + sys.exit(status) |