summaryrefslogtreecommitdiff
path: root/python/tests_0-10/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-10/message.py')
-rw-r--r--python/tests_0-10/message.py51
1 files changed, 33 insertions, 18 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index aaefb52392..dd80e79d36 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -635,9 +635,7 @@ class MessageTests(TestBase010):
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#use fanout for now:
- session.exchange_bind(exchange="amq.fanout", queue="q")
- session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
- #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -647,12 +645,13 @@ class MessageTests(TestBase010):
#message should still be on the queue:
self.assertEquals(1, session.queue_query(queue = "q").message_count)
- response = session.message_acquire(RangedSet(msg.id))
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
#check that we get notification (i.e. message_acquired)
- self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ self.assert_(msg.id in response.transfers)
#message should have been removed from the queue:
self.assertEquals(0, session.queue_query(queue = "q").message_count)
- session.message_accept(RangedSet(msg.id))
+ session.message_accept(transfers)
def test_release(self):
@@ -800,12 +799,12 @@ class MessageTests(TestBase010):
session = self.session
#publish some messages
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#create a not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
session.message_flow(unit = 0, value = 10, destination = "a")
@@ -816,19 +815,18 @@ class MessageTests(TestBase010):
self.assertEquals("message-%d" % (i), msg.body)
if (i % 2):
#try to acquire every second message
- session.message_acquire([msg.command_id, msg.command_id])
+ response = session.message_acquire(RangedSet(msg.id))
#check that acquire succeeds
- response = session.control_queue.get(timeout=1)
- self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
- session.message_release(RangedSet(msg.id))
- session.channel._completed.add(msg.id)
- session.channel.session_completed(session.channel._completed)
-
- msg.complete()
+ self.assert_(msg.id in response.transfers)
+ session.message_accept(RangedSet(msg.id))
+ else:
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertEmpty(queue)
#create a second not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
session.message_flow(unit = 0, value = 1, destination = "b")
#check it gets those not consumed
@@ -836,7 +834,9 @@ class MessageTests(TestBase010):
for i in [2,4,6,8,10]:
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
- msg.complete()
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
session.message_flow(unit = 0, value = 1, destination = "b")
self.assertEmpty(queue)
@@ -899,6 +899,21 @@ class MessageTests(TestBase010):
msg = queue.get(timeout = 3)
self.assertEquals("message-body", msg.body)
+ def test_empty_body(self):
+ session = self.session
+ session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
+ props = session.delivery_properties(routing_key="xyz")
+ session.message_transfer(message=Message(props, ""))
+
+ consumer_tag = "tag1"
+ session.message_subscribe(queue="xyz", destination=consumer_tag)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEquals("", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)