diff options
Diffstat (limited to 'python/tests_0-10/tx.py')
| -rw-r--r-- | python/tests_0-10/tx.py | 46 |
1 files changed, 33 insertions, 13 deletions
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index 59298bad1b..5aef2b00e8 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -30,23 +30,30 @@ class TxTests(TestBase010): """ Test that commited publishes are delivered and commited acks are not re-delivered """ + session = self.session + + #declare queues and create subscribers in the checking session + #to ensure that the queues are not auto-deleted too early: + self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"]) + session.message_subscribe(queue="tx-commit-a", destination="qa") + session.message_subscribe(queue="tx-commit-b", destination="qb") + session.message_subscribe(queue="tx-commit-c", destination="qc") + + #use a separate session for actual work session2 = self.conn.session("worker", 2) self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c") session2.tx_commit() session2.close() - #use a different session with new subscriptions to ensure - #there is no redelivery of acked messages: - session = self.session session.tx_select() - self.subscribe(session, queue="tx-commit-a", destination="qa") + self.enable_flow("qa") queue_a = session.incoming("qa") - self.subscribe(session, queue="tx-commit-b", destination="qb") + self.enable_flow("qb") queue_b = session.incoming("qb") - self.subscribe(session, queue="tx-commit-c", destination="qc") + self.enable_flow("qc") queue_c = session.incoming("qc") #check results @@ -76,6 +83,12 @@ class TxTests(TestBase010): """ Test that a session closed with an open transaction is effectively rolled back """ + session = self.session + self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"]) + session.message_subscribe(queue="tx-autorollback-a", destination="qa") + session.message_subscribe(queue="tx-autorollback-b", destination="qb") + session.message_subscribe(queue="tx-autorollback-c", destination="qc") + session2 = self.conn.session("worker", 2) queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") @@ -87,16 +100,15 @@ class TxTests(TestBase010): session2.close() - session = self.session session.tx_select() - self.subscribe(session, queue="tx-autorollback-a", destination="qa") + self.enable_flow("qa") queue_a = session.incoming("qa") - self.subscribe(session, queue="tx-autorollback-b", destination="qb") + self.enable_flow("qb") queue_b = session.incoming("qb") - self.subscribe(session, queue="tx-autorollback-c", destination="qc") + self.enable_flow("qc") queue_c = session.incoming("qc") #check results @@ -169,9 +181,7 @@ class TxTests(TestBase010): commit and rollback """ #setup: - session.queue_declare(queue=name_a, exclusive=True, auto_delete=True) - session.queue_declare(queue=name_b, exclusive=True, auto_delete=True) - session.queue_declare(queue=name_c, exclusive=True, auto_delete=True) + self.declare_queues([name_a, name_b, name_c]) key = "my_key_" + name_b topic = "my_topic_" + name_c @@ -232,6 +242,11 @@ class TxTests(TestBase010): session.message_transfer(message=Message(dp, mp, "TxMessage 7")) return queue_a, queue_b, queue_c, acked + def declare_queues(self, names, session=None): + session = session or self.session + for n in names: + session.queue_declare(queue=n, auto_delete=True) + def subscribe(self, session=None, **keys): session = session or self.session consumer_tag = keys["destination"] @@ -239,6 +254,11 @@ class TxTests(TestBase010): session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + def enable_flow(self, tag, session=None): + session = session or self.session + session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF) + def complete(self, session, msg): session.receiver._completed.add(msg.id)#TODO: this may be done automatically session.channel.session_completed(session.receiver._completed) |
