summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/qpid/peer.py12
-rw-r--r--python/qpid/testlib.py4
-rw-r--r--python/tests_0-10/alternate-exchange.py8
-rw-r--r--python/tests_0-10/broker.py10
-rw-r--r--python/tests_0-10/dtx.py6
-rw-r--r--python/tests_0-10/example.py2
-rw-r--r--python/tests_0-10/message.py64
-rw-r--r--python/tests_0-10/query.py15
-rw-r--r--python/tests_0-10/queue.py27
-rw-r--r--python/tests_0-10/tx.py12
10 files changed, 85 insertions, 75 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 6762f774f4..6ad5482f09 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -208,6 +208,8 @@ class Channel:
self.responses.close()
self.completion.close()
self.incoming_completion.reset()
+ for f in self.futures.values():
+ f.put_response(self, reason)
def write(self, frame, content = None):
if self.closed:
@@ -324,7 +326,10 @@ class Channel:
raise ValueError(resp)
elif frame.method.result:
if self.synchronous:
- return future.get_response(timeout=10)
+ fr = future.get_response(timeout=10)
+ if self.closed:
+ raise Closed(self.reason)
+ return fr
else:
return future
elif self.synchronous and not frame.method.response \
@@ -373,7 +378,10 @@ class Future:
def get_response(self, timeout=None):
self.completed.wait(timeout)
- return self.response
+ if self.completed.isSet():
+ return self.response
+ else:
+ return None
def is_complete(self):
return self.completed.isSet()
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 8a3abf840b..6820ae3bae 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -228,7 +228,7 @@ class TestBase(unittest.TestCase):
def queue_declare(self, channel=None, *args, **keys):
channel = channel or self.channel
reply = channel.queue_declare(*args, **keys)
- self.queues.append((channel, reply.queue))
+ self.queues.append((channel, keys["queue"]))
return reply
def exchange_declare(self, channel=None, ticket=0, exchange='',
@@ -254,7 +254,7 @@ class TestBase(unittest.TestCase):
if not "uniqueTag" in dir(self): self.uniqueTag = 1
else: self.uniqueTag += 1
consumer_tag = "tag" + str(self.uniqueTag)
- self.channel.message_consume(queue=queueName, destination=consumer_tag, no_ack=True)
+ self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
return self.client.queue(consumer_tag)
def assertEmpty(self, queue):
diff --git a/python/tests_0-10/alternate-exchange.py b/python/tests_0-10/alternate-exchange.py
index 19405a1c9f..a1c6151fca 100644
--- a/python/tests_0-10/alternate-exchange.py
+++ b/python/tests_0-10/alternate-exchange.py
@@ -39,13 +39,13 @@ class AlternateExchangeTests(TestBase):
#declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
channel.queue_declare(queue="returns", exclusive=True)
channel.queue_bind(queue="returns", exchange="secondary")
- channel.message_consume(destination="a", queue="returns")
+ channel.message_subscribe(destination="a", queue="returns")
returned = self.client.queue("a")
#declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
channel.queue_declare(queue="processed", exclusive=True)
channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
- channel.message_consume(destination="b", queue="processed")
+ channel.message_subscribe(destination="b", queue="processed")
processed = self.client.queue("b")
#publish to the primary exchange
@@ -73,7 +73,7 @@ class AlternateExchangeTests(TestBase):
channel.exchange_declare(exchange="dlq", type="fanout")
channel.queue_declare(queue="deleted", exclusive=True)
channel.queue_bind(exchange="dlq", queue="deleted")
- channel.message_consume(destination="dlq", queue="deleted")
+ channel.message_subscribe(destination="dlq", queue="deleted")
dlq = self.client.queue("dlq")
#create a queue using the dlq as its alternate exchange:
@@ -103,7 +103,7 @@ class AlternateExchangeTests(TestBase):
channel.exchange_declare(exchange="dlq", type="fanout")
channel.queue_declare(queue="immediate", exclusive=True)
channel.queue_bind(exchange="dlq", queue="immediate")
- channel.message_consume(destination="dlq", queue="immediate")
+ channel.message_subscribe(destination="dlq", queue="immediate")
dlq = self.client.queue("dlq")
#create a queue using the dlq as its alternate exchange:
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py
index 6bc2f7ceb8..647f5d4fa5 100644
--- a/python/tests_0-10/broker.py
+++ b/python/tests_0-10/broker.py
@@ -35,7 +35,7 @@ class BrokerTests(TestBase):
# No ack consumer
ctag = "tag1"
- ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True)
+ ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0)
body = "test no-ack"
ch.message_transfer(routing_key = "myqueue", body = body)
msg = self.client.queue(ctag).get(timeout = 5)
@@ -44,7 +44,7 @@ class BrokerTests(TestBase):
# Acknowledging consumer
self.queue_declare(ch, queue = "otherqueue")
ctag = "tag2"
- ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False)
+ ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1)
body = "test ack"
ch.message_transfer(routing_key = "otherqueue", body = body)
msg = self.client.queue(ctag).get(timeout = 5)
@@ -60,7 +60,7 @@ class BrokerTests(TestBase):
self.queue_declare(channel, queue="test-queue")
channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
consumer_tag = "tag1"
- channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
+ channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
queue = self.client.queue(consumer_tag)
body = "Immediate Delivery"
@@ -84,7 +84,7 @@ class BrokerTests(TestBase):
channel.message_transfer(destination="test-exchange", routing_key="key", body=body)
consumer_tag = "tag1"
- channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
+ channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
queue = self.client.queue(consumer_tag)
msg = queue.get(timeout=5)
self.assert_(msg.body == body)
@@ -111,7 +111,7 @@ class BrokerTests(TestBase):
def test_channel_flow(self):
channel = self.channel
channel.queue_declare(queue="flow_test_queue", exclusive=True)
- channel.message_consume(destination="my-tag", queue="flow_test_queue")
+ channel.message_subscribe(destination="my-tag", queue="flow_test_queue")
incoming = self.client.queue("my-tag")
channel.channel_flow(active=False)
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index 2835d703ae..a5b53ac65b 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -366,7 +366,7 @@ class DtxTests(TestBase):
#check the second message is available, but not the first
self.assertMessageCount(1, "tx-queue")
- channel.message_consume(queue="tx-queue", destination="results", no_ack=False)
+ channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1)
msg = self.client.queue("results").get(timeout=1)
self.assertEqual("two", msg.message_id)
channel.message_cancel(destination="results")
@@ -597,9 +597,9 @@ class DtxTests(TestBase):
channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
def assertMessageCount(self, expected, queue):
- self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count)
+ self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count)
def assertMessageId(self, expected, queue):
- self.channel.message_consume(queue=queue, destination="results", no_ack=True)
+ self.channel.message_subscribe(queue=queue, destination="results")
self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id)
self.channel.message_cancel(destination="results")
diff --git a/python/tests_0-10/example.py b/python/tests_0-10/example.py
index dc71b0590b..e4c80951ac 100644
--- a/python/tests_0-10/example.py
+++ b/python/tests_0-10/example.py
@@ -68,7 +68,7 @@ class ExampleTest (TestBase):
# has fields corresponding to the reply method fields, plus a content
# field that is filled if the reply includes content. In this case the
# interesting field is the consumer_tag.
- channel.message_consume(queue="test-queue", destination="consumer_tag")
+ channel.message_subscribe(queue="test-queue", destination="consumer_tag")
# We can use the Client.queue(...) method to access the queue
# corresponding to our consumer_tag.
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index b882cd5438..6cf2f3ef89 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -34,8 +34,8 @@ class MessageTests(TestBase):
channel.queue_declare(queue="test-queue-1a", exclusive=True)
channel.queue_declare(queue="test-queue-1b", exclusive=True)
#establish two consumers one of which excludes delivery of locally sent messages
- channel.message_consume(destination="local_included", queue="test-queue-1a")
- channel.message_consume(destination="local_excluded", queue="test-queue-1b", no_local=True)
+ channel.message_subscribe(destination="local_included", queue="test-queue-1a")
+ channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
#send a message
channel.message_transfer(routing_key="test-queue-1a", body="consume_no_local")
@@ -61,9 +61,9 @@ class MessageTests(TestBase):
channel.queue_declare(queue="test-queue-2", exclusive=True)
#check that an exclusive consumer prevents other consumer being created:
- channel.message_consume(destination="first", queue="test-queue-2", exclusive=True)
+ channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
try:
- channel.message_consume(destination="second", queue="test-queue-2")
+ channel.message_subscribe(destination="second", queue="test-queue-2")
self.fail("Expected consume request to fail due to previous exclusive consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
@@ -73,9 +73,9 @@ class MessageTests(TestBase):
channel.channel_open()
#check that an exclusive consumer cannot be created if a consumer already exists:
- channel.message_consume(destination="first", queue="test-queue-2")
+ channel.message_subscribe(destination="first", queue="test-queue-2")
try:
- channel.message_consume(destination="second", queue="test-queue-2", exclusive=True)
+ channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
self.fail("Expected exclusive consume request to fail due to previous consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
@@ -87,7 +87,7 @@ class MessageTests(TestBase):
channel = self.channel
try:
#queue specified but doesn't exist:
- channel.message_consume(queue="invalid-queue")
+ channel.message_subscribe(queue="invalid-queue")
self.fail("Expected failure when consuming from non-existent queue")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -96,7 +96,7 @@ class MessageTests(TestBase):
channel.channel_open()
try:
#queue not specified and none previously declared for channel:
- channel.message_consume(queue="")
+ channel.message_subscribe(queue="")
self.fail("Expected failure when consuming from unspecified queue")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -110,9 +110,9 @@ class MessageTests(TestBase):
channel.queue_declare(queue="test-queue-3", exclusive=True)
#check that attempts to use duplicate tags are detected and prevented:
- channel.message_consume(destination="first", queue="test-queue-3")
+ channel.message_subscribe(destination="first", queue="test-queue-3")
try:
- channel.message_consume(destination="first", queue="test-queue-3")
+ channel.message_subscribe(destination="first", queue="test-queue-3")
self.fail("Expected consume request to fail due to non-unique tag")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -124,7 +124,7 @@ class MessageTests(TestBase):
channel = self.channel
#setup, declare a queue:
channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.message_consume(destination="my-consumer", queue="test-queue-4")
+ channel.message_subscribe(destination="my-consumer", queue="test-queue-4")
channel.message_transfer(routing_key="test-queue-4", body="One")
#cancel should stop messages being delivered
@@ -150,7 +150,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue="test-ack-queue", exclusive=True)
- channel.message_consume(queue="test-ack-queue", destination="consumer_tag", no_ack=False)
+ channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
channel.message_transfer(routing_key="test-ack-queue", body="One")
@@ -194,7 +194,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue="test-requeue", exclusive=True)
- channel.message_consume(queue="test-requeue", destination="consumer_tag", no_ack=False)
+ channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
channel.message_transfer(routing_key="test-requeue", body="One")
@@ -225,7 +225,7 @@ class MessageTests(TestBase):
#requeue unacked messages (Three and Five)
channel.message_recover(requeue=True)
- channel.message_consume(queue="test-requeue", destination="consumer_tag")
+ channel.message_subscribe(queue="test-requeue", destination="consumer_tag")
queue2 = self.client.queue("consumer_tag")
msg3b = queue2.get(timeout=1)
@@ -256,7 +256,7 @@ class MessageTests(TestBase):
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-count", destination="consumer_tag", no_ack=False)
+ subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
#set prefetch to 5:
@@ -298,7 +298,7 @@ class MessageTests(TestBase):
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.message_consume(queue="test-prefetch-size", destination="consumer_tag", no_ack=False)
+ subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
#set prefetch to 50 bytes (each message is 9 or 10 bytes):
@@ -362,13 +362,13 @@ class MessageTests(TestBase):
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "empty")
- #repeat for no_ack=False
+ #repeat for confirm_mode=1
for i in range(11, 21):
channel.message_transfer(routing_key="test-get", body="Message %d" % i)
for i in range(11, 21):
tag = "queue %d" % i
- reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
+ reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
@@ -389,7 +389,7 @@ class MessageTests(TestBase):
#get the unacked messages again (14, 16, 18, 20)
for i in [14, 16, 18, 20]:
tag = "queue %d" % i
- reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
+ reply = channel.message_get(confirm_mode=1, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
msg = self.client.queue(tag).get(timeout=1)
@@ -412,7 +412,7 @@ class MessageTests(TestBase):
"""
channel = self.channel
channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
+ channel.message_subscribe(queue="ref_queue", destination="c1")
queue = self.client.queue("c1")
refId = "myref"
@@ -454,7 +454,7 @@ class MessageTests(TestBase):
other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
ch2 = other.channel(1)
ch2.channel_open()
- ch2.message_consume(queue="ref_queue", destination="c1")
+ ch2.message_subscribe(queue="ref_queue", destination="c1")
queue = other.queue("c1")
msg = queue.get(timeout=1)
@@ -469,7 +469,7 @@ class MessageTests(TestBase):
"""
channel = self.channel
channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
+ channel.message_subscribe(queue="ref_queue", destination="c1")
queue = self.client.queue("c1")
refId = "myref"
@@ -502,8 +502,8 @@ class MessageTests(TestBase):
#declare and consume from two queues
channel.queue_declare(queue="q-one", exclusive=True)
channel.queue_declare(queue="q-two", exclusive=True)
- channel.message_consume(queue="q-one", destination="q-one")
- channel.message_consume(queue="q-two", destination="q-two")
+ channel.message_subscribe(queue="q-one", destination="q-one")
+ channel.message_subscribe(queue="q-two", destination="q-two")
queue1 = self.client.queue("q-one")
queue2 = self.client.queue("q-two")
@@ -590,7 +590,7 @@ class MessageTests(TestBase):
def test_empty_reference(self):
channel = self.channel
channel.queue_declare(queue="ref_queue", exclusive=True)
- channel.message_consume(queue="ref_queue", destination="c1")
+ channel.message_subscribe(queue="ref_queue", destination="c1")
queue = self.client.queue("c1")
refId = "myref"
@@ -611,14 +611,14 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
- channel.message_consume(queue = "q", destination = "consumer")
+ channel.message_subscribe(queue = "q", destination = "consumer")
channel.message_transfer(routing_key = "q", body="blah, blah")
msg = self.client.queue("consumer").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
channel.message_cancel(destination = "consumer")
msg.reject()
- channel.message_consume(queue = "q", destination = "checker")
+ channel.message_subscribe(queue = "q", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
@@ -634,7 +634,7 @@ class MessageTests(TestBase):
channel = self.client.channel(2)
channel.channel_open()
- channel.message_consume(queue = "q", destination = "consumer")
+ channel.message_subscribe(queue = "q", destination = "consumer")
offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
self.assertTrue(offset<=16)
channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
@@ -654,7 +654,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_consume(queue = "q", destination = "c")
+ channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
#set credit to zero (can remove this once move to proper default for subscribe method)
channel.message_stop(destination = "c")
@@ -686,7 +686,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_consume(queue = "q", destination = "c")
+ channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
#set credit to zero (can remove this once move to proper default for subscribe method)
channel.message_stop(destination = "c")
@@ -720,7 +720,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_consume(queue = "q", destination = "c")
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
#set credit to zero (can remove this once move to proper default for subscribe method)
channel.message_stop(destination = "c")
@@ -754,7 +754,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue = "q", exclusive=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_consume(queue = "q", destination = "c")
+ channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
#set credit to zero (can remove this once move to proper default for subscribe method)
channel.message_stop(destination = "c")
diff --git a/python/tests_0-10/query.py b/python/tests_0-10/query.py
index c2e08c003c..06f33be85b 100644
--- a/python/tests_0-10/query.py
+++ b/python/tests_0-10/query.py
@@ -30,16 +30,16 @@ class QueryTests(TestBase):
"""
channel = self.channel
#check returned type for the standard exchanges
- self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
- self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
- self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
- self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
- self.assertEqual("direct", channel.exchange_query(name="").type)
+ self.assert_type("direct", channel.exchange_query(name="amq.direct"))
+ self.assert_type("topic", channel.exchange_query(name="amq.topic"))
+ self.assert_type("fanout", channel.exchange_query(name="amq.fanout"))
+ self.assert_type("headers", channel.exchange_query(name="amq.match"))
+ self.assert_type("direct", channel.exchange_query(name=""))
#declare an exchange
channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
#check that the result of a query is as expected
response = channel.exchange_query(name="my-test-exchange")
- self.assertEqual("direct", response.type)
+ self.assert_type("direct", response)
self.assertEqual(False, response.durable)
self.assertEqual(False, response.not_found)
#delete the exchange
@@ -47,6 +47,9 @@ class QueryTests(TestBase):
#check that the query now reports not-found
self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+ def assert_type(self, expected_type, response):
+ self.assertEqual(expected_type, response.__getattr__("type"))
+
def test_binding_query_direct(self):
"""
Test that the binding_query method works as expected with the direct exchange
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index e7fe0b3ed4..8d99c50d32 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -38,18 +38,18 @@ class QueueTests(TestBase):
channel.message_transfer(destination="test-exchange", routing_key="key", body="three")
#check that the queue now reports 3 messages:
- reply = channel.queue_declare(queue="test-queue")
+ channel.queue_declare(queue="test-queue")
+ reply = channel.queue_query(queue="test-queue")
self.assertEqual(3, reply.message_count)
#now do the purge, then test that three messages are purged and the count drops to 0
- reply = channel.queue_purge(queue="test-queue");
- self.assertEqual(3, reply.message_count)
- reply = channel.queue_declare(queue="test-queue")
- self.assertEqual(0, reply.message_count)
+ channel.queue_purge(queue="test-queue");
+ reply = channel.queue_query(queue="test-queue")
+ self.assertEqual(0, reply.message_count)
#send a further message and consume it, ensuring that the other messages are really gone
channel.message_transfer(destination="test-exchange", routing_key="key", body="four")
- channel.message_consume(queue="test-queue", destination="tag", no_ack=True)
+ channel.message_subscribe(queue="test-queue", destination="tag")
queue = self.client.queue("tag")
msg = queue.get(timeout=1)
self.assertEqual("four", msg.body)
@@ -169,8 +169,8 @@ class QueueTests(TestBase):
channel.queue_declare(queue="queue-1", exclusive="True")
channel.queue_declare(queue="queue-2", exclusive="True")
- channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True)
- channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True)
+ channel.message_subscribe(queue="queue-1", destination="queue-1")
+ channel.message_subscribe(queue="queue-2", destination="queue-2")
queue1 = self.client.queue("queue-1")
queue2 = self.client.queue("queue-2")
@@ -213,8 +213,7 @@ class QueueTests(TestBase):
channel.message_transfer(routing_key="delete-me", body="a")
channel.message_transfer(routing_key="delete-me", body="b")
channel.message_transfer(routing_key="delete-me", body="c")
- reply = channel.queue_delete(queue="delete-me")
- self.assertEqual(3, reply.message_count)
+ channel.queue_delete(queue="delete-me")
#check that it has gone be declaring passively
try:
channel.queue_declare(queue="delete-me", passive="True")
@@ -256,7 +255,7 @@ class QueueTests(TestBase):
channel.channel_open()
#empty queue:
- channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True)
+ channel.message_subscribe(destination="consumer_tag", queue="delete-me-2")
queue = self.client.queue("consumer_tag")
msg = queue.get(timeout=1)
self.assertEqual("message", msg.body)
@@ -281,7 +280,7 @@ class QueueTests(TestBase):
#create a queue and register a consumer:
channel.queue_declare(queue="delete-me-3")
channel.queue_declare(queue="delete-me-3", passive="True")
- channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True)
+ channel.message_subscribe(destination="consumer_tag", queue="delete-me-3")
#need new channel now:
channel2 = self.client.channel(2)
@@ -316,8 +315,8 @@ class QueueTests(TestBase):
channel.queue_declare(queue="auto-delete-me", auto_delete=True)
#consume from both channels
- reply = channel.basic_consume(queue="auto-delete-me", no_ack=True)
- channel2.basic_consume(queue="auto-delete-me", no_ack=True)
+ reply = channel.basic_consume(queue="auto-delete-me")
+ channel2.basic_consume(queue="auto-delete-me")
#implicit cancel
channel2.channel_close()
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py
index b499c2d1f9..4c2f75d35e 100644
--- a/python/tests_0-10/tx.py
+++ b/python/tests_0-10/tx.py
@@ -41,13 +41,13 @@ class TxTests(TestBase):
channel = self.channel
channel.tx_select()
- channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False)
+ channel.message_subscribe(queue="tx-commit-a", destination="qa", confirm_mode=1)
queue_a = self.client.queue("qa")
- channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False)
+ channel.message_subscribe(queue="tx-commit-b", destination="qb", confirm_mode=1)
queue_b = self.client.queue("qb")
- channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False)
+ channel.message_subscribe(queue="tx-commit-c", destination="qc", confirm_mode=1)
queue_c = self.client.queue("qc")
#check results
@@ -174,7 +174,7 @@ class TxTests(TestBase):
channel.tx_select()
#consume and ack messages
- channel.message_consume(queue=name_a, destination="sub_a", no_ack=False)
+ channel.message_subscribe(queue=name_a, destination="sub_a", confirm_mode=1)
queue_a = self.client.queue("sub_a")
for i in range(1, 5):
msg = queue_a.get(timeout=1)
@@ -182,13 +182,13 @@ class TxTests(TestBase):
msg.complete()
- channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
+ channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1)
queue_b = self.client.queue("sub_b")
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
msg.complete()
- sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False)
+ sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1)
queue_c = self.client.queue("sub_c")
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)