diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/amqp-doc | 6 | ||||
-rw-r--r-- | python/qpid/spec.py | 18 | ||||
-rw-r--r-- | python/qpid/testlib.py | 6 | ||||
-rw-r--r-- | python/tests_0-9/dtx.py | 540 |
4 files changed, 557 insertions, 13 deletions
diff --git a/python/amqp-doc b/python/amqp-doc index 00226d63cb..1f5910f942 100755 --- a/python/amqp-doc +++ b/python/amqp-doc @@ -37,15 +37,17 @@ Options: """ % (msg, sys.argv[0])).strip() try: - opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="]) + opts, args = getopt(sys.argv[1:], "s:ea:", ["regexp", "spec=", "additional="]) except GetoptError, e: die(str(e)) regexp = False spec = "../specs/amqp.0-9.xml" +errata = [] for k, v in opts: if k == "-e" or k == "--regexp": regexp = True if k == "-s" or k == "--spec": spec = v + if k == "-a" or k == "--additional": errata.append(v) if regexp: def match(pattern, value): @@ -57,7 +59,7 @@ else: def match(pattern, value): return fnmatch(value, pattern) -spec = load(spec) +spec = load(spec, *errata) methods = {} patterns = args for pattern in patterns: diff --git a/python/qpid/spec.py b/python/qpid/spec.py index bb0e7eb58c..f8e37737e2 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -309,8 +309,10 @@ def load(specfile, *errata): for nd in root["constant"]: const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]), nd.get("@class"), get_docs(nd)) - spec.constants.add(const) - + try: + spec.constants.add(const) + except ValueError, e: + print "Warning:", e # domains are typedefs for nd in root["domain"]: spec.domains.add(Domain(spec, nd.index(), pythonize(nd["@name"]), @@ -320,18 +322,20 @@ def load(specfile, *errata): # classes for c_nd in root["class"]: cname = pythonize(c_nd["@name"]) - if root == spec_root: + if spec.classes.byname.has_key(cname): + klass = spec.classes.byname[cname] + else: klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"], get_docs(c_nd)) spec.classes.add(klass) - else: - klass = spec.classes.byname[cname] added_methods = [] load_fields(c_nd, klass.fields, spec.domains.byname) for m_nd in c_nd["method"]: mname = pythonize(m_nd["@name"]) - if root == spec_root: + if klass.methods.byname.has_key(mname): + meth = klass.methods.byname[mname] + else: meth = Method(klass, mname, int(m_nd["@index"]), m_nd.get_bool("@content", False), @@ -341,8 +345,6 @@ def load(specfile, *errata): get_docs(m_nd)) klass.methods.add(meth) added_methods.append(meth) - else: - meth = klass.methods.byname[mname] load_fields(m_nd, meth.fields, spec.domains.byname) # resolve the responses for m in added_methods: diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 48a6755d25..fa904ff029 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -99,7 +99,7 @@ Options: self.specfile = "0-8" self.errata = [] try: - opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"]) + opts, self.tests = getopt(args, "s:e:b:h?dvi:I:", ["help", "spec", "errata=", "server", "verbose", "ignore", "ignore-file"]) except GetoptError, e: self._die(str(e)) for opt, value in opts: @@ -278,14 +278,14 @@ class TestBase(unittest.TestCase): self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): - if not isinstance(message, Message): self.fail("expected channel_close method") + if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) self.assertEqual("channel", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) def assertConnectionException(self, expectedCode, message): - if not isinstance(message, Message): self.fail("expected connection_close method") + if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message)) self.assertEqual("connection", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) diff --git a/python/tests_0-9/dtx.py b/python/tests_0-9/dtx.py new file mode 100644 index 0000000000..ec82c72d49 --- /dev/null +++ b/python/tests_0-9/dtx.py @@ -0,0 +1,540 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import testrunner, TestBase +from struct import pack, unpack + +class DtxTests(TestBase): + """ + Tests for the amqp dtx related classes. + + Tests of the form test_simple_xxx test the basic transactional + behaviour. The approach here is to 'swap' a message from one queue + to another by consuming and re-publishing in the same + transaction. That transaction is then completed in different ways + and the appropriate result verified. + + The other tests enforce more specific rules and behaviour on a + per-method or per-field basis. + """ + + XA_RBROLLBACK = 1 + XA_OK = 8 + + def test_simple_commit(self): + """ + Test basic one-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "commit") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags) + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("commit", "queue-b") + + def test_simple_prepare_commit(self): + """ + Test basic two-phase commit behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-commit") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #commit + self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags) + + #check result + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(1, "queue-b") + self.assertMessageId("prepare-commit", "queue-b") + + + def test_simple_rollback(self): + """ + Test basic rollback behaviour. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "rollback") + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("rollback", "queue-a") + + def test_simple_prepare_rollback(self): + """ + Test basic rollback behaviour after the transaction has been prepared. + """ + channel = self.channel + tx = self.xid("my-xid") + self.txswap(tx, "prepare-rollback") + + #prepare + self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) + + #neither queue should have any messages accessible + self.assertMessageCount(0, "queue-a") + self.assertMessageCount(0, "queue-b") + + #rollback + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) + + #check result + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("prepare-rollback", "queue-a") + + def test_select_required(self): + """ + check that an error is flagged if select is not issued before + start or end + """ + channel = self.channel + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx) + + #if we get here we have failed, but need to do some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Channel not selected for use with dtx, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_already_known(self): + """ + Verify that an attempt to start an association with a + transaction that is already known is not allowed (unless the + join flag is set). + """ + #create two channels on different connection & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + other = self.connect() + channel2 = other.channel(1) + channel2.channel_open() + channel2.dtx_demarcation_select() + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other without the join set + failed = False + try: + channel2.dtx_demarcation_start(xid=tx) + except Closed, e: + failed = True + error = e + + #cleanup: + if not failed: + channel2.dtx_demarcation_end(xid=tx) + other.close() + channel1.dtx_demarcation_end(xid=tx) + channel1.dtx_coordination_rollback(xid=tx) + + #verification: + if failed: self.assertConnectionException(503, e.args[0]) + else: self.fail("Xid already known, expected exception!") + + def test_forget_xid_on_completion(self): + """ + Verify that a xid is 'forgotten' - and can therefore be used + again - once it is completed. + """ + channel = self.channel + #do some transactional work & complete the transaction + self.test_simple_commit() + + #start association for the same xid as the previously completed txn + tx = self.xid("my-xid") + channel.dtx_demarcation_start(xid=tx) + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + + def test_start_join_and_resume(self): + """ + Ensure the correct error is signalled when both the join and + resume flags are set on starting an association between a + channel and a transcation. + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("dummy") + try: + channel.dtx_demarcation_start(xid=tx, join=True, resume=True) + #failed, but need some cleanup: + channel.dtx_demarcation_end(xid=tx) + channel.dtx_coordination_rollback(xid=tx) + self.fail("Join and resume both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_start_join(self): + """ + Verify 'join' behaviour, where a channel is associated with a + transaction that is already associated with another channel. + """ + #create two channels & select them for use with dtx: + channel1 = self.channel + channel1.dtx_demarcation_select() + + channel2 = self.client.channel(2) + channel2.channel_open() + channel2.dtx_demarcation_select() + + #setup + channel1.queue_declare(queue="one", exclusive=True) + channel1.queue_declare(queue="two", exclusive=True) + channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") + channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + + #create a xid + tx = self.xid("dummy") + #start work on one channel under that xid: + channel1.dtx_demarcation_start(xid=tx) + #then start on the other with the join flag set + channel2.dtx_demarcation_start(xid=tx, join=True) + + #do work through each channel + self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' + self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' + + #mark end on both channels + channel1.dtx_demarcation_end(xid=tx) + channel2.dtx_demarcation_end(xid=tx) + + #commit and check + channel1.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + + def test_suspend_resume(self): + """ + Test suspension and resumption of an association + """ + channel = self.channel + channel.dtx_demarcation_select() + + #setup + channel.queue_declare(queue="one", exclusive=True) + channel.queue_declare(queue="two", exclusive=True) + channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") + channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") + + tx = self.xid("dummy") + + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' + channel.dtx_demarcation_end(xid=tx, suspend=True) + + channel.dtx_demarcation_start(xid=tx, resume=True) + self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' + channel.dtx_demarcation_end(xid=tx) + + #commit and check + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "one") + self.assertMessageCount(1, "two") + self.assertMessageId("a", "two") + self.assertMessageId("b", "one") + + def test_end_suspend_and_fail(self): + """ + Verify that the correct error is signalled if the suspend and + fail flag are both set when disassociating a transaction from + the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("suspend_and_fail") + channel.dtx_demarcation_start(xid=tx) + try: + channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) + self.fail("Suspend and fail both set, expected exception!") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.dtx_coordination_rollback(xid=tx) + channel.channel_close() + other.close() + + + def test_end_unknown_xid(self): + """ + Verifies that the correct exception is thrown when an attempt + is made to end the association for a xid not previously + associated with the channel + """ + channel = self.channel + channel.dtx_demarcation_select() + tx = self.xid("unknown-xid") + try: + channel.dtx_demarcation_end(xid=tx) + self.fail("Attempted to end association with unknown xid, expected exception!") + except Closed, e: + #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... + self.assertConnectionException(503, e.args[0]) + + def test_end(self): + """ + Verify that the association is terminated by end and subsequent + operations are non-transactional + """ + channel = self.client.channel(2) + channel.channel_open() + channel.queue_declare(queue="tx-queue", exclusive=True) + + #publish a message under a transaction + channel.dtx_demarcation_select() + tx = self.xid("dummy") + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") + channel.dtx_demarcation_end(xid=tx) + + #now that association with txn is ended, publish another message + channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") + + #check the second message is available, but not the first + self.assertMessageCount(1, "tx-queue") + channel.message_consume(queue="tx-queue", destination="results", no_ack=False) + msg = self.client.queue("results").get(timeout=1) + self.assertEqual("two", msg.message_id) + channel.message_cancel(destination="results") + #ack the message then close the channel + msg.ok() + channel.channel_close() + + channel = self.channel + #commit the transaction and check that the first message (and + #only the first message) is then delivered + channel.dtx_coordination_commit(xid=tx, one_phase=True) + self.assertMessageCount(1, "tx-queue") + self.assertMessageId("one", "tx-queue") + + def test_invalid_commit_one_phase_true(self): + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.channel(1) + tester.channel_open() + tester.queue_declare(queue="dummy", exclusive=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(routing_key="dummy", body="whatever") + tester.dtx_demarcation_end(xid=tx) + tester.dtx_coordination_prepare(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=True) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.channel_close() + other.close() + self.fail("Invalid use of one_phase=True, expected exception!") + + def test_invalid_commit_one_phase_false(self): + """ + Test that a commit with one_phase = False is rejected if the + transaction in question has not yet been prepared. + """ + """ + Test that a commit with one_phase = True is rejected if the + transaction in question has already been prepared. + """ + other = self.connect() + tester = other.channel(1) + tester.channel_open() + tester.queue_declare(queue="dummy", exclusive=True) + tester.dtx_demarcation_select() + tx = self.xid("dummy") + tester.dtx_demarcation_start(xid=tx) + tester.message_transfer(routing_key="dummy", body="whatever") + tester.dtx_demarcation_end(xid=tx) + failed = False + try: + tester.dtx_coordination_commit(xid=tx, one_phase=False) + except Closed, e: + failed = True + error = e + + if failed: + self.channel.dtx_coordination_rollback(xid=tx) + self.assertConnectionException(503, e.args[0]) + else: + tester.channel_close() + other.close() + self.fail("Invalid use of one_phase=False, expected exception!") + + def test_implicit_end(self): + """ + Test that an association is implicitly ended when the channel + is closed (whether by exception or explicit client request) + and the transaction in question is marked as rollback only. + """ + channel1 = self.channel + channel2 = self.client.channel(2) + channel2.channel_open() + + #setup: + channel2.queue_declare(queue="dummy", exclusive=True) + channel2.message_transfer(routing_key="dummy", body="whatever") + tx = self.xid("dummy") + + channel2.dtx_demarcation_select() + channel2.dtx_demarcation_start(xid=tx) + channel2.message_get(queue="dummy", destination="dummy") + self.client.queue("dummy").get(timeout=1).ok() + channel2.message_transfer(routing_key="dummy", body="whatever") + channel2.channel_close() + + self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags) + channel1.dtx_coordination_rollback(xid=tx) + + def test_recover(self): + """ + Test basic recover behaviour + """ + channel = self.channel + + channel.dtx_demarcation_select() + channel.queue_declare(queue="dummy", exclusive=True) + + prepared = [] + for i in range(1, 10): + tx = self.xid("tx%s" % (i)) + channel.dtx_demarcation_start(xid=tx) + channel.message_transfer(routing_key="dummy", body="message%s" % (i)) + channel.dtx_demarcation_end(xid=tx) + if i in [2, 5, 6, 8]: + channel.dtx_coordination_prepare(xid=tx) + prepared.append(tx) + else: + channel.dtx_coordination_rollback(xid=tx) + + indoubt = channel.dtx_coordination_recover().xids + #convert indoubt table to a list of xids (note: this will change for 0-10) + data = indoubt["xids"] + xids = [] + pos = 0 + while pos < len(data): + size = unpack("!B", data[pos])[0] + start = pos + 1 + end = start + size + xid = data[start:end] + xids.append(xid) + pos = end + + #rollback the prepared transactions returned by recover + for x in xids: + channel.dtx_coordination_rollback(xid=x) + + #validate against the expected list of prepared transactions + actual = set(xids) + expected = set(prepared) + intersection = actual.intersection(expected) + + if intersection != expected: + missing = expected.difference(actual) + extra = actual.difference(expected) + for x in missing: + channel.dtx_coordination_rollback(xid=x) + self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) + + def xid(self, txid, branchqual = ''): + return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual + + def txswap(self, tx, id): + channel = self.channel + #declare two queues: + channel.queue_declare(queue="queue-a", exclusive=True) + channel.queue_declare(queue="queue-b", exclusive=True) + #put message with specified id on one queue: + channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") + + #start the transaction: + channel.dtx_demarcation_select() + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags) + + #'swap' the message from one queue to the other, under that transaction: + self.swap(self.channel, "queue-a", "queue-b") + + #mark the end of the transactional work: + self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags) + + def swap(self, channel, src, dest): + #consume from src: + channel.message_get(destination="temp-swap", queue=src) + msg = self.client.queue("temp-swap").get(timeout=1) + msg.ok(); + + #re-publish to dest + channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) + + def assertMessageCount(self, expected, queue): + self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count) + + def assertMessageId(self, expected, queue): + self.channel.message_consume(queue=queue, destination="results", no_ack=True) + self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) + self.channel.message_cancel(destination="results") |