diff options
Diffstat (limited to 'python/tests_0-10/message.py')
-rw-r--r-- | python/tests_0-10/message.py | 52 |
1 files changed, 22 insertions, 30 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 8089709314..ba26dda309 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -34,8 +34,8 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-1a", exclusive=True) channel.queue_declare(queue="test-queue-1b", exclusive=True) #establish two consumers one of which excludes delivery of locally sent messages - channel.message_subscribe(destination="local_included", queue="test-queue-1a") - channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) + self.subscribe(destination="local_included", queue="test-queue-1a") + self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) @@ -61,9 +61,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-2", exclusive=True) #check that an exclusive consumer prevents other consumer being created: - channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) + self.subscribe(destination="first", queue="test-queue-2", exclusive=True) try: - channel.message_subscribe(destination="second", queue="test-queue-2") + self.subscribe(destination="second", queue="test-queue-2") self.fail("Expected consume request to fail due to previous exclusive consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -73,9 +73,9 @@ class MessageTests(TestBase): channel.channel_open() #check that an exclusive consumer cannot be created if a consumer already exists: - channel.message_subscribe(destination="first", queue="test-queue-2") + self.subscribe(channel, destination="first", queue="test-queue-2") try: - channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) + self.subscribe(destination="second", queue="test-queue-2", exclusive=True) self.fail("Expected exclusive consume request to fail due to previous consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -87,7 +87,7 @@ class MessageTests(TestBase): channel = self.channel try: #queue specified but doesn't exist: - channel.message_subscribe(queue="invalid-queue") + self.subscribe(queue="invalid-queue", destination="") self.fail("Expected failure when consuming from non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -96,7 +96,7 @@ class MessageTests(TestBase): channel.channel_open() try: #queue not specified and none previously declared for channel: - channel.message_subscribe(queue="") + self.subscribe(channel, queue="", destination="") self.fail("Expected failure when consuming from unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -110,9 +110,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-3", exclusive=True) #check that attempts to use duplicate tags are detected and prevented: - channel.message_subscribe(destination="first", queue="test-queue-3") + self.subscribe(destination="first", queue="test-queue-3") try: - channel.message_subscribe(destination="first", queue="test-queue-3") + self.subscribe(destination="first", queue="test-queue-3") self.fail("Expected consume request to fail due to non-unique tag") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -124,7 +124,7 @@ class MessageTests(TestBase): channel = self.channel #setup, declare a queue: channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.message_subscribe(destination="my-consumer", queue="test-queue-4") + self.subscribe(destination="my-consumer", queue="test-queue-4") channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) #cancel should stop messages being delivered @@ -150,7 +150,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-ack-queue", exclusive=True) - channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) + self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) @@ -194,7 +194,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-requeue", exclusive=True) - channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) + self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) @@ -225,7 +225,7 @@ class MessageTests(TestBase): #requeue unacked messages (Three and Five) channel.message_recover(requeue=True) - channel.message_subscribe(queue="test-requeue", destination="consumer_tag") + self.subscribe(queue="test-requeue", destination="consumer_tag") queue2 = self.client.queue("consumer_tag") msg3b = queue2.get(timeout=1) @@ -256,7 +256,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) + subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 5: @@ -298,7 +298,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) + subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 50 bytes (each message is 9 or 10 bytes): @@ -345,13 +345,13 @@ class MessageTests(TestBase): channel.queue_declare(queue = "r", exclusive=True) channel.queue_bind(queue = "r", exchange = "amq.fanout") - channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1) + self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) msg = self.client.queue("consumer").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") channel.message_reject([msg.command_id, msg.command_id]) - channel.message_subscribe(queue = "r", destination = "checker") + self.subscribe(queue = "r", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) self.assertEquals(msg.content.body, "blah, blah") @@ -365,8 +365,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) @@ -397,8 +395,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) @@ -431,8 +427,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) @@ -465,8 +459,6 @@ class MessageTests(TestBase): #create consumer (for now that defaults to infinite credit) channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") - #set credit to zero (can remove this once move to proper default for subscribe method) - channel.message_stop(destination = "c") #send batch of messages to queue for i in range(1, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) @@ -506,8 +498,8 @@ class MessageTests(TestBase): for i in range(1, 6): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) - channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 1) + self.subscribe(queue = "q", destination = "b", acquire_mode = 1) for i in range(6, 11): channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) @@ -532,7 +524,7 @@ class MessageTests(TestBase): channel.queue_declare(queue = "q", exclusive=True) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) msg = self.client.queue("a").get(timeout = 1) channel.message_acquire([msg.command_id, msg.command_id]) msg.complete() @@ -548,7 +540,7 @@ class MessageTests(TestBase): channel.queue_declare(queue = "q", exclusive=True) channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me")) - channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) + self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) msg = self.client.queue("a").get(timeout = 1) channel.message_cancel(destination = "a") channel.message_release([msg.command_id, msg.command_id]) |