diff options
author | Gordon Sim <gsim@apache.org> | 2011-09-15 17:03:11 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-09-15 17:03:11 +0000 |
commit | faf6b5dc915c07a37c2fdaef2a758eb4656cec04 (patch) | |
tree | c7f4f2e379dc34b4a56c3b7e3442cfa1724049ac | |
parent | 852080fb799f0ac807b9553da31a4c5e2f475887 (diff) | |
download | qpid-python-faf6b5dc915c07a37c2fdaef2a758eb4656cec04.tar.gz |
QPID-3488: Added test case
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1171175 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/message.py | 41 |
1 files changed, 41 insertions, 0 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py index b46c446833..89ba936b05 100644 --- a/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/tests/src/py/qpid_tests/broker_0_10/message.py @@ -508,6 +508,47 @@ class MessageTests(TestBase010): msgB = q.get(timeout=10) + def test_window_stop(self): + """ + Ensure window based flow control reacts to stop correctly + """ + session = self.session + #setup subscriber on a test queue + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session.message_subscribe(queue = "q", destination = "c") + session.message_set_flow_mode(flow_mode = 1, destination = "c") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + + + #send batch of messages to queue + for i in range(0, 10): + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1))) + + #retrieve all delivered messages + q = session.incoming("c") + for i in range(0, 5): + msg = q.get(timeout = 1) + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + self.assertDataEquals(session, msg, "Message %d" % (i+1)) + + session.message_stop(destination = "c") + + #now send completions, normally used to move window forward, + #but after a stop should not do so + session.channel.session_completed(session.receiver._completed) + + #check no more messages are sent + self.assertEmpty(q) + + #re-establish window and check remaining messages + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c") + for i in range(0, 5): + msg = q.get(timeout = 1) + self.assertDataEquals(session, msg, "Message %d" % (i+6)) + + def test_subscribe_not_acquired(self): """ Test the not-acquired modes works as expected for a simple case |