diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
tree | 1391da89470593209466df68c0b40b89c14963b1 /cpp/src/tests/ha_tests.py | |
parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 523 |
1 files changed, 412 insertions, 111 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 97de0d1f77..827cb7dca9 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -18,71 +18,125 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil -from qpid.messaging import Message, NotFound, ConnectionError, Connection +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection +from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG +from qpidtoollibs import BrokerAgent - -log = getLogger("qpid.ha-tests") +log = getLogger(__name__) class HaBroker(Broker): - def __init__(self, test, args=[], broker_url=None, **kwargs): + def __init__(self, test, args=[], broker_url=None, ha_cluster=True, + ha_replicate="all", **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - args=["--load-module", BrokerTest.ha_lib, - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-enable=yes"] - if broker_url: args += [ "--ha-broker-url", broker_url ] + args = copy(args) + args += ["--load-module", BrokerTest.ha_lib, + "--log-enable=info+", "--log-enable=debug+:ha::", + # FIXME aconway 2012-02-13: workaround slow link failover. + "--link-maintenace-interval=0.1", + "--ha-cluster=%s"%ha_cluster] + if ha_replicate is not None: + args += [ "--ha-replicate=%s"%ha_replicate ] + if broker_url: args.extend([ "--ha-brokers", broker_url ]) Broker.__init__(self, test, args, **kwargs) + self.commands=os.getenv("PYTHON_COMMANDS") + assert os.path.isdir(self.commands) + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. def promote(self): - assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0 + assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0 def set_client_url(self, url): assert os.system( - "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0 + "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0 def set_broker_url(self, url): assert os.system( - "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0 - - -class ShortTests(BrokerTest): - """Short HA functionality tests.""" - - # Wait for an address to become valid. - def wait(self, session, address): - def check(): - try: - session.sender(address) - return True - except NotFound: return False - assert retry(check), "Timed out waiting for %s"%(address) - - # Wait for address to become valid on a backup broker. - def wait_backup(self, backup, address): - bs = self.connect_admin(backup).session() - self.wait(bs, address) - bs.connection.close() - - # Combines wait_backup and assert_browse_retry - def assert_browse_backup(self, backup, queue, expected, **kwargs): - bs = self.connect_admin(backup).session() - self.wait(bs, queue) - self.assert_browse_retry(bs, queue, expected, **kwargs) - bs.connection.close() - - def assert_missing(self, session, address): + "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0 + + def replicate(self, from_broker, queue): + assert os.system( + "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + + def config_replicate(self, from_broker, queue): + assert os.system( + "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + + def config_declare(self, queue, replication): + assert os.system( + "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 + + def connect_admin(self, **kwargs): + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + + def wait_backup(self, address): + """Wait for address to become valid on a backup broker.""" + bs = self.connect_admin().session() + try: wait_address(bs, address) + finally: bs.connection.close() + + def assert_browse_backup(self, queue, expected, **kwargs): + """Combines wait_backup and assert_browse_retry.""" + bs = self.connect_admin().session() try: - session.receiver(address) - self.fail("Should not have been replicated: %s"%(address)) - except NotFound: pass + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + +class HaCluster(object): + _cluster_count = 0 + + def __init__(self, test, n, **kwargs): + """Start a cluster of n brokers""" + self.test = test + self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] + HaCluster._cluster_count += 1 + self.url = ",".join([b.host_port() for b in self]) + for b in self: b.set_broker_url(self.url) + self[0].promote() + + def connect(self, i): + """Connect with reconnect_urls""" + return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) + + def kill(self, i): + """Kill broker i, promote broker i+1""" + self[i].kill() + self[i].expect = EXPECT_EXIT_FAIL + self[(i+1) % len(self)].promote() + + def bounce(self, i): + """Stop and restart a broker in a cluster.""" + self.kill(i) + b = self[i] + self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url) + + # Behave like a list of brokers. + def __len__(self): return len(self._brokers) + def __getitem__(self,index): return self._brokers[index] + def __iter__(self): return self._brokers.__iter__() + +def wait_address(session, address): + """Wait for an address to become valid.""" + def check(): + try: + session.sender(address) + return True + except NotFound: return False + assert retry(check), "Timed out waiting for address %s"%(address) + +def assert_missing(session, address): + """Assert that the address is _not_ valid""" + try: + session.receiver(address) + self.fail("Expected NotFound: %s"%(address)) + except NotFound: pass - def connect_admin(self, backup, **kwargs): - """Connect to a backup broker as an admin connection""" - return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) +class ReplicationTests(BrokerTest): + """Correctness tests for HA replication.""" def test_replication(self): """Test basic replication of configuration and messages before and @@ -95,21 +149,21 @@ class ShortTests(BrokerTest): return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) def setup(p, prefix, primary): """Create config, send messages on the primary p""" - s = p.sender(queue(prefix+"q1", "messages")) + s = p.sender(queue(prefix+"q1", "all")) for m in ["a", "b", "1"]: s.send(Message(m)) # Test replication of dequeue self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") p.acknowledge() p.sender(queue(prefix+"q2", "configuration")).send(Message("2")) p.sender(queue(prefix+"q3", "none")).send(Message("3")) - p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4")) - p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5")) + p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4")) + p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5")) # Test unbind - p.sender(queue(prefix+"q4", "messages")).send(Message("6")) - s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4")) + p.sender(queue(prefix+"q4", "all")).send(Message("6")) + s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4")) s3.send(Message("7")) # Use old connection to unbind - us = primary.connect_old().session(str(qpid.datatypes.uuid4())) + us = primary.connect_old().session(str(uuid4())) us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4") p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped # Need a marker so we can wait till sync is done. @@ -117,9 +171,8 @@ class ShortTests(BrokerTest): def verify(b, prefix, p): """Verify setup was replicated to backup b""" - # Wait for configuration to replicate. - self.wait(b, prefix+"x"); + wait_address(b, prefix+"x"); self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") @@ -127,7 +180,7 @@ class ShortTests(BrokerTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) self.assert_browse_retry(b, prefix+"q2", []) # configuration only - self.assert_missing(b, prefix+"q3") + assert_missing(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration @@ -147,12 +200,12 @@ class ShortTests(BrokerTest): setup(p, "2", primary) # Verify the data on the backup - b = self.connect_admin(backup).session() + b = backup.connect_admin().session() verify(b, "1", p) verify(b, "2", p) # Test a series of messages, enqueue all then dequeue all. - s = p.sender(queue("foo","messages")) - self.wait(b, "foo") + s = p.sender(queue("foo","all")) + wait_address(b, "foo") msgs = [str(i) for i in range(10)] for m in msgs: s.send(Message(m)) self.assert_browse_retry(p, "foo", msgs) @@ -173,16 +226,11 @@ class ShortTests(BrokerTest): self.assert_browse_retry(p, "foo", msgs[i+1:]) self.assert_browse_retry(b, "foo", msgs[i+1:]) - def qpid_replicate(self, value="messages"): - return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value - def test_sync(self): - def queue(name, replicate): - return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) primary = HaBroker(self, name="primary") primary.promote() p = primary.connect().session() - s = p.sender(queue("q","messages")) + s = p.sender("q;{create:always}") for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) @@ -193,49 +241,39 @@ class ShortTests(BrokerTest): s.sync() msgs = [str(i) for i in range(30)] - b1 = self.connect_admin(backup1).session() - self.wait(b1, "q"); + b1 = backup1.connect_admin().session() + wait_address(b1, "q"); self.assert_browse_retry(b1, "q", msgs) - b2 = self.connect_admin(backup2).session() - self.wait(b2, "q"); + b2 = backup2.connect_admin().session() + wait_address(b2, "q"); self.assert_browse_retry(b2, "q", msgs) def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" - primary = HaBroker(self, name="primary") - primary.promote() - backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) - backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + brokers = HaCluster(self, 3) sender = self.popen( ["qpid-send", - "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", "--messages=1000", "--content-string=x" ]) receiver = self.popen( ["qpid-receive", - "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", "--messages=990", "--timeout=10" ]) - try: - self.assertEqual(sender.wait(), 0) - self.assertEqual(receiver.wait(), 0) - expect = [long(i) for i in range(991, 1001)] - sn = lambda m: m.properties["sn"] - self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn) - self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn) - except: - print self.browse(primary.connect().session(), "q", transform=sn) - print self.browse(self.connect_admin(backup1).session(), "q", transform=sn) - print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) - raise + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + brokers[1].assert_browse_backup("q", expect, transform=sn) + brokers[2].assert_browse_backup("q", expect, transform=sn) def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" - getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) @@ -245,13 +283,13 @@ class ShortTests(BrokerTest): self.fail("Expected connection to backup to fail") except ConnectionError: pass # Check that admin connections are allowed to backup. - self.connect_admin(backup).close() + backup.connect_admin().close() # Test discovery: should connect to primary after reject by backup c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() - sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate())) - self.wait_backup(backup, "q") + sender = s.sender("q;{create:always}") + backup.wait_backup("q") sender.send("foo") primary.kill() assert retry(lambda: not is_running(primary.pid)) @@ -265,14 +303,14 @@ class ShortTests(BrokerTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) - primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) - self.wait_backup(backup, "q") + primary.connect().session().sender("q;{create:always}") + backup.wait_backup("q") sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False) receiver.start() sender.start() - self.wait_backup(backup, "q") + backup.wait_backup("q") assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru primary.kill() @@ -284,19 +322,276 @@ class ShortTests(BrokerTest): receiver.stop() def test_backup_failover(self): - brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) - for name in ["a","b","c"] ] - url = ",".join([b.host_port() for b in brokers]) - for b in brokers: b.set_broker_url(url) - brokers[0].promote() - brokers[0].connect().session().sender( - "q;{create:always,%s}"%(self.qpid_replicate())).send("a") - for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) - brokers[0].kill() - brokers[2].promote() # c must fail over to b. - brokers[2].connect().session().sender("q").send("b") - self.assert_browse_backup(brokers[1], "q", ["a","b"]) - for b in brokers[1:]: b.kill() + """Verify that a backup broker fails over and recovers queue state""" + brokers = HaCluster(self, 3) + brokers[0].connect().session().sender("q;{create:always}").send("a") + for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b) + brokers[0].expect = EXPECT_EXIT_FAIL + brokers.kill(0) + brokers[1].connect().session().sender("q").send("b") + brokers[2].assert_browse_backup("q", ["a","b"]) + s = brokers[1].connect().session() + self.assertEqual("a", s.receiver("q").fetch().content) + s.acknowledge() + brokers[2].assert_browse_backup("q", ["b"]) + + def test_qpid_config_replication(self): + """Set up replication via qpid-config""" + brokers = HaCluster(self,2) + brokers[0].config_declare("q","all") + brokers[0].connect().session().sender("q").send("foo") + brokers[1].assert_browse_backup("q", ["foo"]) + + def test_standalone_queue_replica(self): + """Test replication of individual queues outside of cluster mode""" + primary = HaBroker(self, name="primary", ha_cluster=False) + pc = primary.connect() + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False) + br = backup.connect().session().receiver("q;{create:always}") + + # Set up replication with qpid-ha + backup.replicate(primary.host_port(), "q") + ps.send("a") + backup.assert_browse_backup("q", ["a"]) + ps.send("b") + backup.assert_browse_backup("q", ["a", "b"]) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + backup.assert_browse_backup("q", ["b"]) + + # Set up replication with qpid-config + ps2 = pc.session().sender("q2;{create:always}") + backup.config_replicate(primary.host_port(), "q2"); + ps2.send("x") + backup.assert_browse_backup("q2", ["x"]) + + + def test_queue_replica_failover(self): + """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over.""" + cluster = HaCluster(self, 2) + primary = cluster[0] + pc = cluster.connect(0) + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False) + br = backup.connect().session().receiver("q;{create:always}") + backup.replicate(cluster.url, "q") + ps.send("a") + backup.assert_browse_backup("q", ["a"]) + cluster.bounce(0) + backup.assert_browse_backup("q", ["a"]) + ps.send("b") + backup.assert_browse_backup("q", ["a", "b"]) + cluster.bounce(1) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + backup.assert_browse_backup("q", ["b"]) + + def test_lvq(self): + """Verify that we replicate to an LVQ correctly""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") + def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) + for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: + send(*kv) + backup.assert_browse_backup("lvq", ["b-1", "a-3", "c-2"]) + send("b","b-2") + backup.assert_browse_backup("lvq", ["a-3", "c-2", "b-2"]) + send("c","c-3") + backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3"]) + send("d","d-1") + backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"]) + + def test_ring(self): + """Test replication with the ring queue policy""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}") + for i in range(10): s.send(Message(str(i))) + backup.assert_browse_backup("q", [str(i) for i in range(5,10)]) + + def test_reject(self): + """Test replication with the reject queue policy""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}") + try: + for i in range(10): s.send(Message(str(i)), sync=False) + except qpid.messaging.exceptions.TargetCapacityExceeded: pass + backup.assert_browse_backup("q", [str(i) for i in range(0,5)]) + + def test_priority(self): + """Verify priority queues replicate correctly""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + session = primary.connect().session() + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}") + priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] + for p in priorities: s.send(Message(priority=p)) + # Can't use browse_backup as browser sees messages in delivery order not priority. + backup.wait_backup("priority-queue") + r = backup.connect_admin().session().receiver("priority-queue") + received = [r.fetch().priority for i in priorities] + self.assertEqual(sorted(priorities, reverse=True), received) + + def test_priority_fairshare(self): + """Verify priority queues replicate correctly""" + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + session = primary.connect().session() + levels = 8 + priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3] + limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2} + limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()]) + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy)) + messages = [Message(content=str(uuid4()), priority = p) for p in priorities] + for m in messages: s.send(m) + backup.wait_backup(s.target) + r = backup.connect_admin().session().receiver("priority-queue") + received = [r.fetch().content for i in priorities] + sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True) + fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)] + self.assertEqual(received, fair) + + def test_priority_ring(self): + primary = HaBroker(self, name="primary") + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}") + priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] + for p in priorities: s.send(Message(priority=p)) + + # FIXME aconway 2012-02-22: there is a bug in priority ring + # queues that allows a low priority message to displace a high + # one. The following commented-out assert_browse is for the + # correct result, the uncommented one is for the actualy buggy + # result. See https://issues.apache.org/jira/browse/QPID-3866 + # + # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) + backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority) + + def test_backup_acquired(self): + """Verify that acquired messages are backed up, for all queue types.""" + class Test: + def __init__(self, queue, arguments, expect): + self.queue = queue + self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%( + self.queue, ",".join(arguments + ["'qpid.replicate':all"])) + self.expect = [str(i) for i in expect] + + def send(self, connection): + """Send messages, then acquire one but don't acknowledge""" + s = connection.session() + for m in range(10): s.sender(self.address).send(str(m)) + s.receiver(self.address).fetch() + + def wait(self, brokertest, backup): + backup.wait_backup(self.queue) + + def verify(self, brokertest, backup): + backup.assert_browse_backup(self.queue, self.expect, msg=self.queue) + + tests = [ + Test("plain",[],range(10)), + Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)), + Test("priority",["'qpid.priorities':10"], range(10)), + Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)), + Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9]) + ] + + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + c = primary.connect() + for t in tests: t.send(c) # Send messages, leave one unacknowledged. + + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + # Wait for backups to catch up. + for t in tests: + t.wait(self, backup1) + t.wait(self, backup2) + # Verify acquired message was replicated + for t in tests: t.verify(self, backup1) + for t in tests: t.verify(self, backup2) + + def test_replicate_default(self): + """Make sure we don't replicate if ha-replicate is unspecified or none""" + cluster1 = HaCluster(self, 2, ha_replicate=None) + c1 = cluster1[0].connect().session().sender("q;{create:always}") + cluster2 = HaCluster(self, 2, ha_replicate="none") + cluster2[0].connect().session().sender("q;{create:always}") + time.sleep(.1) # Give replication a chance. + try: + cluster1[1].connect_admin().session().receiver("q") + self.fail("Excpected no-such-queue exception") + except NotFound: pass + try: + cluster2[1].connect_admin().session().receiver("q") + self.fail("Excpected no-such-queue exception") + except NotFound: pass + + def test_invalid_default(self): + """Verify that a queue with an invalid qpid.replicate gets default treatment""" + cluster = HaCluster(self, 2, ha_replicate="all") + c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") + cluster[1].wait_backup("q") + + def test_exclusive_queue(self): + """Ensure that we can back-up exclusive queues, i.e. the replicating + subscriptions are exempt from the exclusivity""" + cluster = HaCluster(self, 2) + def test(addr): + c = cluster[0].connect() + q = addr.split(";")[0] + r = c.session().receiver(addr) + try: c.session().receiver(addr); self.fail("Expected exclusive exception") + except ReceiverError: pass + s = c.session().sender(q).send(q) + cluster[1].assert_browse_backup(q, [q]) + test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); + test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") + +def fairshare(msgs, limit, levels): + """ + Generator to return prioritised messages in expected order for a given fairshare limit + """ + count = 0 + last_priority = None + postponed = [] + while msgs or postponed: + if not msgs: + msgs = postponed + count = 0 + last_priority = None + postponed = [] + msg = msgs.pop(0) + if last_priority and priority_level(msg.priority, levels) == last_priority: + count += 1 + else: + last_priority = priority_level(msg.priority, levels) + count = 1 + l = limit(last_priority) + if (l and count > l): + postponed.append(msg) + else: + yield msg + return + +def priority_level(value, levels): + """ + Method to determine which of a distinct number of priority levels + a given value falls into. + """ + offset = 5-math.ceil(levels/2.0) + return min(max(value - offset, 0), levels-1) class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" @@ -311,7 +606,7 @@ class LongTests(BrokerTest): """Test failover with continuous send-receive""" # FIXME aconway 2012-02-03: fails due to dropped messages, # known issue: sending messages to new primary before - # backups are ready. + # backups are ready. Enable when fixed. # Start a cluster, all members will be killed during the test. brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) @@ -352,4 +647,10 @@ class LongTests(BrokerTest): if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) - os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) + qpid_ha = os.getenv("QPID_HA_EXEC") + if qpid_ha and os.path.exists(qpid_ha): + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) + else: + print "Skipping ha_tests, qpid_ha not available" + |