summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py48
-rw-r--r--qpid/cpp/src/tests/queue_flow_limit_tests.py42
2 files changed, 90 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 85ace1008a..593791297a 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -532,6 +532,54 @@ acl allow all all
assert not sender.isAlive()
assert sender.done
+ def test_blocked_queue_delete(self):
+ """Verify that producers which are blocked on a queue due to flow
+ control are unblocked when that queue is deleted.
+ """
+
+ cluster = self.cluster(2)
+ cluster[0].startQmf()
+ cluster[1].startQmf()
+
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(q1.name, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
+
+ # fill the queue on one broker until flow control is active
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
+ sender.assert_blocked() # Still blocked
+
+ # Now verify the both brokers in cluster have same configuration
+ qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q2 = qs[0]
+ self.assertEqual(q2.name, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
+
+ # now delete the blocked queue from other broker
+ ssn1 = cluster[1].connect().session()
+ self.evaluate_address(ssn1, "flq;{delete:always}")
+ sender.wait() # Verify no longer blocked.
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
def test_alternate_exchange_update(self):
"""Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py
index d761bde40d..dec7cfb3af 100644
--- a/qpid/cpp/src/tests/queue_flow_limit_tests.py
+++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py
@@ -325,5 +325,47 @@ class QueueFlowLimitTests(TestBase010):
self.verify_limit(TestQ(oid))
+ def test_blocked_queue_delete(self):
+ """ Verify that blocked senders are unblocked when a queue that is flow
+ controlled is deleted.
+ """
+
+ class BlockedSender(Thread):
+ def __init__(self, tester, queue, count, capacity=10):
+ self.tester = tester
+ self.queue = queue
+ self.count = count
+ self.capacity = capacity
+ Thread.__init__(self)
+ self.done = False
+ self.start()
+ def run(self):
+ # spawn qpid-send
+ p = self.tester._start_qpid_send(self.queue,
+ self.count,
+ self.capacity)
+ p.close() # waits for qpid-send to complete
+ self.done = True
+
+ self.startQmf();
+ oid = self._create_queue("kill-q", stop_size=10, resume_size=2)
+ q = self.qmf.getObjects(_objectId=oid)[0]
+ self.failIf(q.flowStopped)
+
+ sender = BlockedSender(self, "kill-q", count=100)
+ # wait for flow control
+ deadline = time() + 10
+ while (not q.flowStopped) and time() < deadline:
+ q.update()
+
+ self.failUnless(q.flowStopped)
+ self.failIf(sender.done) # sender blocked
+
+ self._delete_queue("kill-q")
+ sender.join(5)
+ self.failIf(sender.isAlive())
+ self.failUnless(sender.done)
+
+