diff options
Diffstat (limited to 'qpid/python')
-rw-r--r-- | qpid/python/cpp_failing_0-10.txt | 1 | ||||
-rw-r--r-- | qpid/python/qpid/testlib.py | 9 | ||||
-rw-r--r-- | qpid/python/tests_0-10/alternate-exchange.py | 8 | ||||
-rw-r--r-- | qpid/python/tests_0-10/broker.py | 12 | ||||
-rw-r--r-- | qpid/python/tests_0-10/dtx.py | 4 | ||||
-rw-r--r-- | qpid/python/tests_0-10/example.py | 2 | ||||
-rw-r--r-- | qpid/python/tests_0-10/message.py | 52 | ||||
-rw-r--r-- | qpid/python/tests_0-10/queue.py | 10 | ||||
-rw-r--r-- | qpid/python/tests_0-10/tx.py | 12 |
9 files changed, 58 insertions, 52 deletions
diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt index 5b2fb593e1..878afee3c5 100644 --- a/qpid/python/cpp_failing_0-10.txt +++ b/qpid/python/cpp_failing_0-10.txt @@ -1,3 +1,2 @@ tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate -tests_0-10.basic.BasicTests.test_get diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 28c07ba43a..c2e3024a7e 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -259,8 +259,17 @@ class TestBase(unittest.TestCase): else: self.uniqueTag += 1 consumer_tag = "tag" + str(self.uniqueTag) self.channel.message_subscribe(queue=queueName, destination=consumer_tag) + self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) return self.client.queue(consumer_tag) + def subscribe(self, channel=None, **keys): + channel = channel or self.channel + consumer_tag = keys["destination"] + channel.message_subscribe(**keys) + channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + def assertEmpty(self, queue): """Assert that the queue is empty""" try: diff --git a/qpid/python/tests_0-10/alternate-exchange.py b/qpid/python/tests_0-10/alternate-exchange.py index d6ac62ccfe..a749d733b0 100644 --- a/qpid/python/tests_0-10/alternate-exchange.py +++ b/qpid/python/tests_0-10/alternate-exchange.py @@ -39,13 +39,13 @@ class AlternateExchangeTests(TestBase): #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages channel.queue_declare(queue="returns", exclusive=True) channel.queue_bind(queue="returns", exchange="secondary") - channel.message_subscribe(destination="a", queue="returns") + self.subscribe(destination="a", queue="returns") returned = self.client.queue("a") #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages channel.queue_declare(queue="processed", exclusive=True) channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key") - channel.message_subscribe(destination="b", queue="processed") + self.subscribe(destination="b", queue="processed") processed = self.client.queue("b") #publish to the primary exchange @@ -73,7 +73,7 @@ class AlternateExchangeTests(TestBase): channel.exchange_declare(exchange="dlq", type="fanout") channel.queue_declare(queue="deleted", exclusive=True) channel.queue_bind(exchange="dlq", queue="deleted") - channel.message_subscribe(destination="dlq", queue="deleted") + self.subscribe(destination="dlq", queue="deleted") dlq = self.client.queue("dlq") #create a queue using the dlq as its alternate exchange: @@ -103,7 +103,7 @@ class AlternateExchangeTests(TestBase): channel.exchange_declare(exchange="dlq", type="fanout") channel.queue_declare(queue="immediate", exclusive=True) channel.queue_bind(exchange="dlq", queue="immediate") - channel.message_subscribe(destination="dlq", queue="immediate") + self.subscribe(destination="dlq", queue="immediate") dlq = self.client.queue("dlq") #create a queue using the dlq as its alternate exchange: diff --git a/qpid/python/tests_0-10/broker.py b/qpid/python/tests_0-10/broker.py index 0eb71287ec..0df7eb09fa 100644 --- a/qpid/python/tests_0-10/broker.py +++ b/qpid/python/tests_0-10/broker.py @@ -35,7 +35,7 @@ class BrokerTests(TestBase): # No ack consumer ctag = "tag1" - ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0) + self.subscribe(ch, queue = "myqueue", destination = ctag) body = "test no-ack" ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"})) msg = self.client.queue(ctag).get(timeout = 5) @@ -44,7 +44,9 @@ class BrokerTests(TestBase): # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") ctag = "tag2" - ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1) + self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1) + ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) + ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) body = "test ack" ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"})) msg = self.client.queue(ctag).get(timeout = 5) @@ -60,7 +62,7 @@ class BrokerTests(TestBase): self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") consumer_tag = "tag1" - channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) + self.subscribe(queue="test-queue", destination=consumer_tag) queue = self.client.queue(consumer_tag) body = "Immediate Delivery" @@ -84,7 +86,7 @@ class BrokerTests(TestBase): channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"})) consumer_tag = "tag1" - channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) + self.subscribe(queue="test-queue", destination=consumer_tag) queue = self.client.queue(consumer_tag) msg = queue.get(timeout=5) self.assert_(msg.content.body == body) @@ -111,7 +113,7 @@ class BrokerTests(TestBase): def test_channel_flow(self): channel = self.channel channel.queue_declare(queue="flow_test_queue", exclusive=True) - channel.message_subscribe(destination="my-tag", queue="flow_test_queue") + self.subscribe(destination="my-tag", queue="flow_test_queue") incoming = self.client.queue("my-tag") channel.channel_flow(active=False) diff --git a/qpid/python/tests_0-10/dtx.py b/qpid/python/tests_0-10/dtx.py index 29a4d3bf0d..b5645cb596 100644 --- a/qpid/python/tests_0-10/dtx.py +++ b/qpid/python/tests_0-10/dtx.py @@ -366,7 +366,7 @@ class DtxTests(TestBase): #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") - channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1) + self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1) msg = self.client.queue("results").get(timeout=1) self.assertEqual("two", msg.content['message_id']) channel.message_cancel(destination="results") @@ -602,5 +602,7 @@ class DtxTests(TestBase): def assertMessageId(self, expected, queue): self.channel.message_subscribe(queue=queue, destination="results") + self.channel.message_flow(destination="results", unit=0, value=1) + self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF) self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id']) self.channel.message_cancel(destination="results") diff --git a/qpid/python/tests_0-10/example.py b/qpid/python/tests_0-10/example.py index e3e2c3b095..9dbe73e3cb 100644 --- a/qpid/python/tests_0-10/example.py +++ b/qpid/python/tests_0-10/example.py @@ -69,6 +69,8 @@ class ExampleTest (TestBase): # field that is filled if the reply includes content. In this case the # interesting field is the consumer_tag. channel.message_subscribe(queue="test-queue", destination="consumer_tag") + channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) + channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) # We can use the Client.queue(...) method to access the queue # corresponding to our consumer_tag. diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index 8089709314..ba26dda309 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -34,8 +34,8 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-1a", exclusive=True) channel.queue_declare(queue="test-queue-1b", exclusive=True) #establish two consumers one of which excludes delivery of locally sent messages - channel.message_subscribe(destination="local_included", queue="test-queue-1a") - channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) @@ -61,9 +61,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-2", exclusive=True) #check that an exclusive consumer prevents other consumer being created: - channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) + self.subscribe(destination="first", queue="test-queue-2", exclusive=True) try: - channel.message_subscribe(destination="second", queue="test-queue-2") + self.subscribe(destination="second", queue="test-queue-2") self.fail("Expected consume request to fail due to previous exclusive consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -73,9 +73,9 @@ class MessageTests(TestBase): channel.channel_open() #check that an exclusive consumer cannot be created if a consumer already exists: - channel.message_subscribe(destination="first", queue="test-queue-2") + self.subscribe(channel, destination="first", queue="test-queue-2") try: - channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) + self.subscribe(destination="second", queue="test-queue-2", exclusive=True) self.fail("Expected exclusive consume request to fail due to previous consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -87,7 +87,7 @@ class MessageTests(TestBase): channel = self.channel try: #queue specified but doesn't exist: - channel.message_subscribe(queue="invalid-queue") + self.subscribe(queue="invalid-queue", destination="") self.fail("Expected failure when consuming from non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -96,7 +96,7 @@ class MessageTests(TestBase): channel.channel_open() try: #queue not specified and none previously declared for channel: - channel.message_subscribe(queue="") + self.subscribe(channel, queue="", destination="") self.fail("Expected failure when consuming from unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -110,9 +110,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-3", exclusive=True) #check that attempts to use duplicate tags are detected and prevented: - channel.message_subscribe(destination="first", queue="test-queue-3") + self.subscribe(destination="first", queue="test-queue-3") try: - channel.message_subscribe(destination="first", queue="test-queue-3") + self.subscribe(destination="first", queue="test-queue-3") self.fail("Expected consume request to fail due to non-unique tag") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -124,7 +124,7 @@ class MessageTests(TestBase): channel = self.channel #setup, declare a queue: channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.message_subscribe(destination="my-consumer", queue="test-queue-4") + self.subscribe(destination="my-consumer", queue="test-queue-4") channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) #cancel should stop messages being delivered @@ -150,7 +150,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-ack-queue", exclusive=True) - channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) + self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) @@ -194,7 +194,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-requeue", exclusive=True) - channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) + self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) @@ -225,7 +225,7 @@ class MessageTests(TestBase): #requeue unacked messages (Three and Five) channel.message_recover(requeue=True) - channel.message_subscribe(queue="test-requeue", destination="consumer_tag") + self.subscribe(queue="test-requeue", destination="consumer_tag") queue2 = self.client.queue("consumer_tag") msg3b = queue2.get(timeout=1) @@ -256,7 +256,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) + subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 5: @@ -298,7 +298,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) + subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 50 bytes (each message is 9 or 10 bytes): @@ -345,13 +345,13 @@ class MessageTests(TestBase): channel.queue_declare(queue = "r", exclusive=True) channel.queue_bind(queue = "r", exchange = "amq.fanout") - channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1) + self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) msg = self.client.queue("consumer").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") channel.message_reject([msg.command_id, msg.command_id]) - channel.message_subscribe(queue = "r", destination = "checker") + self.subscribe(queue = "r", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") @@ -365,8 +365,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) @@ -397,8 +395,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) @@ -431,8 +427,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) @@ -465,8 +459,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) @@ -506,8 +498,8 @@ class MessageTests(TestBase): for i in range(1, 6): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 1) + self.subscribe(queue = "q", destination = "b", acquire_mode = 1) for i in range(6, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) @@ -532,7 +524,7 @@ class MessageTests(TestBase): channel.queue_declare(queue = "q", exclusive=True) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) msg = self.client.queue("a").get(timeout = 1) channel.message_acquire([msg.command_id, msg.command_id]) msg.complete() @@ -548,7 +540,7 @@ class MessageTests(TestBase): channel.queue_declare(queue = "q", exclusive=True) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me")) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) msg = self.client.queue("a").get(timeout = 1) channel.message_cancel(destination = "a") channel.message_release([msg.command_id, msg.command_id]) diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py index 05fa1aebc6..e3438116c8 100644 --- a/qpid/python/tests_0-10/queue.py +++ b/qpid/python/tests_0-10/queue.py @@ -49,7 +49,7 @@ class QueueTests(TestBase): #send a further message and consume it, ensuring that the other messages are really gone channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) - channel.message_subscribe(queue="test-queue", destination="tag") + self.subscribe(queue="test-queue", destination="tag") queue = self.client.queue("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.content.body) @@ -169,8 +169,8 @@ class QueueTests(TestBase): channel.queue_declare(queue="queue-1", exclusive="True") channel.queue_declare(queue="queue-2", exclusive="True") - channel.message_subscribe(queue="queue-1", destination="queue-1") - channel.message_subscribe(queue="queue-2", destination="queue-2") + self.subscribe(queue="queue-1", destination="queue-1") + self.subscribe(queue="queue-2", destination="queue-2") queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") @@ -257,7 +257,7 @@ class QueueTests(TestBase): channel.channel_open() #empty queue: - channel.message_subscribe(destination="consumer_tag", queue="delete-me-2") + self.subscribe(channel, destination="consumer_tag", queue="delete-me-2") queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.content.body) @@ -282,7 +282,7 @@ class QueueTests(TestBase): #create a queue and register a consumer: channel.queue_declare(queue="delete-me-3") channel.queue_declare(queue="delete-me-3", passive="True") - channel.message_subscribe(destination="consumer_tag", queue="delete-me-3") + self.subscribe(destination="consumer_tag", queue="delete-me-3") #need new channel now: channel2 = self.client.channel(2) diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py index 7c50de4ee2..2415a88fb2 100644 --- a/qpid/python/tests_0-10/tx.py +++ b/qpid/python/tests_0-10/tx.py @@ -41,13 +41,13 @@ class TxTests(TestBase): channel = self.channel channel.tx_select() - channel.message_subscribe(queue="tx-commit-a", destination="qa", confirm_mode=1) + self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1) queue_a = self.client.queue("qa") - channel.message_subscribe(queue="tx-commit-b", destination="qb", confirm_mode=1) + self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1) queue_b = self.client.queue("qb") - channel.message_subscribe(queue="tx-commit-c", destination="qc", confirm_mode=1) + self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1) queue_c = self.client.queue("qc") #check results @@ -176,7 +176,7 @@ class TxTests(TestBase): channel.tx_select() #consume and ack messages - channel.message_subscribe(queue=name_a, destination="sub_a", confirm_mode=1) + self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1) queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) @@ -184,13 +184,13 @@ class TxTests(TestBase): msg.complete() - channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1) + self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.content.body) msg.complete() - sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1) + sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.content.body) |