Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x  cpp/src/tests/cluster_tests.py  167
1 file changed, 153 insertions, 14 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 12f7a2ca9a..26298393ff 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -327,7 +327,7 @@ acl allow all all
Thread.__init__(self)
def run(self):
try:
- self.sender.send(self.msg)
+ self.sender.send(self.msg, sync=True)
self.condition.acquire()
try:
self.blocked = False
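The change above makes the worker thread's send synchronous, so the thread stays blocked for exactly as long as the broker withholds completion under producer flow control. A minimal sketch of that blocking-send pattern, assuming the standard qpid.messaging Python client; the URL, queue name and helper name are illustrative, not part of the patch:

    from threading import Thread
    from qpid.messaging import Connection, Message

    def send_one_blocking(url="localhost:5672", address="flq"):
        conn = Connection(url)
        conn.open()
        try:
            snd = conn.session().sender(address)
            # With sync=True the call does not return until the broker accepts
            # the message, so it blocks while flow control is active.
            snd.send(Message("payload"), sync=True)
        finally:
            conn.close()

    # Running the send in a daemon thread lets a test assert "still blocked"
    # without hanging the test process itself.
    t = Thread(target=send_one_blocking)
    t.daemon = True
    t.start()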
@@ -359,11 +359,12 @@ acl allow all all
ssn0 = brokers.first().connect().session()
s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
brokers.first().startQmf()
- q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- oid = q.getObjectId()
- self.assertEqual(q.name, "flq")
- self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert not q.flowStopped
+ q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(q1.name, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
# fill the queue on one broker until flow control is active
for x in range(5): s0.send(Message(str(x)))
@@ -371,18 +372,20 @@ acl allow all all
sender.start() # Tests that sender does block
# Verify the broker queue goes into a flowStopped state
deadline = time.time() + 1
- while not q.flowStopped and time.time() < deadline: q.update()
- assert q.flowStopped
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
sender.assert_blocked() # Still blocked
# Now verify that both brokers in the cluster have the same configuration
brokers.second().startQmf()
qs = brokers.second().qmf_session.getObjects(_objectId=oid)
self.assertEqual(len(qs), 1)
- q = qs[0]
- self.assertEqual(q.name, "flq")
- self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert q.flowStopped
+ q2 = qs[0]
+ self.assertEqual(q2.name, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
# now drain the queue using a session to the other broker
ssn1 = brokers.second().connect().session()
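Both this hunk and the replication test added below detect the flow-stopped state the same way: poll the QMF queue object until its flowStopped flag flips or a wall-clock deadline passes. A small sketch of that polling pattern; the helper name and default timeout are illustrative, the update()/flowStopped calls are the ones used in the patch:

    import time

    def wait_for_flow_stopped(qmf_queue, timeout=10):
        """Poll a QMF queue object until flowStopped is set or the deadline
        passes; returns the final value of the flag."""
        deadline = time.time() + timeout
        while not qmf_queue.flowStopped and time.time() < deadline:
            qmf_queue.update()  # refresh the cached QMF attributes from the broker
        return qmf_queue.flowStopped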
@@ -392,6 +395,12 @@ acl allow all all
ssn1.acknowledge()
sender.wait() # Verify no longer blocked.
+ # and re-verify state of queue on both brokers
+ q1.update()
+ assert not q1.flowStopped
+ q2.update()
+ assert not q2.flowStopped
+
ssn0.connection.close()
ssn1.connection.close()
cluster_test_logs.verify_logs()
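Draining on the second broker goes through a plain qpid.messaging receiver; once enough messages are acknowledged to drop the depth below qpid.flow_resume_count, the blocked sender completes. A minimal sketch of that drain step, assuming the qpid.messaging client; the fetch count and timeout are illustrative:

    from qpid.messaging import Connection

    def drain_queue(url, address="flq", count=5, timeout=10):
        conn = Connection(url)
        conn.open()
        try:
            ssn = conn.session()
            rcv = ssn.receiver(address)
            for _ in range(count):
                rcv.fetch(timeout=timeout)  # pull one message; raises Empty on timeout
            ssn.acknowledge()               # acking is what reduces the depth on the broker
        finally:
            conn.close()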
@@ -405,7 +414,6 @@ acl allow all all
self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster(self):
- return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
cluster = self.cluster(2)
class Brokers:
def first(self): return cluster[0]
@@ -413,7 +421,6 @@ acl allow all all
self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster_join(self):
- return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
cluster = self.cluster(1)
class Brokers:
def first(self): return cluster[0]
@@ -422,6 +429,103 @@ acl allow all all
return cluster[1]
self.queue_flowlimit_test(Brokers())
+ def test_queue_flowlimit_replicate(self):
+ """ Verify that a queue which is in flow control BUT has drained BELOW
+ the flow control 'stop' threshold, is correctly replicated when a new
+ broker is added to the cluster.
+ """
+
+ class AsyncSender(Thread):
+ """Send a fixed number of msgs from a sender in a separate thread
+ so it may block without blocking the test.
+ """
+ def __init__(self, broker, address, count=1, size=4):
+ Thread.__init__(self)
+ self.daemon = True
+ self.broker = broker
+ self.queue = address
+ self.count = count
+ self.size = size
+ self.done = False
+
+ def run(self):
+ self.sender = subprocess.Popen(["qpid-send",
+ "--capacity=1",
+ "--content-size=%s" % self.size,
+ "--messages=%s" % self.count,
+ "--failover-updates",
+ "--connection-options={reconnect:true}",
+ "--address=%s" % self.queue,
+ "--broker=%s" % self.broker.host_port()])
+ self.sender.wait()
+ self.done = True
+
+ cluster = self.cluster(2)
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
+
+ # fire off the sending thread to broker[0], and wait until the queue
+ # hits flow control on broker[1]
+ sender = AsyncSender(cluster[0], "flq", count=110)
+ sender.start()
+
+ cluster[1].startQmf()
+ q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ deadline = time.time() + 10
+ while not q_obj.flowStopped and time.time() < deadline:
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ assert q_obj.msgDepth < 110
+
+ # Now drain enough messages on broker[1] to drop below the flow stop
+ # threshold, but not relieve flow control...
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=15",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={reconnect:true}",
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[1].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ current_depth = q_obj.msgDepth
+
+ # add a new broker to the cluster, and verify that the queue is in flow
+ # control on that broker
+ cluster.start()
+ cluster[2].startQmf()
+ q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ assert q_obj.flowStopped
+ assert q_obj.msgDepth == current_depth
+
+ # now drain the queue on broker[2], and verify that the sender becomes
+ # unblocked
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=95",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={reconnect:true}",
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[2].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert not q_obj.flowStopped
+ assert q_obj.msgDepth == 0
+
+ # verify that the sender has become unblocked
+ sender.join(timeout=5)
+ assert not sender.isAlive()
+ assert sender.done
+
+
def test_alternate_exchange_update(self):
"""Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
cluster = self.cluster(1)
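Both drain steps in the new replication test shell out to qpid-receive with the same options, varying only the message count and the broker address; --failover-updates and reconnect:true keep the client attached across cluster membership changes. A sketch of that invocation as a reusable wrapper; the helper name is illustrative, the flags are the ones used in the patch:

    import subprocess

    def drain(broker_host_port, address="flq", messages=15):
        """Run qpid-receive against one broker and wait for it to exit."""
        cmd = ["qpid-receive",
               "--messages=%s" % messages,
               "--timeout=1",
               "--print-content=no",
               "--failover-updates",
               "--connection-options={reconnect:true}",
               "--ack-frequency=1",
               "--address=%s" % address,
               "--broker=%s" % broker_host_port]
        return subprocess.Popen(cmd).wait()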
@@ -688,6 +792,41 @@ class LongTests(BrokerTest):
for i in xrange(1000): cluster[0].connect().close()
cluster_test_logs.verify_logs()
+ def test_flowlimit_failover(self):
+ """Test fail-over during continuous send-receive with flow control
+ active.
+ """
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ #for b in cluster: ErrorGenerator(b)
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
+
+ receiver = NumberedReceiver(cluster[2])
+ receiver.start()
+ senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+ for s in senders:
+ s.start()
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime:
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ #ErrorGenerator(b)
+ time.sleep(5)
+ #b = cluster[0]
+ #b.startQmf()
+ for s in senders:
+ s.stop()
+ receiver.stop()
+ for i in range(i, len(cluster)): cluster[i].kill()
+
class StoreTests(BrokerTest):
"""