summaryrefslogtreecommitdiff
path: root/python/tests_0-10
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-26 18:38:35 +0000
committerGordon Sim <gsim@apache.org>2008-03-26 18:38:35 +0000
commit719c2529a14527c236e871603136ccbe44f632d3 (patch)
tree499f5c7b1d2348e46e34cb12d9c9dd5169901022 /python/tests_0-10
parent5c8e2d27f805eff9f6a457d895fa38dc495301fd (diff)
downloadqpid-python-719c2529a14527c236e871603136ccbe44f632d3.tar.gz
Update to dtx inline with latest spec:
* Updated dtx handling in c++ broker to take account of separation of completion and acceptance. * Added final dtx method defs to extra xml fragment and implemented appropriate handlers in c++ broker. * Converted dtx python tests (recover test still requires some work on decoding arrays). * Allow creation of structs without type codes through a python session method. * Fixed exception handling in python client for commands with results. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641464 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
-rw-r--r--python/tests_0-10/dtx.py505
1 files changed, 268 insertions, 237 deletions
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index f84f91c75a..2483c6f16d 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -18,12 +18,13 @@
#
from qpid.client import Client, Closed
from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message, RangedSet
+from qpid.session import SessionException
+from qpid.testlib import TestBase010
from struct import pack, unpack
from time import sleep
-class DtxTests(TestBase):
+class DtxTests(TestBase010):
"""
Tests for the amqp dtx related classes.
@@ -43,15 +44,15 @@ class DtxTests(TestBase):
tx_counter = 0
def reset_channel(self):
- self.channel.session_close()
- self.channel = self.client.channel(self.channel.id + 1)
- self.channel.session_open()
+ self.session.close()
+ self.session = self.conn.session("dtx-session", 1)
def test_simple_commit(self):
"""
Test basic one-phase commit behaviour.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "commit")
@@ -60,9 +61,9 @@ class DtxTests(TestBase):
self.assertMessageCount(0, "queue-b")
#commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status)
- #should close and reopen channel to ensure no unacked messages are held
+ #should close and reopen session to ensure no unacked messages are held
self.reset_channel()
#check result
@@ -74,19 +75,20 @@ class DtxTests(TestBase):
"""
Test basic two-phase commit behaviour.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "prepare-commit")
#prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
#neither queue should have any messages accessible
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(0, "queue-b")
#commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status)
self.reset_channel()
@@ -100,7 +102,8 @@ class DtxTests(TestBase):
"""
Test basic rollback behaviour.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "rollback")
@@ -109,7 +112,7 @@ class DtxTests(TestBase):
self.assertMessageCount(0, "queue-b")
#rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
self.reset_channel()
@@ -122,19 +125,20 @@ class DtxTests(TestBase):
"""
Test basic rollback behaviour after the transaction has been prepared.
"""
- channel = self.channel
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ session = self.session
tx = self.xid("my-xid")
self.txswap(tx, "prepare-rollback")
#prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
#neither queue should have any messages accessible
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(0, "queue-b")
#rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
self.reset_channel()
@@ -148,17 +152,17 @@ class DtxTests(TestBase):
check that an error is flagged if select is not issued before
start or end
"""
- channel = self.channel
+ session = self.session
tx = self.xid("dummy")
try:
- channel.dtx_demarcation_start(xid=tx)
+ session.dtx_start(xid=tx)
#if we get here we have failed, but need to do some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Channel not selected for use with dtx, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ session.dtx_end(xid=tx)
+ session.dtx_rollback(xid=tx)
+ self.fail("Session not selected for use with dtx, expected exception!")
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
def test_start_already_known(self):
"""
@@ -166,36 +170,35 @@ class DtxTests(TestBase):
transaction that is already known is not allowed (unless the
join flag is set).
"""
- #create two channels on different connection & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
+ #create two sessions on different connection & select them for use with dtx:
+ session1 = self.session
+ session1.dtx_select()
other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
- channel2.dtx_demarcation_select()
+ session2 = other.session("other", 0)
+ session2.dtx_select()
#create a xid
tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
+ #start work on one session under that xid:
+ session1.dtx_start(xid=tx)
#then start on the other without the join set
failed = False
try:
- channel2.dtx_demarcation_start(xid=tx)
- except Closed, e:
+ session2.dtx_start(xid=tx)
+ except SessionException, e:
failed = True
error = e
#cleanup:
if not failed:
- channel2.dtx_demarcation_end(xid=tx)
+ session2.dtx_end(xid=tx)
other.close()
- channel1.dtx_demarcation_end(xid=tx)
- channel1.dtx_coordination_rollback(xid=tx)
+ session1.dtx_end(xid=tx)
+ session1.dtx_rollback(xid=tx)
#verification:
- if failed: self.assertConnectionException(503, e.args[0])
+ if failed: self.assertEquals(503, error.args[0].error_code)
else: self.fail("Xid already known, expected exception!")
def test_forget_xid_on_completion(self):
@@ -205,69 +208,69 @@ class DtxTests(TestBase):
"""
#do some transactional work & complete the transaction
self.test_simple_commit()
- # channel has been reset, so reselect for use with dtx
- self.channel.dtx_demarcation_select()
+ # session has been reset, so reselect for use with dtx
+ self.session.dtx_select()
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
- self.channel.dtx_demarcation_start(xid=tx)
- self.channel.dtx_demarcation_end(xid=tx)
- self.channel.dtx_coordination_rollback(xid=tx)
+ self.session.dtx_start(xid=tx)
+ self.session.dtx_end(xid=tx)
+ self.session.dtx_rollback(xid=tx)
def test_start_join_and_resume(self):
"""
Ensure the correct error is signalled when both the join and
resume flags are set on starting an association between a
- channel and a transcation.
+ session and a transcation.
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
tx = self.xid("dummy")
try:
- channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
+ session.dtx_start(xid=tx, join=True, resume=True)
#failed, but need some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
+ session.dtx_end(xid=tx)
+ session.dtx_rollback(xid=tx)
self.fail("Join and resume both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
def test_start_join(self):
"""
- Verify 'join' behaviour, where a channel is associated with a
- transaction that is already associated with another channel.
+ Verify 'join' behaviour, where a session is associated with a
+ transaction that is already associated with another session.
"""
- #create two channels & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
+ guard = self.keepQueuesAlive(["one", "two"])
+ #create two sessions & select them for use with dtx:
+ session1 = self.session
+ session1.dtx_select()
- channel2 = self.client.channel(2)
- channel2.session_open()
- channel2.dtx_demarcation_select()
+ session2 = self.conn.session("second", 2)
+ session2.dtx_select()
#setup
- channel1.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel1.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+ session1.queue_declare(queue="one", auto_delete=True)
+ session1.queue_declare(queue="two", auto_delete=True)
+ session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage"))
+ session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage"))
#create a xid
tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
+ #start work on one session under that xid:
+ session1.dtx_start(xid=tx)
#then start on the other with the join flag set
- channel2.dtx_demarcation_start(xid=tx, join=True)
+ session2.dtx_start(xid=tx, join=True)
- #do work through each channel
- self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
- self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
+ #do work through each session
+ self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two'
+ self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one'
- #mark end on both channels
- channel1.dtx_demarcation_end(xid=tx)
- channel2.dtx_demarcation_end(xid=tx)
+ #mark end on both sessions
+ session1.dtx_end(xid=tx)
+ session2.dtx_end(xid=tx)
#commit and check
- channel1.dtx_coordination_commit(xid=tx, one_phase=True)
+ session1.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
@@ -278,27 +281,27 @@ class DtxTests(TestBase):
"""
Test suspension and resumption of an association
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
#setup
- channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+ session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+ session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
+ session.dtx_start(xid=tx)
+ self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+ session.dtx_end(xid=tx, suspend=True)
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx, resume=True)
+ self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+ session.dtx_end(xid=tx)
#commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
@@ -310,27 +313,27 @@ class DtxTests(TestBase):
done on another transaction when the first transaction is
suspended
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
#setup
- channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
- channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+ session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+ session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+ session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
+ session.dtx_start(xid=tx)
+ self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+ session.dtx_end(xid=tx, suspend=True)
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx, resume=True)
+ self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+ session.dtx_end(xid=tx)
#commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "one")
self.assertMessageCount(1, "two")
self.assertMessageId("a", "two")
@@ -340,24 +343,23 @@ class DtxTests(TestBase):
"""
Verify that the correct error is signalled if the suspend and
fail flag are both set when disassociating a transaction from
- the channel
+ the session
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
tx = self.xid("suspend_and_fail")
- channel.dtx_demarcation_start(xid=tx)
+ session.dtx_start(xid=tx)
try:
- channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
+ session.dtx_end(xid=tx, suspend=True, fail=True)
self.fail("Suspend and fail both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
#cleanup
other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.dtx_coordination_rollback(xid=tx)
- channel.session_close()
+ session = other.session("cleanup", 1)
+ session.dtx_rollback(xid=tx)
+ session.close()
other.close()
@@ -365,51 +367,51 @@ class DtxTests(TestBase):
"""
Verifies that the correct exception is thrown when an attempt
is made to end the association for a xid not previously
- associated with the channel
+ associated with the session
"""
- channel = self.channel
- channel.dtx_demarcation_select()
+ session = self.session
+ session.dtx_select()
tx = self.xid("unknown-xid")
try:
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_end(xid=tx)
self.fail("Attempted to end association with unknown xid, expected exception!")
- except Closed, e:
+ except SessionException, e:
#FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
- self.assertConnectionException(503, e.args[0])
+ self.assertEquals(503, e.args[0].error_code)
def test_end(self):
"""
Verify that the association is terminated by end and subsequent
operations are non-transactional
"""
- channel = self.client.channel(2)
- channel.session_open()
- channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
+ guard = self.keepQueuesAlive(["tx-queue"])
+ session = self.conn.session("alternate", 1)
+ session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
#publish a message under a transaction
- channel.dtx_demarcation_select()
+ session.dtx_select()
tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage"))
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx)
+ session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage"))
+ session.dtx_end(xid=tx)
#now that association with txn is ended, publish another message
- channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage"))
+ session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage"))
#check the second message is available, but not the first
self.assertMessageCount(1, "tx-queue")
- self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
- msg = self.client.queue("results").get(timeout=1)
- self.assertEqual("two", msg.content['message_id'])
- channel.message_cancel(destination="results")
- #ack the message then close the channel
- msg.complete()
- channel.session_close()
-
- channel = self.channel
+ self.subscribe(session, queue="tx-queue", destination="results")
+ msg = session.incoming("results").get(timeout=1)
+ self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id'))
+ session.message_cancel(destination="results")
+ #ack the message then close the session
+ session.message_accept(RangedSet(msg.id))
+ session.close()
+
+ session = self.session
#commit the transaction and check that the first message (and
#only the first message) is then delivered
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
+ session.dtx_commit(xid=tx, one_phase=True)
self.assertMessageCount(1, "tx-queue")
self.assertMessageId("one", "tx-queue")
@@ -419,27 +421,26 @@ class DtxTests(TestBase):
transaction in question has already been prepared.
"""
other = self.connect()
- tester = other.channel(1)
- tester.session_open()
+ tester = other.session("tester", 1)
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- tester.dtx_demarcation_select()
+ tester.dtx_select()
tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- tester.dtx_demarcation_end(xid=tx)
- tester.dtx_coordination_prepare(xid=tx)
+ tester.dtx_start(xid=tx)
+ tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+ tester.dtx_end(xid=tx)
+ tester.dtx_prepare(xid=tx)
failed = False
try:
- tester.dtx_coordination_commit(xid=tx, one_phase=True)
- except Closed, e:
+ tester.dtx_commit(xid=tx, one_phase=True)
+ except SessionException, e:
failed = True
error = e
if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(503, error.args[0].error_code)
else:
- tester.session_close()
+ tester.close()
other.close()
self.fail("Invalid use of one_phase=True, expected exception!")
@@ -453,99 +454,99 @@ class DtxTests(TestBase):
transaction in question has already been prepared.
"""
other = self.connect()
- tester = other.channel(1)
- tester.session_open()
+ tester = other.session("tester", 1)
tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- tester.dtx_demarcation_select()
+ tester.dtx_select()
tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- tester.dtx_demarcation_end(xid=tx)
+ tester.dtx_start(xid=tx)
+ tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+ tester.dtx_end(xid=tx)
failed = False
try:
- tester.dtx_coordination_commit(xid=tx, one_phase=False)
- except Closed, e:
+ tester.dtx_commit(xid=tx, one_phase=False)
+ except SessionException, e:
failed = True
error = e
if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
+ self.session.dtx_rollback(xid=tx)
+ self.assertEquals(503, error.args[0].error_code)
else:
- tester.session_close()
+ tester.close()
other.close()
self.fail("Invalid use of one_phase=False, expected exception!")
def test_implicit_end(self):
"""
- Test that an association is implicitly ended when the channel
+ Test that an association is implicitly ended when the session
is closed (whether by exception or explicit client request)
and the transaction in question is marked as rollback only.
"""
- channel1 = self.channel
- channel2 = self.client.channel(2)
- channel2.session_open()
+ session1 = self.session
+ session2 = self.conn.session("other", 2)
#setup:
- channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
- channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+ session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever"))
tx = self.xid("dummy")
- channel2.dtx_demarcation_select()
- channel2.dtx_demarcation_start(xid=tx)
- channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1)
- channel2.message_flow(destination="dummy", unit=0, value=1)
- channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
- self.client.queue("dummy").get(timeout=1).complete()
- channel2.message_cancel(destination="dummy")
- channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
- channel2.session_close()
+ session2.dtx_select()
+ session2.dtx_start(xid=tx)
+ session2.message_subscribe(queue="dummy", destination="dummy")
+ session2.message_flow(destination="dummy", unit=0, value=1)
+ session2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
+ msg = session2.incoming("dummy").get(timeout=1)
+ session2.message_accept(RangedSet(msg.id))
+ session2.message_cancel(destination="dummy")
+ session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever"))
+ session2.close()
- self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status)
- channel1.dtx_coordination_rollback(xid=tx)
+ self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status)
+ session1.dtx_rollback(xid=tx)
def test_get_timeout(self):
"""
Check that get-timeout returns the correct value, (and that a
transaction with a timeout can complete normally)
"""
- channel = self.channel
+ session = self.session
tx = self.xid("dummy")
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
- self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ session.dtx_select()
+ session.dtx_start(xid=tx)
+ self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout)
+ session.dtx_set_timeout(xid=tx, timeout=60)
+ self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
+ self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
def test_set_timeout(self):
"""
Test the timeout of a transaction results in the expected
behaviour
"""
- #open new channel to allow self.channel to be used in checking te queue
- channel = self.client.channel(2)
- channel.session_open()
+
+ guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+ #open new session to allow self.session to be used in checking the queue
+ session = self.conn.session("worker", 1)
#setup:
tx = self.xid("dummy")
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage"))
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "queue-a", "queue-b")
- channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+ session.queue_declare(queue="queue-a", auto_delete=True)
+ session.queue_declare(queue="queue-b", auto_delete=True)
+ session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage"))
+
+ session.dtx_select()
+ session.dtx_start(xid=tx)
+ self.swap(session, "queue-a", "queue-b")
+ session.dtx_set_timeout(xid=tx, timeout=2)
sleep(3)
#check that the work has been rolled back already
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
self.assertMessageId("timeout", "queue-a")
#check the correct codes are returned when we try to complete the txn
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status)
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
+ self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
@@ -553,28 +554,29 @@ class DtxTests(TestBase):
"""
Test basic recover behaviour
"""
- channel = self.channel
+ session = self.session
- channel.dtx_demarcation_select()
- channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+ session.dtx_select()
+ session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
prepared = []
for i in range(1, 10):
tx = self.xid("tx%s" % (i))
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i)))
- channel.dtx_demarcation_end(xid=tx)
+ session.dtx_start(xid=tx)
+ session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i)))
+ session.dtx_end(xid=tx)
if i in [2, 5, 6, 8]:
- channel.dtx_coordination_prepare(xid=tx)
+ session.dtx_prepare(xid=tx)
prepared.append(tx)
else:
- channel.dtx_coordination_rollback(xid=tx)
+ session.dtx_rollback(xid=tx)
- xids = channel.dtx_coordination_recover().in_doubt
+ xids = session.dtx_recover().in_doubt
+ print "xids=%s" % xids
#rollback the prepared transactions returned by recover
for x in xids:
- channel.dtx_coordination_rollback(xid=x)
+ session.dtx_rollback(xid=x)
#validate against the expected list of prepared transactions
actual = set(xids)
@@ -585,61 +587,90 @@ class DtxTests(TestBase):
missing = expected.difference(actual)
extra = actual.difference(expected)
for x in missing:
- channel.dtx_coordination_rollback(xid=x)
+ session.dtx_rollback(xid=x)
self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
def test_bad_resume(self):
"""
Test that a resume on a session not selected for use with dtx fails
"""
- channel = self.channel
+ session = self.session
try:
- channel.dtx_demarcation_start(resume=True)
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
+ session.dtx_start(resume=True)
+ except SessionException, e:
+ self.assertEquals(503, e.args[0].error_code)
def xid(self, txid):
DtxTests.tx_counter += 1
branchqual = "v%s" % DtxTests.tx_counter
- return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
-
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
+
def txswap(self, tx, id):
- channel = self.channel
+ session = self.session
#declare two queues:
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-a", auto_delete=True)
+ session.queue_declare(queue="queue-b", auto_delete=True)
+
#put message with specified id on one queue:
- channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage"))
+ dp=session.delivery_properties(routing_key="queue-a")
+ mp=session.message_properties(correlation_id=id)
+ session.message_transfer(message=Message(dp, mp, "DtxMessage"))
#start the transaction:
- channel.dtx_demarcation_select()
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
+ session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
#'swap' the message from one queue to the other, under that transaction:
- self.swap(self.channel, "queue-a", "queue-b")
+ self.swap(self.session, "queue-a", "queue-b")
#mark the end of the transactional work:
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
- def swap(self, channel, src, dest):
+ def swap(self, session, src, dest):
#consume from src:
- channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1)
- channel.message_flow(destination="temp-swap", unit=0, value=1)
- channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
- msg = self.client.queue("temp-swap").get(timeout=1)
- channel.message_cancel(destination="temp-swap")
- msg.complete();
-
- #re-publish to dest
- channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']},
- body=msg.content.body))
+ session.message_subscribe(destination="temp-swap", queue=src)
+ session.message_flow(destination="temp-swap", unit=0, value=1)
+ session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
+ msg = session.incoming("temp-swap").get(timeout=1)
+ session.message_cancel(destination="temp-swap")
+ session.message_accept(RangedSet(msg.id))
+ #todo: also complete at this point?
+
+ #re-publish to dest:
+ dp=session.delivery_properties(routing_key=dest)
+ mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id'))
+ session.message_transfer(message=Message(dp, mp, msg.body))
def assertMessageCount(self, expected, queue):
- self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count)
+ self.assertEqual(expected, self.session.queue_query(queue=queue).message_count)
def assertMessageId(self, expected, queue):
- self.channel.message_subscribe(queue=queue, destination="results")
- self.channel.message_flow(destination="results", unit=0, value=1)
- self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
- self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
- self.channel.message_cancel(destination="results")
+ self.session.message_subscribe(queue=queue, destination="results")
+ self.session.message_flow(destination="results", unit=0, value=1)
+ self.session.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
+ self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id'))
+ self.session.message_cancel(destination="results")
+
+ def getMessageProperty(self, msg, prop):
+ for h in msg.headers:
+ if hasattr(h, prop): return getattr(h, prop)
+ return None
+
+ def keepQueuesAlive(self, names):
+ session = self.conn.session("nasty", 99)
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+ session.message_subscribe(destination=n, queue=n)
+ return session
+
+ def createMessage(self, session, key, id, body):
+ dp=session.delivery_properties(routing_key=key)
+ mp=session.message_properties(correlation_id=id)
+ session.message_transfer(message=Message(dp, mp, body))
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)