diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/peer.py | 12 | ||||
-rw-r--r-- | python/qpid/testlib.py | 4 | ||||
-rw-r--r-- | python/tests_0-10/alternate-exchange.py | 8 | ||||
-rw-r--r-- | python/tests_0-10/broker.py | 10 | ||||
-rw-r--r-- | python/tests_0-10/dtx.py | 6 | ||||
-rw-r--r-- | python/tests_0-10/example.py | 2 | ||||
-rw-r--r-- | python/tests_0-10/message.py | 64 | ||||
-rw-r--r-- | python/tests_0-10/query.py | 15 | ||||
-rw-r--r-- | python/tests_0-10/queue.py | 27 | ||||
-rw-r--r-- | python/tests_0-10/tx.py | 12 |
10 files changed, 85 insertions, 75 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 6762f774f4..6ad5482f09 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -208,6 +208,8 @@ class Channel: self.responses.close() self.completion.close() self.incoming_completion.reset() + for f in self.futures.values(): + f.put_response(self, reason) def write(self, frame, content = None): if self.closed: @@ -324,7 +326,10 @@ class Channel: raise ValueError(resp) elif frame.method.result: if self.synchronous: - return future.get_response(timeout=10) + fr = future.get_response(timeout=10) + if self.closed: + raise Closed(self.reason) + return fr else: return future elif self.synchronous and not frame.method.response \ @@ -373,7 +378,10 @@ class Future: def get_response(self, timeout=None): self.completed.wait(timeout) - return self.response + if self.completed.isSet(): + return self.response + else: + return None def is_complete(self): return self.completed.isSet() diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 8a3abf840b..6820ae3bae 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -228,7 +228,7 @@ class TestBase(unittest.TestCase): def queue_declare(self, channel=None, *args, **keys): channel = channel or self.channel reply = channel.queue_declare(*args, **keys) - self.queues.append((channel, reply.queue)) + self.queues.append((channel, keys["queue"])) return reply def exchange_declare(self, channel=None, ticket=0, exchange='', @@ -254,7 +254,7 @@ class TestBase(unittest.TestCase): if not "uniqueTag" in dir(self): self.uniqueTag = 1 else: self.uniqueTag += 1 consumer_tag = "tag" + str(self.uniqueTag) - self.channel.message_consume(queue=queueName, destination=consumer_tag, no_ack=True) + self.channel.message_subscribe(queue=queueName, destination=consumer_tag) return self.client.queue(consumer_tag) def assertEmpty(self, queue): diff --git a/python/tests_0-10/alternate-exchange.py b/python/tests_0-10/alternate-exchange.py index 19405a1c9f..a1c6151fca 100644 --- a/python/tests_0-10/alternate-exchange.py +++ b/python/tests_0-10/alternate-exchange.py @@ -39,13 +39,13 @@ class AlternateExchangeTests(TestBase): #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages channel.queue_declare(queue="returns", exclusive=True) channel.queue_bind(queue="returns", exchange="secondary") - channel.message_consume(destination="a", queue="returns") + channel.message_subscribe(destination="a", queue="returns") returned = self.client.queue("a") #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages channel.queue_declare(queue="processed", exclusive=True) channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key") - channel.message_consume(destination="b", queue="processed") + channel.message_subscribe(destination="b", queue="processed") processed = self.client.queue("b") #publish to the primary exchange @@ -73,7 +73,7 @@ class AlternateExchangeTests(TestBase): channel.exchange_declare(exchange="dlq", type="fanout") channel.queue_declare(queue="deleted", exclusive=True) channel.queue_bind(exchange="dlq", queue="deleted") - channel.message_consume(destination="dlq", queue="deleted") + channel.message_subscribe(destination="dlq", queue="deleted") dlq = self.client.queue("dlq") #create a queue using the dlq as its alternate exchange: @@ -103,7 +103,7 @@ class AlternateExchangeTests(TestBase): channel.exchange_declare(exchange="dlq", type="fanout") channel.queue_declare(queue="immediate", exclusive=True) channel.queue_bind(exchange="dlq", queue="immediate") - channel.message_consume(destination="dlq", queue="immediate") + channel.message_subscribe(destination="dlq", queue="immediate") dlq = self.client.queue("dlq") #create a queue using the dlq as its alternate exchange: diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 6bc2f7ceb8..647f5d4fa5 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -35,7 +35,7 @@ class BrokerTests(TestBase): # No ack consumer ctag = "tag1" - ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True) + ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0) body = "test no-ack" ch.message_transfer(routing_key = "myqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) @@ -44,7 +44,7 @@ class BrokerTests(TestBase): # Acknowledging consumer self.queue_declare(ch, queue = "otherqueue") ctag = "tag2" - ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False) + ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1) body = "test ack" ch.message_transfer(routing_key = "otherqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) @@ -60,7 +60,7 @@ class BrokerTests(TestBase): self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") consumer_tag = "tag1" - channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) queue = self.client.queue(consumer_tag) body = "Immediate Delivery" @@ -84,7 +84,7 @@ class BrokerTests(TestBase): channel.message_transfer(destination="test-exchange", routing_key="key", body=body) consumer_tag = "tag1" - channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True) + channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0) queue = self.client.queue(consumer_tag) msg = queue.get(timeout=5) self.assert_(msg.body == body) @@ -111,7 +111,7 @@ class BrokerTests(TestBase): def test_channel_flow(self): channel = self.channel channel.queue_declare(queue="flow_test_queue", exclusive=True) - channel.message_consume(destination="my-tag", queue="flow_test_queue") + channel.message_subscribe(destination="my-tag", queue="flow_test_queue") incoming = self.client.queue("my-tag") channel.channel_flow(active=False) diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index 2835d703ae..a5b53ac65b 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -366,7 +366,7 @@ class DtxTests(TestBase): #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") - channel.message_consume(queue="tx-queue", destination="results", no_ack=False) + channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1) msg = self.client.queue("results").get(timeout=1) self.assertEqual("two", msg.message_id) channel.message_cancel(destination="results") @@ -597,9 +597,9 @@ class DtxTests(TestBase): channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) def assertMessageCount(self, expected, queue): - self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count) + self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count) def assertMessageId(self, expected, queue): - self.channel.message_consume(queue=queue, destination="results", no_ack=True) + self.channel.message_subscribe(queue=queue, destination="results") self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) self.channel.message_cancel(destination="results") diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py index dc71b0590b..e4c80951ac 100644 --- a/python/tests_0-10/example.py +++ b/python/tests_0-10/example.py @@ -68,7 +68,7 @@ class ExampleTest (TestBase): # has fields corresponding to the reply method fields, plus a content # field that is filled if the reply includes content. In this case the # interesting field is the consumer_tag. - channel.message_consume(queue="test-queue", destination="consumer_tag") + channel.message_subscribe(queue="test-queue", destination="consumer_tag") # We can use the Client.queue(...) method to access the queue # corresponding to our consumer_tag. diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index b882cd5438..6cf2f3ef89 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -34,8 +34,8 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-1a", exclusive=True) channel.queue_declare(queue="test-queue-1b", exclusive=True) #establish two consumers one of which excludes delivery of locally sent messages - channel.message_consume(destination="local_included", queue="test-queue-1a") - channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True) + channel.message_subscribe(destination="local_included", queue="test-queue-1a") + channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local") @@ -61,9 +61,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-2", exclusive=True) #check that an exclusive consumer prevents other consumer being created: - channel.message_consume(destination="first", queue="test-queue-2", exclusive=True) + channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True) try: - channel.message_consume(destination="second", queue="test-queue-2") + channel.message_subscribe(destination="second", queue="test-queue-2") self.fail("Expected consume request to fail due to previous exclusive consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -73,9 +73,9 @@ class MessageTests(TestBase): channel.channel_open() #check that an exclusive consumer cannot be created if a consumer already exists: - channel.message_consume(destination="first", queue="test-queue-2") + channel.message_subscribe(destination="first", queue="test-queue-2") try: - channel.message_consume(destination="second", queue="test-queue-2", exclusive=True) + channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True) self.fail("Expected exclusive consume request to fail due to previous consumer") except Closed, e: self.assertChannelException(403, e.args[0]) @@ -87,7 +87,7 @@ class MessageTests(TestBase): channel = self.channel try: #queue specified but doesn't exist: - channel.message_consume(queue="invalid-queue") + channel.message_subscribe(queue="invalid-queue") self.fail("Expected failure when consuming from non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -96,7 +96,7 @@ class MessageTests(TestBase): channel.channel_open() try: #queue not specified and none previously declared for channel: - channel.message_consume(queue="") + channel.message_subscribe(queue="") self.fail("Expected failure when consuming from unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -110,9 +110,9 @@ class MessageTests(TestBase): channel.queue_declare(queue="test-queue-3", exclusive=True) #check that attempts to use duplicate tags are detected and prevented: - channel.message_consume(destination="first", queue="test-queue-3") + channel.message_subscribe(destination="first", queue="test-queue-3") try: - channel.message_consume(destination="first", queue="test-queue-3") + channel.message_subscribe(destination="first", queue="test-queue-3") self.fail("Expected consume request to fail due to non-unique tag") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -124,7 +124,7 @@ class MessageTests(TestBase): channel = self.channel #setup, declare a queue: channel.queue_declare(queue="test-queue-4", exclusive=True) - channel.message_consume(destination="my-consumer", queue="test-queue-4") + channel.message_subscribe(destination="my-consumer", queue="test-queue-4") channel.message_transfer(routing_key="test-queue-4", body="One") #cancel should stop messages being delivered @@ -150,7 +150,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-ack-queue", exclusive=True) - channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False) + channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(routing_key="test-ack-queue", body="One") @@ -194,7 +194,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue="test-requeue", exclusive=True) - channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False) + channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") channel.message_transfer(routing_key="test-requeue", body="One") @@ -225,7 +225,7 @@ class MessageTests(TestBase): #requeue unacked messages (Three and Five) channel.message_recover(requeue=True) - channel.message_consume(queue="test-requeue", destination="consumer_tag") + channel.message_subscribe(queue="test-requeue", destination="consumer_tag") queue2 = self.client.queue("consumer_tag") msg3b = queue2.get(timeout=1) @@ -256,7 +256,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-count", exclusive=True) - subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False) + subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 5: @@ -298,7 +298,7 @@ class MessageTests(TestBase): #setup: declare queue and subscribe channel = self.channel channel.queue_declare(queue="test-prefetch-size", exclusive=True) - subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False) + subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) queue = self.client.queue("consumer_tag") #set prefetch to 50 bytes (each message is 9 or 10 bytes): @@ -362,13 +362,13 @@ class MessageTests(TestBase): self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "empty") - #repeat for no_ack=False + #repeat for confirm_mode=1 for i in range(11, 21): channel.message_transfer(routing_key="test-get", body="Message %d" % i) for i in range(11, 21): tag = "queue %d" % i - reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) + reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "ok") msg = self.client.queue(tag).get(timeout=1) @@ -389,7 +389,7 @@ class MessageTests(TestBase): #get the unacked messages again (14, 16, 18, 20) for i in [14, 16, 18, 20]: tag = "queue %d" % i - reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) + reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag) self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "ok") msg = self.client.queue(tag).get(timeout=1) @@ -412,7 +412,7 @@ class MessageTests(TestBase): """ channel = self.channel channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") + channel.message_subscribe(queue="ref_queue", destination="c1") queue = self.client.queue("c1") refId = "myref" @@ -454,7 +454,7 @@ class MessageTests(TestBase): other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0}) ch2 = other.channel(1) ch2.channel_open() - ch2.message_consume(queue="ref_queue", destination="c1") + ch2.message_subscribe(queue="ref_queue", destination="c1") queue = other.queue("c1") msg = queue.get(timeout=1) @@ -469,7 +469,7 @@ class MessageTests(TestBase): """ channel = self.channel channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") + channel.message_subscribe(queue="ref_queue", destination="c1") queue = self.client.queue("c1") refId = "myref" @@ -502,8 +502,8 @@ class MessageTests(TestBase): #declare and consume from two queues channel.queue_declare(queue="q-one", exclusive=True) channel.queue_declare(queue="q-two", exclusive=True) - channel.message_consume(queue="q-one", destination="q-one") - channel.message_consume(queue="q-two", destination="q-two") + channel.message_subscribe(queue="q-one", destination="q-one") + channel.message_subscribe(queue="q-two", destination="q-two") queue1 = self.client.queue("q-one") queue2 = self.client.queue("q-two") @@ -590,7 +590,7 @@ class MessageTests(TestBase): def test_empty_reference(self): channel = self.channel channel.queue_declare(queue="ref_queue", exclusive=True) - channel.message_consume(queue="ref_queue", destination="c1") + channel.message_subscribe(queue="ref_queue", destination="c1") queue = self.client.queue("c1") refId = "myref" @@ -611,14 +611,14 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) - channel.message_consume(queue = "q", destination = "consumer") + channel.message_subscribe(queue = "q", destination = "consumer") channel.message_transfer(routing_key = "q", body="blah, blah") msg = self.client.queue("consumer").get(timeout = 1) self.assertEquals(msg.body, "blah, blah") channel.message_cancel(destination = "consumer") msg.reject() - channel.message_consume(queue = "q", destination = "checker") + channel.message_subscribe(queue = "q", destination = "checker") msg = self.client.queue("checker").get(timeout = 1) self.assertEquals(msg.body, "blah, blah") @@ -634,7 +634,7 @@ class MessageTests(TestBase): channel = self.client.channel(2) channel.channel_open() - channel.message_consume(queue = "q", destination = "consumer") + channel.message_subscribe(queue = "q", destination = "consumer") offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value self.assertTrue(offset<=16) channel.message_append(reference="my-ref", bytes="qrstuvwxyz") @@ -654,7 +654,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) #create consumer (for now that defaults to infinite credit) - channel.message_consume(queue = "q", destination = "c") + channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") #set credit to zero (can remove this once move to proper default for subscribe method) channel.message_stop(destination = "c") @@ -686,7 +686,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) #create consumer (for now that defaults to infinite credit) - channel.message_consume(queue = "q", destination = "c") + channel.message_subscribe(queue = "q", destination = "c") channel.message_flow_mode(mode = 0, destination = "c") #set credit to zero (can remove this once move to proper default for subscribe method) channel.message_stop(destination = "c") @@ -720,7 +720,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) #create consumer (for now that defaults to infinite credit) - channel.message_consume(queue = "q", destination = "c") + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") #set credit to zero (can remove this once move to proper default for subscribe method) channel.message_stop(destination = "c") @@ -754,7 +754,7 @@ class MessageTests(TestBase): channel = self.channel channel.queue_declare(queue = "q", exclusive=True) #create consumer (for now that defaults to infinite credit) - channel.message_consume(queue = "q", destination = "c") + channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) channel.message_flow_mode(mode = 1, destination = "c") #set credit to zero (can remove this once move to proper default for subscribe method) channel.message_stop(destination = "c") diff --git a/python/tests_0-10/query.py b/python/tests_0-10/query.py index c2e08c003c..06f33be85b 100644 --- a/python/tests_0-10/query.py +++ b/python/tests_0-10/query.py @@ -30,16 +30,16 @@ class QueryTests(TestBase): """ channel = self.channel #check returned type for the standard exchanges - self.assertEqual("direct", channel.exchange_query(name="amq.direct").type) - self.assertEqual("topic", channel.exchange_query(name="amq.topic").type) - self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type) - self.assertEqual("headers", channel.exchange_query(name="amq.match").type) - self.assertEqual("direct", channel.exchange_query(name="").type) + self.assert_type("direct", channel.exchange_query(name="amq.direct")) + self.assert_type("topic", channel.exchange_query(name="amq.topic")) + self.assert_type("fanout", channel.exchange_query(name="amq.fanout")) + self.assert_type("headers", channel.exchange_query(name="amq.match")) + self.assert_type("direct", channel.exchange_query(name="")) #declare an exchange channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False) #check that the result of a query is as expected response = channel.exchange_query(name="my-test-exchange") - self.assertEqual("direct", response.type) + self.assert_type("direct", response) self.assertEqual(False, response.durable) self.assertEqual(False, response.not_found) #delete the exchange @@ -47,6 +47,9 @@ class QueryTests(TestBase): #check that the query now reports not-found self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found) + def assert_type(self, expected_type, response): + self.assertEqual(expected_type, response.__getattr__("type")) + def test_binding_query_direct(self): """ Test that the binding_query method works as expected with the direct exchange diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index e7fe0b3ed4..8d99c50d32 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -38,18 +38,18 @@ class QueueTests(TestBase): channel.message_transfer(destination="test-exchange", routing_key="key", body="three") #check that the queue now reports 3 messages: - reply = channel.queue_declare(queue="test-queue") + channel.queue_declare(queue="test-queue") + reply = channel.queue_query(queue="test-queue") self.assertEqual(3, reply.message_count) #now do the purge, then test that three messages are purged and the count drops to 0 - reply = channel.queue_purge(queue="test-queue"); - self.assertEqual(3, reply.message_count) - reply = channel.queue_declare(queue="test-queue") - self.assertEqual(0, reply.message_count) + channel.queue_purge(queue="test-queue"); + reply = channel.queue_query(queue="test-queue") + self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone channel.message_transfer(destination="test-exchange", routing_key="key", body="four") - channel.message_consume(queue="test-queue", destination="tag", no_ack=True) + channel.message_subscribe(queue="test-queue", destination="tag") queue = self.client.queue("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.body) @@ -169,8 +169,8 @@ class QueueTests(TestBase): channel.queue_declare(queue="queue-1", exclusive="True") channel.queue_declare(queue="queue-2", exclusive="True") - channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True) - channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True) + channel.message_subscribe(queue="queue-1", destination="queue-1") + channel.message_subscribe(queue="queue-2", destination="queue-2") queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") @@ -213,8 +213,7 @@ class QueueTests(TestBase): channel.message_transfer(routing_key="delete-me", body="a") channel.message_transfer(routing_key="delete-me", body="b") channel.message_transfer(routing_key="delete-me", body="c") - reply = channel.queue_delete(queue="delete-me") - self.assertEqual(3, reply.message_count) + channel.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: channel.queue_declare(queue="delete-me", passive="True") @@ -256,7 +255,7 @@ class QueueTests(TestBase): channel.channel_open() #empty queue: - channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True) + channel.message_subscribe(destination="consumer_tag", queue="delete-me-2") queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.body) @@ -281,7 +280,7 @@ class QueueTests(TestBase): #create a queue and register a consumer: channel.queue_declare(queue="delete-me-3") channel.queue_declare(queue="delete-me-3", passive="True") - channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True) + channel.message_subscribe(destination="consumer_tag", queue="delete-me-3") #need new channel now: channel2 = self.client.channel(2) @@ -316,8 +315,8 @@ class QueueTests(TestBase): channel.queue_declare(queue="auto-delete-me", auto_delete=True) #consume from both channels - reply = channel.basic_consume(queue="auto-delete-me", no_ack=True) - channel2.basic_consume(queue="auto-delete-me", no_ack=True) + reply = channel.basic_consume(queue="auto-delete-me") + channel2.basic_consume(queue="auto-delete-me") #implicit cancel channel2.channel_close() diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py index b499c2d1f9..4c2f75d35e 100644 --- a/python/tests_0-10/tx.py +++ b/python/tests_0-10/tx.py @@ -41,13 +41,13 @@ class TxTests(TestBase): channel = self.channel channel.tx_select() - channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False) + channel.message_subscribe(queue="tx-commit-a", destination="qa", confirm_mode=1) queue_a = self.client.queue("qa") - channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False) + channel.message_subscribe(queue="tx-commit-b", destination="qb", confirm_mode=1) queue_b = self.client.queue("qb") - channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False) + channel.message_subscribe(queue="tx-commit-c", destination="qc", confirm_mode=1) queue_c = self.client.queue("qc") #check results @@ -174,7 +174,7 @@ class TxTests(TestBase): channel.tx_select() #consume and ack messages - channel.message_consume(queue=name_a, destination="sub_a", no_ack=False) + channel.message_subscribe(queue=name_a, destination="sub_a", confirm_mode=1) queue_a = self.client.queue("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) @@ -182,13 +182,13 @@ class TxTests(TestBase): msg.complete() - channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) + channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) msg.complete() - sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) + sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) |