diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 355 |
1 files changed, 355 insertions, 0 deletions
#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
from qpid.messaging import Message, NotFound, ConnectionError, Connection
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG


log = getLogger("qpid.ha-tests")


class HaBroker(Broker):
    """A Broker started with the HA plug-in loaded and --ha-enable=yes."""

    def __init__(self, test, args=[], broker_url=None, **kwargs):
        """Start a broker with the HA module loaded.

        test       -- the owning test case, passed through to Broker.__init__.
        args       -- extra broker command-line arguments; they are placed
                      ahead of the HA options.
        broker_url -- if set, passed to the broker as --ha-broker-url.
        kwargs     -- forwarded unchanged to Broker.__init__.
        """
        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
        # Build a new list instead of rebinding/mutating `args`: the previous
        # code overwrote `args`, silently dropping whatever the caller passed,
        # and copying also keeps the shared default list untouched.
        ha_args = list(args) + [
            "--load-module", BrokerTest.ha_lib,
            # FIXME aconway 2012-02-13: workaround slow link failover.
            # NOTE(review): option spelling ("maintenace") is kept exactly as
            # in the original -- confirm it matches the broker's option name.
            "--link-maintenace-interval=0.1",
            "--ha-enable=yes"]
        if broker_url:
            ha_args += ["--ha-broker-url", broker_url]
        Broker.__init__(self, test, ha_args, **kwargs)

    def promote(self):
        """Run `qpid-ha-tool --promote` against this broker; assert exit status 0."""
        assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0

    def set_client_url(self, url):
        """Run `qpid-ha-tool --client-addresses=url` against this broker."""
        assert os.system(
            "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0

    def set_broker_url(self, url):
        """Run `qpid-ha-tool --broker-addresses=url` against this broker."""
        assert os.system(
            "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0


class ShortTests(BrokerTest):
    """Short HA functionality tests."""

    def wait(self, session, address):
        """Wait for an address to become valid.

        An address counts as valid once session.sender(address) stops raising
        NotFound.  Polling is delegated to retry() (from brokertest); the
        assertion fails if retry() gives up.
        """
        def check():
            try:
                session.sender(address)
                return True
            except NotFound: return False
        assert retry(check), "Timed out waiting for %s"%(address)

    def wait_backup(self, backup, address):
        """Wait for address to become valid on a backup broker."""
        bs = self.connect_admin(backup).session()
        self.wait(bs, address)
        bs.connection.close()

    def assert_browse_backup(self, backup, queue, expected, **kwargs):
        """Combines wait_backup and assert_browse_retry.

        kwargs are forwarded to assert_browse_retry.
        """
        bs = self.connect_admin(backup).session()
        self.wait(bs, queue)
        self.assert_browse_retry(bs, queue, expected, **kwargs)
        bs.connection.close()

    def assert_missing(self, session, address):
        """Fail unless creating a receiver for address raises NotFound."""
        try:
            session.receiver(address)
            self.fail("Should not have been replicated: %s"%(address))
        except NotFound: pass

    def connect_admin(self, backup, **kwargs):
        """Connect to a backup broker as an admin connection"""
        return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)

    def test_replication(self):
        """Test basic replication of configuration and messages before and
        after backup has connected"""

        def queue(name, replicate):
            # Address string that creates queue `name` with the given
            # qpid.replicate argument.
            return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)

        def exchange(name, replicate, bindq):
            # Address string that creates fanout exchange `name` with the
            # given qpid.replicate argument and binds queue `bindq` to it.
            return "%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)

        def setup(p, prefix, primary):
            """Create config, send messages on the primary p"""
            s = p.sender(queue(prefix+"q1", "messages"))
            for m in ["a", "b", "1"]: s.send(Message(m))
            # Test replication of dequeue
            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
            p.acknowledge()
            p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
            p.sender(queue(prefix+"q3", "none")).send(Message("3"))
            p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4"))
            p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5"))
            # Test unbind
            p.sender(queue(prefix+"q4", "messages")).send(Message("6"))
            s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4"))
            s3.send(Message("7"))
            # Use old connection to unbind
            # NOTE(review): `qpid` is not imported by name in this file;
            # presumably it arrives via `from brokertest import *` -- confirm.
            us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
            us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
            p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
            # Need a marker so we can wait till sync is done.
            p.sender(queue(prefix+"x", "configuration"))

        def verify(b, prefix, p):
            """Verify setup was replicated to backup b"""

            # Wait for configuration to replicate.
            self.wait(b, prefix+"x")
            self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])

            self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
            p.acknowledge()
            self.assert_browse_retry(b, prefix+"q1", ["1", "4"])

            self.assert_browse_retry(b, prefix+"q2", []) # configuration only
            self.assert_missing(b, prefix+"q3")
            b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify bind to q1 (qpid.replicate=messages)
            self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify bind to q2 (qpid.replicate=configuration)
            self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])

            b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
            self.assert_browse_retry(b, prefix+"q4", ["6","7"])

        primary = HaBroker(self, name="primary")
        primary.promote()
        p = primary.connect().session()

        # Create config, send messages before starting the backup, to test catch-up replication.
        setup(p, "1", primary)
        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
        # Create config, send messages after starting the backup, to test steady-state replication.
        setup(p, "2", primary)

        # Verify the data on the backup
        b = self.connect_admin(backup).session()
        verify(b, "1", p)
        verify(b, "2", p)
        # Test a series of messages, enqueue all then dequeue all.
        s = p.sender(queue("foo","messages"))
        self.wait(b, "foo")
        msgs = [str(i) for i in range(10)]
        for m in msgs: s.send(Message(m))
        self.assert_browse_retry(p, "foo", msgs)
        self.assert_browse_retry(b, "foo", msgs)
        r = p.receiver("foo")
        for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
        p.acknowledge()
        self.assert_browse_retry(p, "foo", [])
        self.assert_browse_retry(b, "foo", [])

        # Another series, this time verify each dequeue individually.
        for m in msgs: s.send(Message(m))
        self.assert_browse_retry(p, "foo", msgs)
        self.assert_browse_retry(b, "foo", msgs)
        for i in range(len(msgs)):
            self.assertEqual(msgs[i], r.fetch(timeout=0).content)
            p.acknowledge()
            self.assert_browse_retry(p, "foo", msgs[i+1:])
            self.assert_browse_retry(b, "foo", msgs[i+1:])

    def qpid_replicate(self, value="messages"):
        """Return the address-options fragment that sets qpid.replicate to value."""
        return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value

    def test_sync(self):
        """Send three batches of ten messages to queue "q", starting backup1
        after the first batch and backup2 after the second, then check that
        both backups browse all thirty messages."""
        def queue(name, replicate):
            return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
        primary = HaBroker(self, name="primary")
        primary.promote()
        p = primary.connect().session()
        s = p.sender(queue("q","messages"))
        for m in [str(i) for i in range(0,10)]: s.send(m)
        s.sync()
        backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
        for m in [str(i) for i in range(10,20)]: s.send(m)
        s.sync()
        backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
        for m in [str(i) for i in range(20,30)]: s.send(m)
        s.sync()

        msgs = [str(i) for i in range(30)]
        b1 = self.connect_admin(backup1).session()
        self.wait(b1, "q")
        self.assert_browse_retry(b1, "q", msgs)
        b2 = self.connect_admin(backup2).session()
        self.wait(b2, "q")
        self.assert_browse_retry(b2, "q", msgs)

    def test_send_receive(self):
        """Verify sequence numbers of messages sent by qpid-send"""
        primary = HaBroker(self, name="primary")
        primary.promote()
        backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
        backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
        sender = self.popen(
            ["qpid-send",
             "--broker", primary.host_port(),
             "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
             "--messages=1000",
             "--content-string=x"
             ])
        receiver = self.popen(
            ["qpid-receive",
             "--broker", primary.host_port(),
             "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
             "--messages=990",
             "--timeout=10"
             ])
        # Bound before the try block: the except handler below uses it, and it
        # used to be assigned only after assertions that can fail, so a failed
        # wait() turned into a NameError that hid the real error.
        sn = lambda m: m.properties["sn"]
        try:
            self.assertEqual(sender.wait(), 0)
            self.assertEqual(receiver.wait(), 0)
            # 1000 sent, 990 received: sequence numbers 991..1000 should remain.
            # (`long`: this file targets Python 2.)
            expect = [long(i) for i in range(991, 1001)]
            self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
            self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
        except:
            # Dump the queue as seen by every broker to help diagnosis, then
            # re-raise.  Single-argument print(...) prints the same text as the
            # Python 2 print statement.
            print(self.browse(primary.connect().session(), "q", transform=sn))
            print(self.browse(self.connect_admin(backup1).session(), "q", transform=sn))
            print(self.browse(self.connect_admin(backup2).session(), "q", transform=sn))
            raise

    def test_failover_python(self):
        """Verify that backups rejects connections and that fail-over works in python client"""
        getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
        primary.promote()
        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
        # Check that backup rejects normal connections
        try:
            backup.connect().session()
            self.fail("Expected connection to backup to fail")
        except ConnectionError: pass
        # Check that admin connections are allowed to backup.
        self.connect_admin(backup).close()

        # Test discovery: should connect to primary after reject by backup
        c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
        s = c.session()
        sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
        self.wait_backup(backup, "q")
        sender.send("foo")
        primary.kill()
        assert retry(lambda: not is_running(primary.pid))
        backup.promote()
        self.assert_browse_retry(s, "q", ["foo"])
        c.close()

    def test_failover_cpp(self):
        """Verify that failover works in the C++ client."""
        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
        primary.promote()
        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
        url="%s,%s"%(primary.host_port(), backup.host_port())
        primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
        self.wait_backup(backup, "q")

        sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
        receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
        receiver.start()
        sender.start()
        self.wait_backup(backup, "q")
        assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru

        primary.kill()
        assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
        backup.promote()
        n = receiver.received # Make sure we are still running
        assert retry(lambda: receiver.received > n + 10)
        sender.stop()
        receiver.stop()

    def test_backup_failover(self):
        """Kill the primary of a three-broker group, promote broker c, and
        check that backup b then sees messages sent to the new primary."""
        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
                    for name in ["a","b","c"] ]
        url = ",".join([b.host_port() for b in brokers])
        for b in brokers: b.set_broker_url(url)
        brokers[0].promote()
        brokers[0].connect().session().sender(
            "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
        for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
        brokers[0].kill()
        # NOTE(review): the original comment read "c must fail over to b",
        # yet c (brokers[2]) is the one promoted and b (brokers[1]) is the
        # one checked as a backup below -- confirm the intended wording.
        brokers[2].promote() # c must fail over to b.
        brokers[2].connect().session().sender("q").send("b")
        self.assert_browse_backup(brokers[1], "q", ["a","b"])
        for b in brokers[1:]: b.kill()


class LongTests(BrokerTest):
    """Tests that can run for a long time if -DDURATION=<minutes> is set"""

    def duration(self):
        """Return the test duration in seconds: DURATION (minutes) * 60 if
        the DURATION define is set, otherwise 3."""
        d = self.config.defines.get("DURATION")
        if d: return float(d)*60
        else: return 3 # Default is to be quick

    # NOTE(review): the `disable_` prefix presumably keeps the test runner from
    # collecting this method until the FIXME below is resolved -- confirm.
    def disable_test_failover(self):
        """Test failover with continuous send-receive"""
        # FIXME aconway 2012-02-03: fails due to dropped messages,
        # known issue: sending messages to new primary before
        # backups are ready.

        # Start a cluster, all members will be killed during the test.
        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
                    for name in ["ha0","ha1","ha2"] ]
        url = ",".join([b.host_port() for b in brokers])
        for b in brokers: b.set_broker_url(url)
        brokers[0].promote()

        # Start sender and receiver threads
        sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
        receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
        receiver.start()
        sender.start()
        # Wait for sender & receiver to get up and running
        assert retry(lambda: receiver.received > 100)
        # Kill and restart brokers in a cycle:
        endtime = time.time() + self.duration()
        i = 0
        while time.time() < endtime or i < 3: # At least 3 iterations
            sender.sender.assert_running()
            receiver.receiver.assert_running()
            # Replace the killed broker with a new one on the same port so
            # that `url` stays valid.
            port = brokers[i].port()
            brokers[i].kill()
            brokers.append(
                HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
                         expect=EXPECT_EXIT_FAIL))
            i += 1
            brokers[i].promote()
            n = receiver.received # Verify we're still running
            def enough():
                receiver.check() # Verify no exceptions
                return receiver.received > n + 100
            assert retry(enough, timeout=5)

        sender.stop()
        receiver.stop()
        for b in brokers[i:]: b.kill()


if __name__ == "__main__":
    shutil.rmtree("brokertest.tmp", True)
    os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])