summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/amqp-doc6
-rw-r--r--python/qpid/spec.py18
-rw-r--r--python/qpid/testlib.py6
-rw-r--r--python/tests_0-9/dtx.py540
4 files changed, 557 insertions, 13 deletions
diff --git a/python/amqp-doc b/python/amqp-doc
index 00226d63cb..1f5910f942 100755
--- a/python/amqp-doc
+++ b/python/amqp-doc
@@ -37,15 +37,17 @@ Options:
""" % (msg, sys.argv[0])).strip()
try:
- opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="])
+ opts, args = getopt(sys.argv[1:], "s:ea:", ["regexp", "spec=", "additional="])
except GetoptError, e:
die(str(e))
regexp = False
spec = "../specs/amqp.0-9.xml"
+errata = []
for k, v in opts:
if k == "-e" or k == "--regexp": regexp = True
if k == "-s" or k == "--spec": spec = v
+ if k == "-a" or k == "--additional": errata.append(v)
if regexp:
def match(pattern, value):
@@ -57,7 +59,7 @@ else:
def match(pattern, value):
return fnmatch(value, pattern)
-spec = load(spec)
+spec = load(spec, *errata)
methods = {}
patterns = args
for pattern in patterns:
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
index bb0e7eb58c..f8e37737e2 100644
--- a/python/qpid/spec.py
+++ b/python/qpid/spec.py
@@ -309,8 +309,10 @@ def load(specfile, *errata):
for nd in root["constant"]:
const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]),
nd.get("@class"), get_docs(nd))
- spec.constants.add(const)
-
+ try:
+ spec.constants.add(const)
+ except ValueError, e:
+ print "Warning:", e
# domains are typedefs
for nd in root["domain"]:
spec.domains.add(Domain(spec, nd.index(), pythonize(nd["@name"]),
@@ -320,18 +322,20 @@ def load(specfile, *errata):
# classes
for c_nd in root["class"]:
cname = pythonize(c_nd["@name"])
- if root == spec_root:
+ if spec.classes.byname.has_key(cname):
+ klass = spec.classes.byname[cname]
+ else:
klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"],
get_docs(c_nd))
spec.classes.add(klass)
- else:
- klass = spec.classes.byname[cname]
added_methods = []
load_fields(c_nd, klass.fields, spec.domains.byname)
for m_nd in c_nd["method"]:
mname = pythonize(m_nd["@name"])
- if root == spec_root:
+ if klass.methods.byname.has_key(mname):
+ meth = klass.methods.byname[mname]
+ else:
meth = Method(klass, mname,
int(m_nd["@index"]),
m_nd.get_bool("@content", False),
@@ -341,8 +345,6 @@ def load(specfile, *errata):
get_docs(m_nd))
klass.methods.add(meth)
added_methods.append(meth)
- else:
- meth = klass.methods.byname[mname]
load_fields(m_nd, meth.fields, spec.domains.byname)
# resolve the responses
for m in added_methods:
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 48a6755d25..fa904ff029 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -99,7 +99,7 @@ Options:
self.specfile = "0-8"
self.errata = []
try:
- opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"])
+ opts, self.tests = getopt(args, "s:e:b:h?dvi:I:", ["help", "spec", "errata=", "server", "verbose", "ignore", "ignore-file"])
except GetoptError, e:
self._die(str(e))
for opt, value in opts:
@@ -278,14 +278,14 @@ class TestBase(unittest.TestCase):
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
def assertChannelException(self, expectedCode, message):
- if not isinstance(message, Message): self.fail("expected channel_close method")
+ if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message))
self.assertEqual("channel", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
- if not isinstance(message, Message): self.fail("expected connection_close method")
+ if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message))
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
diff --git a/python/tests_0-9/dtx.py b/python/tests_0-9/dtx.py
new file mode 100644
index 0000000000..ec82c72d49
--- /dev/null
+++ b/python/tests_0-9/dtx.py
@@ -0,0 +1,540 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from struct import pack, unpack
+
+class DtxTests(TestBase):
+ """
+ Tests for the amqp dtx related classes.
+
+ Tests of the form test_simple_xxx test the basic transactional
+ behaviour. The approach here is to 'swap' a message from one queue
+ to another by consuming and re-publishing in the same
+ transaction. That transaction is then completed in different ways
+ and the appropriate result verified.
+
+ The other tests enforce more specific rules and behaviour on a
+ per-method or per-field basis.
+ """
+
+ XA_RBROLLBACK = 1
+ XA_OK = 8
+
+ def test_simple_commit(self):
+ """
+ Test basic one-phase commit behaviour.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "commit")
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #commit
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags)
+
+ #check result
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(1, "queue-b")
+ self.assertMessageId("commit", "queue-b")
+
+ def test_simple_prepare_commit(self):
+ """
+ Test basic two-phase commit behaviour.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "prepare-commit")
+
+ #prepare
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #commit
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags)
+
+ #check result
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(1, "queue-b")
+ self.assertMessageId("prepare-commit", "queue-b")
+
+
+ def test_simple_rollback(self):
+ """
+ Test basic rollback behaviour.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "rollback")
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #rollback
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
+
+ #check result
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("rollback", "queue-a")
+
+ def test_simple_prepare_rollback(self):
+ """
+ Test basic rollback behaviour after the transaction has been prepared.
+ """
+ channel = self.channel
+ tx = self.xid("my-xid")
+ self.txswap(tx, "prepare-rollback")
+
+ #prepare
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
+
+ #neither queue should have any messages accessible
+ self.assertMessageCount(0, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+
+ #rollback
+ self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
+
+ #check result
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("prepare-rollback", "queue-a")
+
+ def test_select_required(self):
+ """
+ check that an error is flagged if select is not issued before
+ start or end
+ """
+ channel = self.channel
+ tx = self.xid("dummy")
+ try:
+ channel.dtx_demarcation_start(xid=tx)
+
+ #if we get here we have failed, but need to do some cleanup:
+ channel.dtx_demarcation_end(xid=tx)
+ channel.dtx_coordination_rollback(xid=tx)
+ self.fail("Channel not selected for use with dtx, expected exception!")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_start_already_known(self):
+ """
+ Verify that an attempt to start an association with a
+ transaction that is already known is not allowed (unless the
+ join flag is set).
+ """
+ #create two channels on different connection & select them for use with dtx:
+ channel1 = self.channel
+ channel1.dtx_demarcation_select()
+
+ other = self.connect()
+ channel2 = other.channel(1)
+ channel2.channel_open()
+ channel2.dtx_demarcation_select()
+
+ #create a xid
+ tx = self.xid("dummy")
+ #start work on one channel under that xid:
+ channel1.dtx_demarcation_start(xid=tx)
+ #then start on the other without the join set
+ failed = False
+ try:
+ channel2.dtx_demarcation_start(xid=tx)
+ except Closed, e:
+ failed = True
+ error = e
+
+ #cleanup:
+ if not failed:
+ channel2.dtx_demarcation_end(xid=tx)
+ other.close()
+ channel1.dtx_demarcation_end(xid=tx)
+ channel1.dtx_coordination_rollback(xid=tx)
+
+ #verification:
+ if failed: self.assertConnectionException(503, e.args[0])
+ else: self.fail("Xid already known, expected exception!")
+
+ def test_forget_xid_on_completion(self):
+ """
+ Verify that a xid is 'forgotten' - and can therefore be used
+ again - once it is completed.
+ """
+ channel = self.channel
+ #do some transactional work & complete the transaction
+ self.test_simple_commit()
+
+ #start association for the same xid as the previously completed txn
+ tx = self.xid("my-xid")
+ channel.dtx_demarcation_start(xid=tx)
+ channel.dtx_demarcation_end(xid=tx)
+ channel.dtx_coordination_rollback(xid=tx)
+
+ def test_start_join_and_resume(self):
+ """
+ Ensure the correct error is signalled when both the join and
+ resume flags are set on starting an association between a
+ channel and a transcation.
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ try:
+ channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
+ #failed, but need some cleanup:
+ channel.dtx_demarcation_end(xid=tx)
+ channel.dtx_coordination_rollback(xid=tx)
+ self.fail("Join and resume both set, expected exception!")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_start_join(self):
+ """
+ Verify 'join' behaviour, where a channel is associated with a
+ transaction that is already associated with another channel.
+ """
+ #create two channels & select them for use with dtx:
+ channel1 = self.channel
+ channel1.dtx_demarcation_select()
+
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+ channel2.dtx_demarcation_select()
+
+ #setup
+ channel1.queue_declare(queue="one", exclusive=True)
+ channel1.queue_declare(queue="two", exclusive=True)
+ channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
+ channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+
+ #create a xid
+ tx = self.xid("dummy")
+ #start work on one channel under that xid:
+ channel1.dtx_demarcation_start(xid=tx)
+ #then start on the other with the join flag set
+ channel2.dtx_demarcation_start(xid=tx, join=True)
+
+ #do work through each channel
+ self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
+ self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
+
+ #mark end on both channels
+ channel1.dtx_demarcation_end(xid=tx)
+ channel2.dtx_demarcation_end(xid=tx)
+
+ #commit and check
+ channel1.dtx_coordination_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+
+ def test_suspend_resume(self):
+ """
+ Test suspension and resumption of an association
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+
+ #setup
+ channel.queue_declare(queue="one", exclusive=True)
+ channel.queue_declare(queue="two", exclusive=True)
+ channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
+ channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+
+ tx = self.xid("dummy")
+
+ channel.dtx_demarcation_start(xid=tx)
+ self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
+ channel.dtx_demarcation_end(xid=tx, suspend=True)
+
+ channel.dtx_demarcation_start(xid=tx, resume=True)
+ self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
+ channel.dtx_demarcation_end(xid=tx)
+
+ #commit and check
+ channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "one")
+ self.assertMessageCount(1, "two")
+ self.assertMessageId("a", "two")
+ self.assertMessageId("b", "one")
+
+ def test_end_suspend_and_fail(self):
+ """
+ Verify that the correct error is signalled if the suspend and
+ fail flag are both set when disassociating a transaction from
+ the channel
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+ tx = self.xid("suspend_and_fail")
+ channel.dtx_demarcation_start(xid=tx)
+ try:
+ channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
+ self.fail("Suspend and fail both set, expected exception!")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ #cleanup
+ other = self.connect()
+ channel = other.channel(1)
+ channel.channel_open()
+ channel.dtx_coordination_rollback(xid=tx)
+ channel.channel_close()
+ other.close()
+
+
+ def test_end_unknown_xid(self):
+ """
+ Verifies that the correct exception is thrown when an attempt
+ is made to end the association for a xid not previously
+ associated with the channel
+ """
+ channel = self.channel
+ channel.dtx_demarcation_select()
+ tx = self.xid("unknown-xid")
+ try:
+ channel.dtx_demarcation_end(xid=tx)
+ self.fail("Attempted to end association with unknown xid, expected exception!")
+ except Closed, e:
+ #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
+ self.assertConnectionException(503, e.args[0])
+
+ def test_end(self):
+ """
+ Verify that the association is terminated by end and subsequent
+ operations are non-transactional
+ """
+ channel = self.client.channel(2)
+ channel.channel_open()
+ channel.queue_declare(queue="tx-queue", exclusive=True)
+
+ #publish a message under a transaction
+ channel.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ channel.dtx_demarcation_start(xid=tx)
+ channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage")
+ channel.dtx_demarcation_end(xid=tx)
+
+ #now that association with txn is ended, publish another message
+ channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage")
+
+ #check the second message is available, but not the first
+ self.assertMessageCount(1, "tx-queue")
+ channel.message_consume(queue="tx-queue", destination="results", no_ack=False)
+ msg = self.client.queue("results").get(timeout=1)
+ self.assertEqual("two", msg.message_id)
+ channel.message_cancel(destination="results")
+ #ack the message then close the channel
+ msg.ok()
+ channel.channel_close()
+
+ channel = self.channel
+ #commit the transaction and check that the first message (and
+ #only the first message) is then delivered
+ channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ self.assertMessageCount(1, "tx-queue")
+ self.assertMessageId("one", "tx-queue")
+
+ def test_invalid_commit_one_phase_true(self):
+ """
+ Test that a commit with one_phase = True is rejected if the
+ transaction in question has already been prepared.
+ """
+ other = self.connect()
+ tester = other.channel(1)
+ tester.channel_open()
+ tester.queue_declare(queue="dummy", exclusive=True)
+ tester.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ tester.dtx_demarcation_start(xid=tx)
+ tester.message_transfer(routing_key="dummy", body="whatever")
+ tester.dtx_demarcation_end(xid=tx)
+ tester.dtx_coordination_prepare(xid=tx)
+ failed = False
+ try:
+ tester.dtx_coordination_commit(xid=tx, one_phase=True)
+ except Closed, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.channel.dtx_coordination_rollback(xid=tx)
+ self.assertConnectionException(503, e.args[0])
+ else:
+ tester.channel_close()
+ other.close()
+ self.fail("Invalid use of one_phase=True, expected exception!")
+
+ def test_invalid_commit_one_phase_false(self):
+ """
+ Test that a commit with one_phase = False is rejected if the
+ transaction in question has not yet been prepared.
+ """
+ """
+ Test that a commit with one_phase = True is rejected if the
+ transaction in question has already been prepared.
+ """
+ other = self.connect()
+ tester = other.channel(1)
+ tester.channel_open()
+ tester.queue_declare(queue="dummy", exclusive=True)
+ tester.dtx_demarcation_select()
+ tx = self.xid("dummy")
+ tester.dtx_demarcation_start(xid=tx)
+ tester.message_transfer(routing_key="dummy", body="whatever")
+ tester.dtx_demarcation_end(xid=tx)
+ failed = False
+ try:
+ tester.dtx_coordination_commit(xid=tx, one_phase=False)
+ except Closed, e:
+ failed = True
+ error = e
+
+ if failed:
+ self.channel.dtx_coordination_rollback(xid=tx)
+ self.assertConnectionException(503, e.args[0])
+ else:
+ tester.channel_close()
+ other.close()
+ self.fail("Invalid use of one_phase=False, expected exception!")
+
+ def test_implicit_end(self):
+ """
+ Test that an association is implicitly ended when the channel
+ is closed (whether by exception or explicit client request)
+ and the transaction in question is marked as rollback only.
+ """
+ channel1 = self.channel
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+
+ #setup:
+ channel2.queue_declare(queue="dummy", exclusive=True)
+ channel2.message_transfer(routing_key="dummy", body="whatever")
+ tx = self.xid("dummy")
+
+ channel2.dtx_demarcation_select()
+ channel2.dtx_demarcation_start(xid=tx)
+ channel2.message_get(queue="dummy", destination="dummy")
+ self.client.queue("dummy").get(timeout=1).ok()
+ channel2.message_transfer(routing_key="dummy", body="whatever")
+ channel2.channel_close()
+
+ self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags)
+ channel1.dtx_coordination_rollback(xid=tx)
+
+ def test_recover(self):
+ """
+ Test basic recover behaviour
+ """
+ channel = self.channel
+
+ channel.dtx_demarcation_select()
+ channel.queue_declare(queue="dummy", exclusive=True)
+
+ prepared = []
+ for i in range(1, 10):
+ tx = self.xid("tx%s" % (i))
+ channel.dtx_demarcation_start(xid=tx)
+ channel.message_transfer(routing_key="dummy", body="message%s" % (i))
+ channel.dtx_demarcation_end(xid=tx)
+ if i in [2, 5, 6, 8]:
+ channel.dtx_coordination_prepare(xid=tx)
+ prepared.append(tx)
+ else:
+ channel.dtx_coordination_rollback(xid=tx)
+
+ indoubt = channel.dtx_coordination_recover().xids
+ #convert indoubt table to a list of xids (note: this will change for 0-10)
+ data = indoubt["xids"]
+ xids = []
+ pos = 0
+ while pos < len(data):
+ size = unpack("!B", data[pos])[0]
+ start = pos + 1
+ end = start + size
+ xid = data[start:end]
+ xids.append(xid)
+ pos = end
+
+ #rollback the prepared transactions returned by recover
+ for x in xids:
+ channel.dtx_coordination_rollback(xid=x)
+
+ #validate against the expected list of prepared transactions
+ actual = set(xids)
+ expected = set(prepared)
+ intersection = actual.intersection(expected)
+
+ if intersection != expected:
+ missing = expected.difference(actual)
+ extra = actual.difference(expected)
+ for x in missing:
+ channel.dtx_coordination_rollback(xid=x)
+ self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
+
+ def xid(self, txid, branchqual = ''):
+ return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+
+ def txswap(self, tx, id):
+ channel = self.channel
+ #declare two queues:
+ channel.queue_declare(queue="queue-a", exclusive=True)
+ channel.queue_declare(queue="queue-b", exclusive=True)
+ #put message with specified id on one queue:
+ channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage")
+
+ #start the transaction:
+ channel.dtx_demarcation_select()
+ self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags)
+
+ #'swap' the message from one queue to the other, under that transaction:
+ self.swap(self.channel, "queue-a", "queue-b")
+
+ #mark the end of the transactional work:
+ self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags)
+
+ def swap(self, channel, src, dest):
+ #consume from src:
+ channel.message_get(destination="temp-swap", queue=src)
+ msg = self.client.queue("temp-swap").get(timeout=1)
+ msg.ok();
+
+ #re-publish to dest
+ channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
+
+ def assertMessageCount(self, expected, queue):
+ self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count)
+
+ def assertMessageId(self, expected, queue):
+ self.channel.message_consume(queue=queue, destination="results", no_ack=True)
+ self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id)
+ self.channel.message_cancel(destination="results")