path: root/qpid/cpp/src/tests/
diff options
Diffstat (limited to 'qpid/cpp/src/tests/')
1 files changed, 48 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/ b/qpid/cpp/src/tests/
index 85ace1008a..593791297a 100755
--- a/qpid/cpp/src/tests/
+++ b/qpid/cpp/src/tests/
@@ -532,6 +532,54 @@ acl allow all all
assert not sender.isAlive()
assert sender.done
+ def test_blocked_queue_delete(self):
+ """Verify that producers which are blocked on a queue due to flow
+ control are unblocked when that queue is deleted.
+ """
+ cluster = self.cluster(2)
+ cluster[0].startQmf()
+ cluster[1].startQmf()
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
+ # fill the queue on one broker until flow control is active
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
+ sender.assert_blocked() # Still blocked
+ # Now verify the both brokers in cluster have same configuration
+ qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q2 = qs[0]
+ self.assertEqual(, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
+ # now delete the blocked queue from other broker
+ ssn1 = cluster[1].connect().session()
+ self.evaluate_address(ssn1, "flq;{delete:always}")
+ sender.wait() # Verify no longer blocked.
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
def test_alternate_exchange_update(self):
"""Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """