diff options
Diffstat (limited to 'python/tests_0-10/message.py')
-rw-r--r-- | python/tests_0-10/message.py | 51 |
1 files changed, 33 insertions, 18 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index aaefb52392..dd80e79d36 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -635,9 +635,7 @@ class MessageTests(TestBase010): session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #use fanout for now: - session.exchange_bind(exchange="amq.fanout", queue="q") - session.message_transfer(destination="amq.fanout", message=Message("acquire me")) - #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) @@ -647,12 +645,13 @@ class MessageTests(TestBase010): #message should still be on the queue: self.assertEquals(1, session.queue_query(queue = "q").message_count) - response = session.message_acquire(RangedSet(msg.id)) + transfers = RangedSet(msg.id) + response = session.message_acquire(transfers) #check that we get notification (i.e. message_acquired) - self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + self.assert_(msg.id in response.transfers) #message should have been removed from the queue: self.assertEquals(0, session.queue_query(queue = "q").message_count) - session.message_accept(RangedSet(msg.id)) + session.message_accept(transfers) def test_release(self): @@ -800,12 +799,12 @@ class MessageTests(TestBase010): session = self.session #publish some messages - self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") session.message_flow(unit = 0, value = 10, destination = "a") @@ -816,19 +815,18 @@ class MessageTests(TestBase010): self.assertEquals("message-%d" % (i), msg.body) if (i % 2): #try to acquire every second message - session.message_acquire([msg.command_id, msg.command_id]) + response = session.message_acquire(RangedSet(msg.id)) #check that acquire succeeds - response = session.control_queue.get(timeout=1) - self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) - session.message_release(RangedSet(msg.id)) - session.channel._completed.add(msg.id) - session.channel.session_completed(session.channel._completed) - - msg.complete() + self.assert_(msg.id in response.transfers) + session.message_accept(RangedSet(msg.id)) + else: + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) self.assertEmpty(queue) #create a second not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") session.message_flow(unit = 0, value = 1, destination = "b") #check it gets those not consumed @@ -836,7 +834,9 @@ class MessageTests(TestBase010): for i in [2,4,6,8,10]: msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) - msg.complete() + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) @@ -899,6 +899,21 @@ class MessageTests(TestBase010): msg = queue.get(timeout = 3) self.assertEquals("message-body", msg.body) + def test_empty_body(self): + session = self.session + session.queue_declare(queue="xyz", exclusive=True, auto_delete=True) + props = session.delivery_properties(routing_key="xyz") + session.message_transfer(message=Message(props, "")) + + consumer_tag = "tag1" + session.message_subscribe(queue="xyz", destination=consumer_tag) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + queue = session.incoming(consumer_tag) + msg = queue.get(timeout=1) + self.assertEquals("", msg.body) + session.message_accept(RangedSet(msg.id)) + def assertDataEquals(self, session, msg, expected): self.assertEquals(expected, msg.body) |