diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-26 18:38:35 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-26 18:38:35 +0000 |
| commit | 719c2529a14527c236e871603136ccbe44f632d3 (patch) | |
| tree | 499f5c7b1d2348e46e34cb12d9c9dd5169901022 /python/tests_0-10 | |
| parent | 5c8e2d27f805eff9f6a457d895fa38dc495301fd (diff) | |
| download | qpid-python-719c2529a14527c236e871603136ccbe44f632d3.tar.gz | |
Update to dtx inline with latest spec:
* Updated dtx handling in c++ broker to take account of separation of completion and acceptance.
* Added final dtx method defs to extra xml fragment and implemented appropriate handlers in c++ broker.
* Converted dtx python tests (recover test still requires some work on decoding arrays).
* Allow creation of structs without type codes through a python session method.
* Fixed exception handling in python client for commands with results.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641464 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
| -rw-r--r-- | python/tests_0-10/dtx.py | 505 |
1 files changed, 268 insertions, 237 deletions
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index f84f91c75a..2483c6f16d 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -18,12 +18,13 @@ # from qpid.client import Client, Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.datatypes import Message, RangedSet +from qpid.session import SessionException +from qpid.testlib import TestBase010 from struct import pack, unpack from time import sleep -class DtxTests(TestBase): +class DtxTests(TestBase010): """ Tests for the amqp dtx related classes. @@ -43,15 +44,15 @@ class DtxTests(TestBase): tx_counter = 0 def reset_channel(self): - self.channel.session_close() - self.channel = self.client.channel(self.channel.id + 1) - self.channel.session_open() + self.session.close() + self.session = self.conn.session("dtx-session", 1) def test_simple_commit(self): """ Test basic one-phase commit behaviour. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "commit") @@ -60,9 +61,9 @@ class DtxTests(TestBase): self.assertMessageCount(0, "queue-b") #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status) - #should close and reopen channel to ensure no unacked messages are held + #should close and reopen session to ensure no unacked messages are held self.reset_channel() #check result @@ -74,19 +75,20 @@ class DtxTests(TestBase): """ Test basic two-phase commit behaviour. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "prepare-commit") #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status) self.reset_channel() @@ -100,7 +102,8 @@ class DtxTests(TestBase): """ Test basic rollback behaviour. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "rollback") @@ -109,7 +112,7 @@ class DtxTests(TestBase): self.assertMessageCount(0, "queue-b") #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) self.reset_channel() @@ -122,19 +125,20 @@ class DtxTests(TestBase): """ Test basic rollback behaviour after the transaction has been prepared. """ - channel = self.channel + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + session = self.session tx = self.xid("my-xid") self.txswap(tx, "prepare-rollback") #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) self.reset_channel() @@ -148,17 +152,17 @@ class DtxTests(TestBase): check that an error is flagged if select is not issued before start or end """ - channel = self.channel + session = self.session tx = self.xid("dummy") try: - channel.dtx_demarcation_start(xid=tx) + session.dtx_start(xid=tx) #if we get here we have failed, but need to do some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Channel not selected for use with dtx, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) + session.dtx_end(xid=tx) + session.dtx_rollback(xid=tx) + self.fail("Session not selected for use with dtx, expected exception!") + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) def test_start_already_known(self): """ @@ -166,36 +170,35 @@ class DtxTests(TestBase): transaction that is already known is not allowed (unless the join flag is set). """ - #create two channels on different connection & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() + #create two sessions on different connection & select them for use with dtx: + session1 = self.session + session1.dtx_select() other = self.connect() - channel2 = other.channel(1) - channel2.session_open() - channel2.dtx_demarcation_select() + session2 = other.session("other", 0) + session2.dtx_select() #create a xid tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) + #start work on one session under that xid: + session1.dtx_start(xid=tx) #then start on the other without the join set failed = False try: - channel2.dtx_demarcation_start(xid=tx) - except Closed, e: + session2.dtx_start(xid=tx) + except SessionException, e: failed = True error = e #cleanup: if not failed: - channel2.dtx_demarcation_end(xid=tx) + session2.dtx_end(xid=tx) other.close() - channel1.dtx_demarcation_end(xid=tx) - channel1.dtx_coordination_rollback(xid=tx) + session1.dtx_end(xid=tx) + session1.dtx_rollback(xid=tx) #verification: - if failed: self.assertConnectionException(503, e.args[0]) + if failed: self.assertEquals(503, error.args[0].error_code) else: self.fail("Xid already known, expected exception!") def test_forget_xid_on_completion(self): @@ -205,69 +208,69 @@ class DtxTests(TestBase): """ #do some transactional work & complete the transaction self.test_simple_commit() - # channel has been reset, so reselect for use with dtx - self.channel.dtx_demarcation_select() + # session has been reset, so reselect for use with dtx + self.session.dtx_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") - self.channel.dtx_demarcation_start(xid=tx) - self.channel.dtx_demarcation_end(xid=tx) - self.channel.dtx_coordination_rollback(xid=tx) + self.session.dtx_start(xid=tx) + self.session.dtx_end(xid=tx) + self.session.dtx_rollback(xid=tx) def test_start_join_and_resume(self): """ Ensure the correct error is signalled when both the join and resume flags are set on starting an association between a - channel and a transcation. + session and a transcation. """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() tx = self.xid("dummy") try: - channel.dtx_demarcation_start(xid=tx, join=True, resume=True) + session.dtx_start(xid=tx, join=True, resume=True) #failed, but need some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) + session.dtx_end(xid=tx) + session.dtx_rollback(xid=tx) self.fail("Join and resume both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) def test_start_join(self): """ - Verify 'join' behaviour, where a channel is associated with a - transaction that is already associated with another channel. + Verify 'join' behaviour, where a session is associated with a + transaction that is already associated with another session. """ - #create two channels & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() + guard = self.keepQueuesAlive(["one", "two"]) + #create two sessions & select them for use with dtx: + session1 = self.session + session1.dtx_select() - channel2 = self.client.channel(2) - channel2.session_open() - channel2.dtx_demarcation_select() + session2 = self.conn.session("second", 2) + session2.dtx_select() #setup - channel1.queue_declare(queue="one", exclusive=True, auto_delete=True) - channel1.queue_declare(queue="two", exclusive=True, auto_delete=True) - channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) - channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + session1.queue_declare(queue="one", auto_delete=True) + session1.queue_declare(queue="two", auto_delete=True) + session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage")) + session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage")) #create a xid tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) + #start work on one session under that xid: + session1.dtx_start(xid=tx) #then start on the other with the join flag set - channel2.dtx_demarcation_start(xid=tx, join=True) + session2.dtx_start(xid=tx, join=True) - #do work through each channel - self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' - self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' + #do work through each session + self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two' + self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one' - #mark end on both channels - channel1.dtx_demarcation_end(xid=tx) - channel2.dtx_demarcation_end(xid=tx) + #mark end on both sessions + session1.dtx_end(xid=tx) + session2.dtx_end(xid=tx) #commit and check - channel1.dtx_coordination_commit(xid=tx, one_phase=True) + session1.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") @@ -278,27 +281,27 @@ class DtxTests(TestBase): """ Test suspension and resumption of an association """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() #setup - channel.queue_declare(queue="one", exclusive=True, auto_delete=True) - channel.queue_declare(queue="two", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) - channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + session.queue_declare(queue="one", exclusive=True, auto_delete=True) + session.queue_declare(queue="two", exclusive=True, auto_delete=True) + session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) + session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' - channel.dtx_demarcation_end(xid=tx, suspend=True) + session.dtx_start(xid=tx) + self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' + session.dtx_end(xid=tx, suspend=True) - channel.dtx_demarcation_start(xid=tx, resume=True) - self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx, resume=True) + self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' + session.dtx_end(xid=tx) #commit and check - channel.dtx_coordination_commit(xid=tx, one_phase=True) + session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") @@ -310,27 +313,27 @@ class DtxTests(TestBase): done on another transaction when the first transaction is suspended """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() #setup - channel.queue_declare(queue="one", exclusive=True, auto_delete=True) - channel.queue_declare(queue="two", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage")) - channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage")) + session.queue_declare(queue="one", exclusive=True, auto_delete=True) + session.queue_declare(queue="two", exclusive=True, auto_delete=True) + session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) + session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' - channel.dtx_demarcation_end(xid=tx, suspend=True) + session.dtx_start(xid=tx) + self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' + session.dtx_end(xid=tx, suspend=True) - channel.dtx_demarcation_start(xid=tx, resume=True) - self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx, resume=True) + self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' + session.dtx_end(xid=tx) #commit and check - channel.dtx_coordination_commit(xid=tx, one_phase=True) + session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") @@ -340,24 +343,23 @@ class DtxTests(TestBase): """ Verify that the correct error is signalled if the suspend and fail flag are both set when disassociating a transaction from - the channel + the session """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() tx = self.xid("suspend_and_fail") - channel.dtx_demarcation_start(xid=tx) + session.dtx_start(xid=tx) try: - channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) + session.dtx_end(xid=tx, suspend=True, fail=True) self.fail("Suspend and fail both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) #cleanup other = self.connect() - channel = other.channel(1) - channel.session_open() - channel.dtx_coordination_rollback(xid=tx) - channel.session_close() + session = other.session("cleanup", 1) + session.dtx_rollback(xid=tx) + session.close() other.close() @@ -365,51 +367,51 @@ class DtxTests(TestBase): """ Verifies that the correct exception is thrown when an attempt is made to end the association for a xid not previously - associated with the channel + associated with the session """ - channel = self.channel - channel.dtx_demarcation_select() + session = self.session + session.dtx_select() tx = self.xid("unknown-xid") try: - channel.dtx_demarcation_end(xid=tx) + session.dtx_end(xid=tx) self.fail("Attempted to end association with unknown xid, expected exception!") - except Closed, e: + except SessionException, e: #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... - self.assertConnectionException(503, e.args[0]) + self.assertEquals(503, e.args[0].error_code) def test_end(self): """ Verify that the association is terminated by end and subsequent operations are non-transactional """ - channel = self.client.channel(2) - channel.session_open() - channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True) + guard = self.keepQueuesAlive(["tx-queue"]) + session = self.conn.session("alternate", 1) + session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True) #publish a message under a transaction - channel.dtx_demarcation_select() + session.dtx_select() tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage")) - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx) + session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage")) + session.dtx_end(xid=tx) #now that association with txn is ended, publish another message - channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage")) + session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage")) #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") - self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1) - msg = self.client.queue("results").get(timeout=1) - self.assertEqual("two", msg.content['message_id']) - channel.message_cancel(destination="results") - #ack the message then close the channel - msg.complete() - channel.session_close() - - channel = self.channel + self.subscribe(session, queue="tx-queue", destination="results") + msg = session.incoming("results").get(timeout=1) + self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id')) + session.message_cancel(destination="results") + #ack the message then close the session + session.message_accept(RangedSet(msg.id)) + session.close() + + session = self.session #commit the transaction and check that the first message (and #only the first message) is then delivered - channel.dtx_coordination_commit(xid=tx, one_phase=True) + session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "tx-queue") self.assertMessageId("one", "tx-queue") @@ -419,27 +421,26 @@ class DtxTests(TestBase): transaction in question has already been prepared. """ other = self.connect() - tester = other.channel(1) - tester.session_open() + tester = other.session("tester", 1) tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - tester.dtx_demarcation_select() + tester.dtx_select() tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) - tester.dtx_demarcation_end(xid=tx) - tester.dtx_coordination_prepare(xid=tx) + tester.dtx_start(xid=tx) + tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + tester.dtx_end(xid=tx) + tester.dtx_prepare(xid=tx) failed = False try: - tester.dtx_coordination_commit(xid=tx, one_phase=True) - except Closed, e: + tester.dtx_commit(xid=tx, one_phase=True) + except SessionException, e: failed = True error = e if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) + self.session.dtx_rollback(xid=tx) + self.assertEquals(503, error.args[0].error_code) else: - tester.session_close() + tester.close() other.close() self.fail("Invalid use of one_phase=True, expected exception!") @@ -453,99 +454,99 @@ class DtxTests(TestBase): transaction in question has already been prepared. """ other = self.connect() - tester = other.channel(1) - tester.session_open() + tester = other.session("tester", 1) tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - tester.dtx_demarcation_select() + tester.dtx_select() tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) - tester.dtx_demarcation_end(xid=tx) + tester.dtx_start(xid=tx) + tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) + tester.dtx_end(xid=tx) failed = False try: - tester.dtx_coordination_commit(xid=tx, one_phase=False) - except Closed, e: + tester.dtx_commit(xid=tx, one_phase=False) + except SessionException, e: failed = True error = e if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) + self.session.dtx_rollback(xid=tx) + self.assertEquals(503, error.args[0].error_code) else: - tester.session_close() + tester.close() other.close() self.fail("Invalid use of one_phase=False, expected exception!") def test_implicit_end(self): """ - Test that an association is implicitly ended when the channel + Test that an association is implicitly ended when the session is closed (whether by exception or explicit client request) and the transaction in question is marked as rollback only. """ - channel1 = self.channel - channel2 = self.client.channel(2) - channel2.session_open() + session1 = self.session + session2 = self.conn.session("other", 2) #setup: - channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) - channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) + session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever")) tx = self.xid("dummy") - channel2.dtx_demarcation_select() - channel2.dtx_demarcation_start(xid=tx) - channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1) - channel2.message_flow(destination="dummy", unit=0, value=1) - channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF) - self.client.queue("dummy").get(timeout=1).complete() - channel2.message_cancel(destination="dummy") - channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever")) - channel2.session_close() + session2.dtx_select() + session2.dtx_start(xid=tx) + session2.message_subscribe(queue="dummy", destination="dummy") + session2.message_flow(destination="dummy", unit=0, value=1) + session2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF) + msg = session2.incoming("dummy").get(timeout=1) + session2.message_accept(RangedSet(msg.id)) + session2.message_cancel(destination="dummy") + session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever")) + session2.close() - self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status) - channel1.dtx_coordination_rollback(xid=tx) + self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status) + session1.dtx_rollback(xid=tx) def test_get_timeout(self): """ Check that get-timeout returns the correct value, (and that a transaction with a timeout can complete normally) """ - channel = self.channel + session = self.session tx = self.xid("dummy") - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) - channel.dtx_coordination_set_timeout(xid=tx, timeout=60) - self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) - self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status) - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + session.dtx_select() + session.dtx_start(xid=tx) + self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout) + session.dtx_set_timeout(xid=tx, timeout=60) + self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) + self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) def test_set_timeout(self): """ Test the timeout of a transaction results in the expected behaviour """ - #open new channel to allow self.channel to be used in checking te queue - channel = self.client.channel(2) - channel.session_open() + + guard = self.keepQueuesAlive(["queue-a", "queue-b"]) + #open new session to allow self.session to be used in checking the queue + session = self.conn.session("worker", 1) #setup: tx = self.xid("dummy") - channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) - channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage")) - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "queue-a", "queue-b") - channel.dtx_coordination_set_timeout(xid=tx, timeout=2) + session.queue_declare(queue="queue-a", auto_delete=True) + session.queue_declare(queue="queue-b", auto_delete=True) + session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage")) + + session.dtx_select() + session.dtx_start(xid=tx) + self.swap(session, "queue-a", "queue-b") + session.dtx_set_timeout(xid=tx, timeout=2) sleep(3) #check that the work has been rolled back already self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") self.assertMessageId("timeout", "queue-a") #check the correct codes are returned when we try to complete the txn - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status) - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) @@ -553,28 +554,29 @@ class DtxTests(TestBase): """ Test basic recover behaviour """ - channel = self.channel + session = self.session - channel.dtx_demarcation_select() - channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True) + session.dtx_select() + session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) prepared = [] for i in range(1, 10): tx = self.xid("tx%s" % (i)) - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i))) - channel.dtx_demarcation_end(xid=tx) + session.dtx_start(xid=tx) + session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i))) + session.dtx_end(xid=tx) if i in [2, 5, 6, 8]: - channel.dtx_coordination_prepare(xid=tx) + session.dtx_prepare(xid=tx) prepared.append(tx) else: - channel.dtx_coordination_rollback(xid=tx) + session.dtx_rollback(xid=tx) - xids = channel.dtx_coordination_recover().in_doubt + xids = session.dtx_recover().in_doubt + print "xids=%s" % xids #rollback the prepared transactions returned by recover for x in xids: - channel.dtx_coordination_rollback(xid=x) + session.dtx_rollback(xid=x) #validate against the expected list of prepared transactions actual = set(xids) @@ -585,61 +587,90 @@ class DtxTests(TestBase): missing = expected.difference(actual) extra = actual.difference(expected) for x in missing: - channel.dtx_coordination_rollback(xid=x) + session.dtx_rollback(xid=x) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) def test_bad_resume(self): """ Test that a resume on a session not selected for use with dtx fails """ - channel = self.channel + session = self.session try: - channel.dtx_demarcation_start(resume=True) - except Closed, e: - self.assertConnectionException(503, e.args[0]) + session.dtx_start(resume=True) + except SessionException, e: + self.assertEquals(503, e.args[0].error_code) def xid(self, txid): DtxTests.tx_counter += 1 branchqual = "v%s" % DtxTests.tx_counter - return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual - + return self.session.xid(format=0, global_id=txid, branch_id=branchqual) + def txswap(self, tx, id): - channel = self.channel + session = self.session #declare two queues: - channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) - channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-a", auto_delete=True) + session.queue_declare(queue="queue-b", auto_delete=True) + #put message with specified id on one queue: - channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage")) + dp=session.delivery_properties(routing_key="queue-a") + mp=session.message_properties(correlation_id=id) + session.message_transfer(message=Message(dp, mp, "DtxMessage")) #start the transaction: - channel.dtx_demarcation_select() - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status) + session.dtx_select() + self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) #'swap' the message from one queue to the other, under that transaction: - self.swap(self.channel, "queue-a", "queue-b") + self.swap(self.session, "queue-a", "queue-b") #mark the end of the transactional work: - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status) + self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status) - def swap(self, channel, src, dest): + def swap(self, session, src, dest): #consume from src: - channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1) - channel.message_flow(destination="temp-swap", unit=0, value=1) - channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) - msg = self.client.queue("temp-swap").get(timeout=1) - channel.message_cancel(destination="temp-swap") - msg.complete(); - - #re-publish to dest - channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']}, - body=msg.content.body)) + session.message_subscribe(destination="temp-swap", queue=src) + session.message_flow(destination="temp-swap", unit=0, value=1) + session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF) + msg = session.incoming("temp-swap").get(timeout=1) + session.message_cancel(destination="temp-swap") + session.message_accept(RangedSet(msg.id)) + #todo: also complete at this point? + + #re-publish to dest: + dp=session.delivery_properties(routing_key=dest) + mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id')) + session.message_transfer(message=Message(dp, mp, msg.body)) def assertMessageCount(self, expected, queue): - self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count) + self.assertEqual(expected, self.session.queue_query(queue=queue).message_count) def assertMessageId(self, expected, queue): - self.channel.message_subscribe(queue=queue, destination="results") - self.channel.message_flow(destination="results", unit=0, value=1) - self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF) - self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id']) - self.channel.message_cancel(destination="results") + self.session.message_subscribe(queue=queue, destination="results") + self.session.message_flow(destination="results", unit=0, value=1) + self.session.message_flow(destination="results", unit=1, value=0xFFFFFFFF) + self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id')) + self.session.message_cancel(destination="results") + + def getMessageProperty(self, msg, prop): + for h in msg.headers: + if hasattr(h, prop): return getattr(h, prop) + return None + + def keepQueuesAlive(self, names): + session = self.conn.session("nasty", 99) + for n in names: + session.queue_declare(queue=n, auto_delete=True) + session.message_subscribe(destination=n, queue=n) + return session + + def createMessage(self, session, key, id, body): + dp=session.delivery_properties(routing_key=key) + mp=session.message_properties(correlation_id=id) + session.message_transfer(message=Message(dp, mp, body)) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) |
