diff options
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 142 |
1 files changed, 79 insertions, 63 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index e9d44c21e0..10d5fc0db2 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -29,13 +29,17 @@ from qpidtoollibs import BrokerAgent log = getLogger("qpid.ha-tests") class HaBroker(Broker): - def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs): + def __init__(self, test, args=[], broker_url=None, ha_cluster=True, + ha_replicate_default="all", **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) - args.extend(["--load-module", BrokerTest.ha_lib, - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-cluster=%s"%ha_cluster]) + args += ["--load-module", BrokerTest.ha_lib, + "--log-enable=info+", "--log-enable=debug+:ha::", + # FIXME aconway 2012-02-13: workaround slow link failover. + "--link-maintenace-interval=0.1", + "--ha-cluster=%s"%ha_cluster] + if ha_replicate_default is not None: + args += [ "--ha-replicate-default=%s"%ha_replicate_default ] if broker_url: args.extend([ "--ha-brokers", broker_url ]) Broker.__init__(self, test, args, **kwargs) self.commands=os.getenv("PYTHON_COMMANDS") @@ -64,6 +68,10 @@ class HaBroker(Broker): assert os.system( "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 + def connect_admin(self, **kwargs): + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + + class HaCluster(object): _cluster_count = 0 @@ -72,9 +80,9 @@ class HaCluster(object): self.test = test self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] HaCluster._cluster_count += 1 - self[0].promote() self.url = ",".join([b.host_port() for b in self]) for b in self: b.set_broker_url(self.url) + self[0].promote() def connect(self, i): """Connect with reconnect_urls""" @@ -98,8 +106,6 @@ class HaCluster(object): def __iter__(self): return self._brokers.__iter__() -def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value - class HaTest(BrokerTest): """Base class for HA test cases, defines convenience functions""" @@ -114,13 +120,13 @@ class HaTest(BrokerTest): # Wait for address to become valid on a backup broker. def wait_backup(self, backup, address): - bs = self.connect_admin(backup).session() + bs = backup.connect_admin().session() self.wait(bs, address) bs.connection.close() # Combines wait_backup and assert_browse_retry def assert_browse_backup(self, backup, queue, expected, **kwargs): - bs = self.connect_admin(backup).session() + bs = backup.connect_admin().session() self.wait(bs, queue) self.assert_browse_retry(bs, queue, expected, **kwargs) bs.connection.close() @@ -128,12 +134,9 @@ class HaTest(BrokerTest): def assert_missing(self, session, address): try: session.receiver(address) - self.fail("Should not have been replicated: %s"%(address)) + self.fail("Expected NotFound: %s"%(address)) except NotFound: pass - def connect_admin(self, backup, **kwargs): - """Connect to a backup broker as an admin connection""" - return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) class ReplicationTests(HaTest): """Correctness tests for HA replication.""" @@ -173,7 +176,6 @@ class ReplicationTests(HaTest): def verify(b, prefix, p): """Verify setup was replicated to backup b""" - # Wait for configuration to replicate. self.wait(b, prefix+"x"); self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) @@ -203,7 +205,7 @@ class ReplicationTests(HaTest): setup(p, "2", primary) # Verify the data on the backup - b = self.connect_admin(backup).session() + b = backup.connect_admin().session() verify(b, "1", p) verify(b, "2", p) # Test a series of messages, enqueue all then dequeue all. @@ -230,12 +232,10 @@ class ReplicationTests(HaTest): self.assert_browse_retry(b, "foo", msgs[i+1:]) def test_sync(self): - def queue(name, replicate): - return "%s;{create:always,%s}"%(name, qr_node(replicate)) primary = HaBroker(self, name="primary") primary.promote() p = primary.connect().session() - s = p.sender(queue("q","all")) + s = p.sender("q;{create:always}") for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) @@ -246,46 +246,37 @@ class ReplicationTests(HaTest): s.sync() msgs = [str(i) for i in range(30)] - b1 = self.connect_admin(backup1).session() + b1 = backup1.connect_admin().session() self.wait(b1, "q"); self.assert_browse_retry(b1, "q", msgs) - b2 = self.connect_admin(backup2).session() + b2 = backup2.connect_admin().session() self.wait(b2, "q"); self.assert_browse_retry(b2, "q", msgs) def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" + brokers = HaCluster(self, 3) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - primary = HaBroker(self, name="primary") - primary.promote() - backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) - backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) sender = self.popen( ["qpid-send", - "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(qr_node("all")), + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", "--messages=1000", "--content-string=x" ]) receiver = self.popen( ["qpid-receive", - "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(qr_node("all")), + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", "--messages=990", "--timeout=10" ]) - try: - self.assertEqual(sender.wait(), 0) - self.assertEqual(receiver.wait(), 0) - expect = [long(i) for i in range(991, 1001)] - sn = lambda m: m.properties["sn"] - self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn) - self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn) - except: - print self.browse(primary.connect().session(), "q", transform=sn) - print self.browse(self.connect_admin(backup1).session(), "q", transform=sn) - print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) - raise + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + self.assert_browse_backup(brokers[1], "q", expect, transform=sn) + self.assert_browse_backup(brokers[2], "q", expect, transform=sn) def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" @@ -299,12 +290,12 @@ class ReplicationTests(HaTest): self.fail("Expected connection to backup to fail") except ConnectionError: pass # Check that admin connections are allowed to backup. - self.connect_admin(backup).close() + backup.connect_admin().close() # Test discovery: should connect to primary after reject by backup c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() - sender = s.sender("q;{create:always,%s}"%(qr_node())) + sender = s.sender("q;{create:always}") self.wait_backup(backup, "q") sender.send("foo") primary.kill() @@ -319,7 +310,7 @@ class ReplicationTests(HaTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) - primary.connect().session().sender("q;{create:always,%s}"%(qr_node())) + primary.connect().session().sender("q;{create:always}") self.wait_backup(backup, "q") sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) @@ -340,8 +331,7 @@ class ReplicationTests(HaTest): def test_backup_failover(self): """Verify that a backup broker fails over and recovers queue state""" brokers = HaCluster(self, 3) - brokers[0].connect().session().sender( - "q;{create:always,%s}"%(qr_node())).send("a") + brokers[0].connect().session().sender("q;{create:always}").send("a") for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) brokers[0].expect = EXPECT_EXIT_FAIL brokers.kill(0) @@ -362,11 +352,11 @@ class ReplicationTests(HaTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"]) + primary = HaBroker(self, name="primary", ha_cluster=False) pc = primary.connect() ps = pc.session().sender("q;{create:always}") pr = pc.session().receiver("q;{create:always}") - backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + backup = HaBroker(self, name="backup", ha_cluster=False) br = backup.connect().session().receiver("q;{create:always}") # Set up replication with qpid-ha @@ -392,9 +382,9 @@ class ReplicationTests(HaTest): cluster = HaCluster(self, 2) primary = cluster[0] pc = cluster.connect(0) - ps = pc.session().sender("q;{create:always,%s}"%qr_node("all")) - pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all")) - backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False) br = backup.connect().session().receiver("q;{create:always}") backup.replicate(cluster.url, "q") ps.send("a") @@ -413,7 +403,7 @@ class ReplicationTests(HaTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: send(*kv) @@ -426,19 +416,21 @@ class ReplicationTests(HaTest): self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"]) def test_ring(self): + """Test replication with the ring queue policy""" primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}") for i in range(10): s.send(Message(str(i))) self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)]) def test_reject(self): + """Test replication with the reject queue policy""" getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}") try: for i in range(10): s.send(Message(str(i)), sync=False) except qpid.messaging.exceptions.TargetCapacityExceeded: pass @@ -450,12 +442,12 @@ class ReplicationTests(HaTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) session = primary.connect().session() - s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}") + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) # Can't use browse_backup as browser sees messages in delivery order not priority. self.wait_backup(backup, "priority-queue") - r = self.connect_admin(backup).session().receiver("priority-queue") + r = backup.connect_admin().session().receiver("priority-queue") received = [r.fetch().priority for i in priorities] self.assertEqual(sorted(priorities, reverse=True), received) @@ -469,11 +461,11 @@ class ReplicationTests(HaTest): priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3] limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2} limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()]) - s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':all}}}}"%(levels,limit_policy)) + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy)) messages = [Message(content=str(uuid4()), priority = p) for p in priorities] for m in messages: s.send(m) self.wait_backup(backup, s.target) - r = self.connect_admin(backup).session().receiver("priority-queue") + r = backup.connect_admin().session().receiver("priority-queue") received = [r.fetch().content for i in priorities] sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True) fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)] @@ -483,13 +475,14 @@ class ReplicationTests(HaTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) - # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low - # priority message to displace a high one. The following commented-out assert_browse - # is for the correct result, the uncommented one is for the actualy buggy result. - # See https://issues.apache.org/jira/browse/QPID-3866 + # FIXME aconway 2012-02-22: there is a bug in priority ring + # queues that allows a low priority message to displace a high + # one. The following commented-out assert_browse is for the + # correct result, the uncommented one is for the actualy buggy + # result. See https://issues.apache.org/jira/browse/QPID-3866 # # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) @@ -539,6 +532,29 @@ class ReplicationTests(HaTest): for t in tests: t.verify(self, backup1) for t in tests: t.verify(self, backup2) + def test_replicate_default(self): + """Make sure we don't replicate if ha-replicate-default is unspecified or none""" + cluster1 = HaCluster(self, 2, ha_replicate_default=None) + c1 = cluster1[0].connect().session().sender("q;{create:always}") + cluster2 = HaCluster(self, 2, ha_replicate_default="none") + cluster2[0].connect().session().sender("q;{create:always}") + time.sleep(.1) # Give replication a chance. + try: + cluster1[1].connect_admin().session().receiver("q") + self.fail("Excpected no-such-queue exception") + except NotFound: pass + try: + cluster2[1].connect_admin().session().receiver("q") + self.fail("Excpected no-such-queue exception") + except NotFound: pass + + def test_invalid_default(self): + """Verify that a queue with an invalid qpid.replicate gets default treatment""" + cluster = HaCluster(self, 2, ha_replicate_default="all") + c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") + self.wait_backup(cluster[1], "q") + + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |