diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 85ace1008a..593791297a 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -532,6 +532,54 @@ acl allow all all assert not sender.isAlive() assert sender.done + def test_blocked_queue_delete(self): + """Verify that producers which are blocked on a queue due to flow + control are unblocked when that queue is deleted. + """ + + cluster = self.cluster(2) + cluster[0].startQmf() + cluster[1].startQmf() + + # configure a queue with a specific flow limit on first broker + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q1.getObjectId() + self.assertEqual(q1.name, "flq") + self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 0) + + # fill the queue on one broker until flow control is active + for x in range(5): s0.send(Message(str(x))) + sender = ShortTests.BlockedSend(s0, Message(str(6))) + sender.start() # Tests that sender does block + # Verify the broker queue goes into a flowStopped state + deadline = time.time() + 1 + while not q1.flowStopped and time.time() < deadline: q1.update() + assert q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 1) + sender.assert_blocked() # Still blocked + + # Now verify the both brokers in cluster have same configuration + qs = cluster[1].qmf_session.getObjects(_objectId=oid) + self.assertEqual(len(qs), 1) + q2 = qs[0] + self.assertEqual(q2.name, "flq") + self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q2.flowStopped + self.assertEqual(q2.flowStoppedCount, 1) + + # now delete the blocked queue from other broker + ssn1 = cluster[1].connect().session() + self.evaluate_address(ssn1, "flq;{delete:always}") + sender.wait() # Verify no longer blocked. + + ssn0.connection.close() + ssn1.connection.close() + cluster_test_logs.verify_logs() + def test_alternate_exchange_update(self): """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ |