summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/cpp_failing_0-10.txt1
-rw-r--r--qpid/python/qpid/testlib.py9
-rw-r--r--qpid/python/tests_0-10/alternate-exchange.py8
-rw-r--r--qpid/python/tests_0-10/broker.py12
-rw-r--r--qpid/python/tests_0-10/dtx.py4
-rw-r--r--qpid/python/tests_0-10/example.py2
-rw-r--r--qpid/python/tests_0-10/message.py52
-rw-r--r--qpid/python/tests_0-10/queue.py10
-rw-r--r--qpid/python/tests_0-10/tx.py12
9 files changed, 58 insertions, 52 deletions
diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt
index 5b2fb593e1..878afee3c5 100644
--- a/qpid/python/cpp_failing_0-10.txt
+++ b/qpid/python/cpp_failing_0-10.txt
@@ -1,3 +1,2 @@
tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate
-tests_0-10.basic.BasicTests.test_get
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 28c07ba43a..c2e3024a7e 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -259,8 +259,17 @@ class TestBase(unittest.TestCase):
else: self.uniqueTag += 1
consumer_tag = "tag" + str(self.uniqueTag)
self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
+ self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
return self.client.queue(consumer_tag)
+ def subscribe(self, channel=None, **keys):
+ channel = channel or self.channel
+ consumer_tag = keys["destination"]
+ channel.message_subscribe(**keys)
+ channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+
def assertEmpty(self, queue):
"""Assert that the queue is empty"""
try:
diff --git a/qpid/python/tests_0-10/alternate-exchange.py b/qpid/python/tests_0-10/alternate-exchange.py
index d6ac62ccfe..a749d733b0 100644
--- a/qpid/python/tests_0-10/alternate-exchange.py
+++ b/qpid/python/tests_0-10/alternate-exchange.py
@@ -39,13 +39,13 @@ class AlternateExchangeTests(TestBase):
#declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
channel.queue_declare(queue="returns", exclusive=True)
channel.queue_bind(queue="returns", exchange="secondary")
- channel.message_subscribe(destination="a", queue="returns")
+ self.subscribe(destination="a", queue="returns")
returned = self.client.queue("a")
#declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
channel.queue_declare(queue="processed", exclusive=True)
channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
- channel.message_subscribe(destination="b", queue="processed")
+ self.subscribe(destination="b", queue="processed")
processed = self.client.queue("b")
#publish to the primary exchange
@@ -73,7 +73,7 @@ class AlternateExchangeTests(TestBase):
channel.exchange_declare(exchange="dlq", type="fanout")
channel.queue_declare(queue="deleted", exclusive=True)
channel.queue_bind(exchange="dlq", queue="deleted")
- channel.message_subscribe(destination="dlq", queue="deleted")
+ self.subscribe(destination="dlq", queue="deleted")
dlq = self.client.queue("dlq")
#create a queue using the dlq as its alternate exchange:
@@ -103,7 +103,7 @@ class AlternateExchangeTests(TestBase):
channel.exchange_declare(exchange="dlq", type="fanout")
channel.queue_declare(queue="immediate", exclusive=True)
channel.queue_bind(exchange="dlq", queue="immediate")
- channel.message_subscribe(destination="dlq", queue="immediate")
+ self.subscribe(destination="dlq", queue="immediate")
dlq = self.client.queue("dlq")
#create a queue using the dlq as its alternate exchange:
diff --git a/qpid/python/tests_0-10/broker.py b/qpid/python/tests_0-10/broker.py
index 0eb71287ec..0df7eb09fa 100644
--- a/qpid/python/tests_0-10/broker.py
+++ b/qpid/python/tests_0-10/broker.py
@@ -35,7 +35,7 @@ class BrokerTests(TestBase):
# No ack consumer
ctag = "tag1"
- ch.message_subscribe(queue = "myqueue", destination = ctag, confirm_mode = 0)
+ self.subscribe(ch, queue = "myqueue", destination = ctag)
body = "test no-ack"
ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"}))
msg = self.client.queue(ctag).get(timeout = 5)
@@ -44,7 +44,9 @@ class BrokerTests(TestBase):
# Acknowledging consumer
self.queue_declare(ch, queue = "otherqueue")
ctag = "tag2"
- ch.message_subscribe(queue = "otherqueue", destination = ctag, confirm_mode = 1)
+ self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1)
+ ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
+ ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
body = "test ack"
ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"}))
msg = self.client.queue(ctag).get(timeout = 5)
@@ -60,7 +62,7 @@ class BrokerTests(TestBase):
self.queue_declare(channel, queue="test-queue")
channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
consumer_tag = "tag1"
- channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
+ self.subscribe(queue="test-queue", destination=consumer_tag)
queue = self.client.queue(consumer_tag)
body = "Immediate Delivery"
@@ -84,7 +86,7 @@ class BrokerTests(TestBase):
channel.message_transfer(destination="test-exchange", content = Content(body, properties = {"routing_key" : "key"}))
consumer_tag = "tag1"
- channel.message_subscribe(queue="test-queue", destination=consumer_tag, confirm_mode = 0)
+ self.subscribe(queue="test-queue", destination=consumer_tag)
queue = self.client.queue(consumer_tag)
msg = queue.get(timeout=5)
self.assert_(msg.content.body == body)
@@ -111,7 +113,7 @@ class BrokerTests(TestBase):
def test_channel_flow(self):
channel = self.channel
channel.queue_declare(queue="flow_test_queue", exclusive=True)
- channel.message_subscribe(destination="my-tag", queue="flow_test_queue")
+ self.subscribe(destination="my-tag", queue="flow_test_queue")
incoming = self.client.queue("my-tag")
channel.channel_flow(active=False)
diff --git a/qpid/python/tests_0-10/dtx.py b/qpid/python/tests_0-10/dtx.py
index 29a4d3bf0d..b5645cb596 100644
--- a/qpid/python/tests_0-10/dtx.py
+++ b/qpid/python/tests_0-10/dtx.py
@@ -366,7 +366,7 @@ class DtxTests(TestBase):
#check the second message is available, but not the first
self.assertMessageCount(1, "tx-queue")
- channel.message_subscribe(queue="tx-queue", destination="results", confirm_mode=1)
+ self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
msg = self.client.queue("results").get(timeout=1)
self.assertEqual("two", msg.content['message_id'])
channel.message_cancel(destination="results")
@@ -602,5 +602,7 @@ class DtxTests(TestBase):
def assertMessageId(self, expected, queue):
self.channel.message_subscribe(queue=queue, destination="results")
+ self.channel.message_flow(destination="results", unit=0, value=1)
+ self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
self.channel.message_cancel(destination="results")
diff --git a/qpid/python/tests_0-10/example.py b/qpid/python/tests_0-10/example.py
index e3e2c3b095..9dbe73e3cb 100644
--- a/qpid/python/tests_0-10/example.py
+++ b/qpid/python/tests_0-10/example.py
@@ -69,6 +69,8 @@ class ExampleTest (TestBase):
# field that is filled if the reply includes content. In this case the
# interesting field is the consumer_tag.
channel.message_subscribe(queue="test-queue", destination="consumer_tag")
+ channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
# We can use the Client.queue(...) method to access the queue
# corresponding to our consumer_tag.
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index 8089709314..ba26dda309 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -34,8 +34,8 @@ class MessageTests(TestBase):
channel.queue_declare(queue="test-queue-1a", exclusive=True)
channel.queue_declare(queue="test-queue-1b", exclusive=True)
#establish two consumers one of which excludes delivery of locally sent messages
- channel.message_subscribe(destination="local_included", queue="test-queue-1a")
- channel.message_subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
#send a message
channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
@@ -61,9 +61,9 @@ class MessageTests(TestBase):
channel.queue_declare(queue="test-queue-2", exclusive=True)
#check that an exclusive consumer prevents other consumer being created:
- channel.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
try:
- channel.message_subscribe(destination="second", queue="test-queue-2")
+ self.subscribe(destination="second", queue="test-queue-2")
self.fail("Expected consume request to fail due to previous exclusive consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
@@ -73,9 +73,9 @@ class MessageTests(TestBase):
channel.channel_open()
#check that an exclusive consumer cannot be created if a consumer already exists:
- channel.message_subscribe(destination="first", queue="test-queue-2")
+ self.subscribe(channel, destination="first", queue="test-queue-2")
try:
- channel.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
self.fail("Expected exclusive consume request to fail due to previous consumer")
except Closed, e:
self.assertChannelException(403, e.args[0])
@@ -87,7 +87,7 @@ class MessageTests(TestBase):
channel = self.channel
try:
#queue specified but doesn't exist:
- channel.message_subscribe(queue="invalid-queue")
+ self.subscribe(queue="invalid-queue", destination="")
self.fail("Expected failure when consuming from non-existent queue")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -96,7 +96,7 @@ class MessageTests(TestBase):
channel.channel_open()
try:
#queue not specified and none previously declared for channel:
- channel.message_subscribe(queue="")
+ self.subscribe(channel, queue="", destination="")
self.fail("Expected failure when consuming from unspecified queue")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -110,9 +110,9 @@ class MessageTests(TestBase):
channel.queue_declare(queue="test-queue-3", exclusive=True)
#check that attempts to use duplicate tags are detected and prevented:
- channel.message_subscribe(destination="first", queue="test-queue-3")
+ self.subscribe(destination="first", queue="test-queue-3")
try:
- channel.message_subscribe(destination="first", queue="test-queue-3")
+ self.subscribe(destination="first", queue="test-queue-3")
self.fail("Expected consume request to fail due to non-unique tag")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -124,7 +124,7 @@ class MessageTests(TestBase):
channel = self.channel
#setup, declare a queue:
channel.queue_declare(queue="test-queue-4", exclusive=True)
- channel.message_subscribe(destination="my-consumer", queue="test-queue-4")
+ self.subscribe(destination="my-consumer", queue="test-queue-4")
channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
#cancel should stop messages being delivered
@@ -150,7 +150,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue="test-ack-queue", exclusive=True)
- channel.message_subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
+ self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
@@ -194,7 +194,7 @@ class MessageTests(TestBase):
channel = self.channel
channel.queue_declare(queue="test-requeue", exclusive=True)
- channel.message_subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
+ self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
@@ -225,7 +225,7 @@ class MessageTests(TestBase):
#requeue unacked messages (Three and Five)
channel.message_recover(requeue=True)
- channel.message_subscribe(queue="test-requeue", destination="consumer_tag")
+ self.subscribe(queue="test-requeue", destination="consumer_tag")
queue2 = self.client.queue("consumer_tag")
msg3b = queue2.get(timeout=1)
@@ -256,7 +256,7 @@ class MessageTests(TestBase):
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-count", exclusive=True)
- subscription = channel.message_subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
+ subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
#set prefetch to 5:
@@ -298,7 +298,7 @@ class MessageTests(TestBase):
#setup: declare queue and subscribe
channel = self.channel
channel.queue_declare(queue="test-prefetch-size", exclusive=True)
- subscription = channel.message_subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
+ subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
queue = self.client.queue("consumer_tag")
#set prefetch to 50 bytes (each message is 9 or 10 bytes):
@@ -345,13 +345,13 @@ class MessageTests(TestBase):
channel.queue_declare(queue = "r", exclusive=True)
channel.queue_bind(queue = "r", exchange = "amq.fanout")
- channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
+ self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
msg = self.client.queue("consumer").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
channel.message_reject([msg.command_id, msg.command_id])
- channel.message_subscribe(queue = "r", destination = "checker")
+ self.subscribe(queue = "r", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
@@ -365,8 +365,6 @@ class MessageTests(TestBase):
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
@@ -397,8 +395,6 @@ class MessageTests(TestBase):
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c")
channel.message_flow_mode(mode = 0, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
@@ -431,8 +427,6 @@ class MessageTests(TestBase):
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
@@ -465,8 +459,6 @@ class MessageTests(TestBase):
#create consumer (for now that defaults to infinite credit)
channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
channel.message_flow_mode(mode = 1, destination = "c")
- #set credit to zero (can remove this once move to proper default for subscribe method)
- channel.message_stop(destination = "c")
#send batch of messages to queue
for i in range(1, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
@@ -506,8 +498,8 @@ class MessageTests(TestBase):
for i in range(1, 6):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
- channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
- channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
for i in range(6, 11):
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
@@ -532,7 +524,7 @@ class MessageTests(TestBase):
channel.queue_declare(queue = "q", exclusive=True)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
- channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
msg = self.client.queue("a").get(timeout = 1)
channel.message_acquire([msg.command_id, msg.command_id])
msg.complete()
@@ -548,7 +540,7 @@ class MessageTests(TestBase):
channel.queue_declare(queue = "q", exclusive=True)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
- channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+ self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
msg = self.client.queue("a").get(timeout = 1)
channel.message_cancel(destination = "a")
channel.message_release([msg.command_id, msg.command_id])
diff --git a/qpid/python/tests_0-10/queue.py b/qpid/python/tests_0-10/queue.py
index 05fa1aebc6..e3438116c8 100644
--- a/qpid/python/tests_0-10/queue.py
+++ b/qpid/python/tests_0-10/queue.py
@@ -49,7 +49,7 @@ class QueueTests(TestBase):
#send a further message and consume it, ensuring that the other messages are really gone
channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
- channel.message_subscribe(queue="test-queue", destination="tag")
+ self.subscribe(queue="test-queue", destination="tag")
queue = self.client.queue("tag")
msg = queue.get(timeout=1)
self.assertEqual("four", msg.content.body)
@@ -169,8 +169,8 @@ class QueueTests(TestBase):
channel.queue_declare(queue="queue-1", exclusive="True")
channel.queue_declare(queue="queue-2", exclusive="True")
- channel.message_subscribe(queue="queue-1", destination="queue-1")
- channel.message_subscribe(queue="queue-2", destination="queue-2")
+ self.subscribe(queue="queue-1", destination="queue-1")
+ self.subscribe(queue="queue-2", destination="queue-2")
queue1 = self.client.queue("queue-1")
queue2 = self.client.queue("queue-2")
@@ -257,7 +257,7 @@ class QueueTests(TestBase):
channel.channel_open()
#empty queue:
- channel.message_subscribe(destination="consumer_tag", queue="delete-me-2")
+ self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
queue = self.client.queue("consumer_tag")
msg = queue.get(timeout=1)
self.assertEqual("message", msg.content.body)
@@ -282,7 +282,7 @@ class QueueTests(TestBase):
#create a queue and register a consumer:
channel.queue_declare(queue="delete-me-3")
channel.queue_declare(queue="delete-me-3", passive="True")
- channel.message_subscribe(destination="consumer_tag", queue="delete-me-3")
+ self.subscribe(destination="consumer_tag", queue="delete-me-3")
#need new channel now:
channel2 = self.client.channel(2)
diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py
index 7c50de4ee2..2415a88fb2 100644
--- a/qpid/python/tests_0-10/tx.py
+++ b/qpid/python/tests_0-10/tx.py
@@ -41,13 +41,13 @@ class TxTests(TestBase):
channel = self.channel
channel.tx_select()
- channel.message_subscribe(queue="tx-commit-a", destination="qa", confirm_mode=1)
+ self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
queue_a = self.client.queue("qa")
- channel.message_subscribe(queue="tx-commit-b", destination="qb", confirm_mode=1)
+ self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
queue_b = self.client.queue("qb")
- channel.message_subscribe(queue="tx-commit-c", destination="qc", confirm_mode=1)
+ self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
queue_c = self.client.queue("qc")
#check results
@@ -176,7 +176,7 @@ class TxTests(TestBase):
channel.tx_select()
#consume and ack messages
- channel.message_subscribe(queue=name_a, destination="sub_a", confirm_mode=1)
+ self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
queue_a = self.client.queue("sub_a")
for i in range(1, 5):
msg = queue_a.get(timeout=1)
@@ -184,13 +184,13 @@ class TxTests(TestBase):
msg.complete()
- channel.message_subscribe(queue=name_b, destination="sub_b", confirm_mode=1)
+ self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
queue_b = self.client.queue("sub_b")
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.content.body)
msg.complete()
- sub_c = channel.message_subscribe(queue=name_c, destination="sub_c", confirm_mode=1)
+ sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
queue_c = self.client.queue("sub_c")
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.content.body)