diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/tests_0-10/message.py | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 74e2b6416f..b882cd5438 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -645,7 +645,145 @@ class MessageTests(TestBase): self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") self.assertEmpty(self.client.queue("consumer")) + + def test_credit_flow_messages(self): + """ + Test basic credit based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "Message %d" % i) + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + channel.message_flow(unit = 0, value = 1, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + def test_credit_flow_bytes(self): + """ + Test basic credit based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "abcdefgh") + + #each message is currently interpreted as requiring 75 bytes of credit + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = 75*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + channel.message_flow(unit = 1, value = 75, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + + def test_window_flow_messages(self): + """ + Test basic window based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 1, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "Message %d" % i) + + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + msg = q.get(timeout = 1) + self.assertDataEquals(channel, msg, "Message %d" % i) + self.assertEmpty(q) + + #acknowledge messages and check more are received + msg.complete(cumulative=True) + for i in range(6, 11): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + + def test_window_flow_bytes(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 1, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "abcdefgh") + + #each message is currently interpreted as requiring 75 bytes of credit + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = 75*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + msgs = [] + for i in range(1, 6): + msg = q.get(timeout = 1) + msgs.append(msg) + self.assertDataEquals(channel, msg, "abcdefgh") + self.assertEmpty(q) + + #ack each message individually and check more are received + for i in range(6, 11): + msg = msgs.pop() + msg.complete(cumulative=False) + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + def assertDataEquals(self, channel, msg, expected): if isinstance(msg.body, ReferenceId): |