diff options
author | Alan Conway <aconway@apache.org> | 2012-02-22 18:49:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-22 18:49:55 +0000 |
commit | e369bfd7b5f951b45e848731835814e43165402f (patch) | |
tree | 3cc62021b3c5c0a8801f63976843b2f9a3f79577 | |
parent | 3314a5cb4d14e94ed8fa29a1ba6348d10d27fdcf (diff) | |
download | qpid-python-e369bfd7b5f951b45e848731835814e43165402f.tar.gz |
QPID-3603: Test HA replication of LVQ, priority and ring queues.
Also fix one bug causing problems with LVQ replication.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1292444 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 126 |
2 files changed, 122 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 0017cc82cd..98b431fc10 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -155,9 +155,6 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const Fiel QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() << " to " << position); assert(queue->getPosition() <= position); - //TODO aconway 2011-12-14: Optimize this? - for (SequenceNumber i = queue->getPosition(); i < position; ++i) - dequeue(i,l); queue->setPosition(position); } else { msg.deliverTo(queue); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 97de0d1f77..982a405ada 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -18,8 +18,9 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math from qpid.messaging import Message, NotFound, ConnectionError, Connection +from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG @@ -48,7 +49,6 @@ class HaBroker(Broker): assert os.system( "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0 - class ShortTests(BrokerTest): """Short HA functionality tests.""" @@ -109,7 +109,7 @@ class ShortTests(BrokerTest): s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4")) s3.send(Message("7")) # Use old connection to unbind - us = primary.connect_old().session(str(qpid.datatypes.uuid4())) + us = primary.connect_old().session(str(uuid4())) us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4") p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped # Need a marker so we can wait till sync is done. @@ -298,6 +298,124 @@ class ShortTests(BrokerTest): self.assert_browse_backup(brokers[1], "q", ["a","b"]) for b in brokers[1:]: b.kill() + def test_lvq(self): + """Verify that we replicate to an LVQ correctly""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':messages}}}}") + def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) + for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: + send(*kv) + self.assert_browse_backup(backup, "lvq", ["b-1", "a-3", "c-2"]) + send("b","b-2") + self.assert_browse_backup(backup, "lvq", ["a-3", "c-2", "b-2"]) + send("c","c-3") + self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3"]) + send("d","d-1") + self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"]) + + def test_ring(self): + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':messages}}}}") + for i in range(10): s.send(Message(str(i))) + self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)]) + + def test_reject(self): + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':messages}}}}") + try: + for i in range(10): s.send(Message(str(i)), sync=False) + except qpid.messaging.exceptions.TargetCapacityExceeded: pass + self.assert_browse_backup(backup, "q", [str(i) for i in range(0,5)]) + + def test_priority(self): + """Verify priority queues replicate correctly""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + session = primary.connect().session() + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':messages}}}}") + priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] + for p in priorities: s.send(Message(priority=p)) + self.wait_backup(backup, "priority-queue") + r = self.connect_admin(backup).session().receiver("priority-queue") + received = [r.fetch().priority for i in priorities] + self.assertEqual(sorted(priorities, reverse=True), received) + + def test_priority_fairshare(self): + """Verify priority queues replicate correctly""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + session = primary.connect().session() + levels = 8 + priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3] + limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2} + limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()]) + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':messages}}}}"%(levels,limit_policy)) + messages = [Message(content=str(uuid4()), priority = p) for p in priorities] + for m in messages: s.send(m) + self.wait_backup(backup, s.target) + r = self.connect_admin(backup).session().receiver("priority-queue") + received = [r.fetch().content for i in priorities] + sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True) + fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)] + self.assertEqual(received, fair) + + def test_priority_ring(self): + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':messages}}}}") + priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] + for p in priorities: s.send(Message(priority=p)) + # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low + # priority message to displace a high one. The following commented-out assert_browse + # is for the correct result, the uncommented one is for the actualy buggy result. + # See https://issues.apache.org/jira/browse/QPID-3866 + # + # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) + self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) + +def fairshare(msgs, limit, levels): + """ + Generator to return prioritised messages in expected order for a given fairshare limit + """ + count = 0 + last_priority = None + postponed = [] + while msgs or postponed: + if not msgs: + msgs = postponed + count = 0 + last_priority = None + postponed = [] + msg = msgs.pop(0) + if last_priority and priority_level(msg.priority, levels) == last_priority: + count += 1 + else: + last_priority = priority_level(msg.priority, levels) + count = 1 + l = limit(last_priority) + if (l and count > l): + postponed.append(msg) + else: + yield msg + return + +def priority_level(value, levels): + """ + Method to determine which of a distinct number of priority levels + a given value falls into. + """ + offset = 5-math.ceil(levels/2.0) + return min(max(value - offset, 0), levels-1) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" @@ -311,7 +429,7 @@ class LongTests(BrokerTest): """Test failover with continuous send-receive""" # FIXME aconway 2012-02-03: fails due to dropped messages, # known issue: sending messages to new primary before - # backups are ready. + # backups are ready. Enable when fixed. # Start a cluster, all members will be killed during the test. brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) |