diff options
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 167 |
1 files changed, 153 insertions, 14 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 12f7a2ca9a..26298393ff 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -327,7 +327,7 @@ acl allow all all Thread.__init__(self) def run(self): try: - self.sender.send(self.msg) + self.sender.send(self.msg, sync=True) self.condition.acquire() try: self.blocked = False @@ -359,11 +359,12 @@ acl allow all all ssn0 = brokers.first().connect().session() s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") brokers.first().startQmf() - q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - oid = q.getObjectId() - self.assertEqual(q.name, "flq") - self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert not q.flowStopped + q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q1.getObjectId() + self.assertEqual(q1.name, "flq") + self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 0) # fill the queue on one broker until flow control is active for x in range(5): s0.send(Message(str(x))) @@ -371,18 +372,20 @@ acl allow all all sender.start() # Tests that sender does block # Verify the broker queue goes into a flowStopped state deadline = time.time() + 1 - while not q.flowStopped and time.time() < deadline: q.update() - assert q.flowStopped + while not q1.flowStopped and time.time() < deadline: q1.update() + assert q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 1) sender.assert_blocked() # Still blocked # Now verify the both brokers in cluster have same configuration brokers.second().startQmf() qs = brokers.second().qmf_session.getObjects(_objectId=oid) self.assertEqual(len(qs), 1) - q = qs[0] - self.assertEqual(q.name, "flq") - self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert q.flowStopped + q2 = qs[0] + self.assertEqual(q2.name, "flq") + self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q2.flowStopped + self.assertEqual(q2.flowStoppedCount, 1) # now drain the queue using a session to the other broker ssn1 = brokers.second().connect().session() @@ -392,6 +395,12 @@ acl allow all all ssn1.acknowledge() sender.wait() # Verify no longer blocked. + # and re-verify state of queue on both brokers + q1.update() + assert not q1.flowStopped + q2.update() + assert not q2.flowStopped + ssn0.connection.close() ssn1.connection.close() cluster_test_logs.verify_logs() @@ -405,7 +414,6 @@ acl allow all all self.queue_flowlimit_test(Brokers()) def test_queue_flowlimit_cluster(self): - return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 cluster = self.cluster(2) class Brokers: def first(self): return cluster[0] @@ -413,7 +421,6 @@ acl allow all all self.queue_flowlimit_test(Brokers()) def test_queue_flowlimit_cluster_join(self): - return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 cluster = self.cluster(1) class Brokers: def first(self): return cluster[0] @@ -422,6 +429,103 @@ acl allow all all return cluster[1] self.queue_flowlimit_test(Brokers()) + def test_queue_flowlimit_replicate(self): + """ Verify that a queue which is in flow control BUT has drained BELOW + the flow control 'stop' threshold, is correctly replicated when a new + broker is added to the cluster. + """ + + class AsyncSender(Thread): + """Send a fixed number of msgs from a sender in a separate thread + so it may block without blocking the test. + """ + def __init__(self, broker, address, count=1, size=4): + Thread.__init__(self) + self.daemon = True + self.broker = broker + self.queue = address + self.count = count + self.size = size + self.done = False + + def run(self): + self.sender = subprocess.Popen(["qpid-send", + "--capacity=1", + "--content-size=%s" % self.size, + "--messages=%s" % self.count, + "--failover-updates", + "--connection-options={reconnect:true}", + "--address=%s" % self.queue, + "--broker=%s" % self.broker.host_port()]) + self.sender.wait() + self.done = True + + cluster = self.cluster(2) + # create a queue with rather draconian flow control settings + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}") + + # fire off the sending thread to broker[0], and wait until the queue + # hits flow control on broker[1] + sender = AsyncSender(cluster[0], "flq", count=110); + sender.start(); + + cluster[1].startQmf() + q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + deadline = time.time() + 10 + while not q_obj.flowStopped and time.time() < deadline: + q_obj.update() + assert q_obj.flowStopped + assert not sender.done + assert q_obj.msgDepth < 110 + + # Now drain enough messages on broker[1] to drop below the flow stop + # threshold, but not relieve flow control... + receiver = subprocess.Popen(["qpid-receive", + "--messages=15", + "--timeout=1", + "--print-content=no", + "--failover-updates", + "--connection-options={reconnect:true}", + "--ack-frequency=1", + "--address=flq", + "--broker=%s" % cluster[1].host_port()]) + receiver.wait() + q_obj.update() + assert q_obj.flowStopped + assert not sender.done + current_depth = q_obj.msgDepth + + # add a new broker to the cluster, and verify that the queue is in flow + # control on that broker + cluster.start() + cluster[2].startQmf() + q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + assert q_obj.flowStopped + assert q_obj.msgDepth == current_depth + + # now drain the queue on broker[2], and verify that the sender becomes + # unblocked + receiver = subprocess.Popen(["qpid-receive", + "--messages=95", + "--timeout=1", + "--print-content=no", + "--failover-updates", + "--connection-options={reconnect:true}", + "--ack-frequency=1", + "--address=flq", + "--broker=%s" % cluster[2].host_port()]) + receiver.wait() + q_obj.update() + assert not q_obj.flowStopped + assert q_obj.msgDepth == 0 + + # verify that the sender has become unblocked + sender.join(timeout=5) + assert not sender.isAlive() + assert sender.done + + def test_alternate_exchange_update(self): """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ cluster = self.cluster(1) @@ -688,6 +792,41 @@ class LongTests(BrokerTest): for i in xrange(1000): cluster[0].connect().close() cluster_test_logs.verify_logs() + def test_flowlimit_failover(self): + """Test fail-over during continuous send-receive with flow control + active. + """ + + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + #for b in cluster: ErrorGenerator(b) + + # create a queue with rather draconian flow control settings + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") + + receiver = NumberedReceiver(cluster[2]) + receiver.start() + senders = [NumberedSender(cluster[i]) for i in range(1,3)] + for s in senders: + s.start() + + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration(); + i = 0 + while time.time() < endtime: + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + #ErrorGenerator(b) + time.sleep(5) + #b = cluster[0] + #b.startQmf() + for s in senders: + s.stop() + receiver.stop() + for i in range(i, len(cluster)): cluster[i].kill() + class StoreTests(BrokerTest): """ |