diff options
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 539 |
1 files changed, 436 insertions, 103 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 827cb7dca9..d25281eed5 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -18,59 +18,123 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest +import traceback +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition -from logging import getLogger, WARN, ERROR, DEBUG +from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent +from uuid import UUID log = getLogger(__name__) +class QmfAgent(object): + """Access to a QMF broker agent.""" + def __init__(self, address, **kwargs): + self._connection = Connection.establish( + address, client_properties={"qpid.ha-admin":1}, **kwargs) + self._agent = BrokerAgent(self._connection) + assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address) + + def __getattr__(self, name): + a = getattr(self._agent, name) + return a + +class Credentials(object): + """SASL credentials: username, password, and mechanism""" + def __init__(self, username, password, mechanism): + (self.username, self.password, self.mechanism) = (username, password, mechanism) + + def __str__(self): return "Credentials%s"%(self.tuple(),) + + def tuple(self): return (self.username, self.password, self.mechanism) + + def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) + class HaBroker(Broker): - def __init__(self, test, args=[], broker_url=None, ha_cluster=True, - ha_replicate="all", **kwargs): + """Start a broker with HA enabled + @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. + """ + def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", + client_credentials=None, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", "--log-enable=debug+:ha::", + "--log-enable=debug+:ha::", # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] if ha_replicate is not None: args += [ "--ha-replicate=%s"%ha_replicate ] - if broker_url: args.extend([ "--ha-brokers", broker_url ]) + if brokers_url: args += [ "--ha-brokers-url", brokers_url ] Broker.__init__(self, test, args, **kwargs) - self.commands=os.getenv("PYTHON_COMMANDS") - assert os.path.isdir(self.commands) + self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") + assert os.path.exists(self.qpid_ha_path) + self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") + assert os.path.exists(self.qpid_config_path) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - - def promote(self): - assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0 - - def set_client_url(self, url): - assert os.system( - "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0 - - def set_broker_url(self, url): - assert os.system( - "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0 - - def replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + self.qpid_ha_script=import_script(self.qpid_ha_path) + self._agent = None + self.client_credentials = client_credentials + + def __str__(self): return Broker.__str__(self) + + def qpid_ha(self, args): + cred = self.client_credentials + url = self.host_port() + if cred: + url =cred.add_user(url) + args = args + ["--sasl-mechanism", cred.mechanism] + self.qpid_ha_script.main_except(["", "-b", url]+args) + + def promote(self): self.qpid_ha(["promote"]) + def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) + def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) + def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + + def agent(self): + if not self._agent: + cred = self.client_credentials + if cred: + self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) + else: + self._agent = QmfAgent(self.host_port()) + return self._agent + + def ha_status(self): + hb = self.agent().getHaBroker() + hb.update() + return hb.status + + def wait_status(self, status): + def try_get_status(): + # Ignore ConnectionError, the broker may not be up yet. + try: return self.ha_status() == status; + except ConnectionError: return False + assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status) + + # FIXME aconway 2012-05-01: do direct python call to qpid-config code. + def qpid_config(self, args): + assert subprocess.call( + [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 def config_replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) def config_declare(self, queue, replication): - assert os.system( - "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 + self.qpid_config(["add", "queue", queue, "--replicate", replication]) def connect_admin(self, **kwargs): - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + cred = self.client_credentials + if cred: + return Broker.connect( + self, client_properties={"qpid.ha-admin":1}, + username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, + **kwargs) + else: + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) def wait_backup(self, address): """Wait for address to become valid on a backup broker.""" @@ -78,6 +142,14 @@ class HaBroker(Broker): try: wait_address(bs, address) finally: bs.connection.close() + def assert_browse(self, queue, expected, **kwargs): + """Verify queue contents by browsing.""" + bs = self.connect().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + def assert_browse_backup(self, queue, expected, **kwargs): """Combines wait_backup and assert_browse_retry.""" bs = self.connect_admin().session() @@ -86,33 +158,70 @@ class HaBroker(Broker): assert_browse_retry(bs, queue, expected, **kwargs) finally: bs.connection.close() + def assert_connect_fail(self): + try: + self.connect() + self.test.fail("Expected ConnectionError") + except ConnectionError: pass + + def try_connect(self): + try: return self.connect() + except ConnectionError: return None + class HaCluster(object): _cluster_count = 0 - def __init__(self, test, n, **kwargs): + def __init__(self, test, n, promote=True, **kwargs): """Start a cluster of n brokers""" self.test = test - self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] + self.kwargs = kwargs + self._brokers = [] + self.id = HaCluster._cluster_count + self.broker_id = 0 HaCluster._cluster_count += 1 - self.url = ",".join([b.host_port() for b in self]) - for b in self: b.set_broker_url(self.url) + for i in xrange(n): self.start(False) + self.update_urls() self[0].promote() + def next_name(self): + name="cluster%s-%s"%(self.id, self.broker_id) + self.broker_id += 1 + return name + + def start(self, update_urls=True, args=[]): + """Start a new broker in the cluster""" + b = HaBroker(self.test, name=self.next_name(), **self.kwargs) + self._brokers.append(b) + if update_urls: self.update_urls() + return b + + def update_urls(self): + self.url = ",".join([b.host_port() for b in self]) + if len(self) > 1: # No failover addresses on a 1 cluster. + for b in self: b.set_brokers_url(self.url) + def connect(self, i): """Connect with reconnect_urls""" return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) - def kill(self, i): + def kill(self, i, promote_next=True): """Kill broker i, promote broker i+1""" - self[i].kill() self[i].expect = EXPECT_EXIT_FAIL - self[(i+1) % len(self)].promote() + self[i].kill() + if promote_next: self[(i+1) % len(self)].promote() + + def restart(self, i): + """Start a broker with the same port, name and data directory. It will get + a separate log file: foo.n.log""" + b = self._brokers[i] + self._brokers[i] = HaBroker( + self.test, name=b.name, port=b.port(), brokers_url=self.url, + **self.kwargs) - def bounce(self, i): + def bounce(self, i, promote_next=True): """Stop and restart a broker in a cluster.""" - self.kill(i) - b = self[i] - self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url) + self.kill(i, promote_next) + self.restart(i) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -128,12 +237,12 @@ def wait_address(session, address): except NotFound: return False assert retry(check), "Timed out waiting for address %s"%(address) -def assert_missing(session, address): - """Assert that the address is _not_ valid""" +def valid_address(session, address): + """Test if an address is valid""" try: session.receiver(address) - self.fail("Expected NotFound: %s"%(address)) - except NotFound: pass + return True + except NotFound: return False class ReplicationTests(BrokerTest): """Correctness tests for HA replication.""" @@ -180,7 +289,7 @@ class ReplicationTests(BrokerTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) self.assert_browse_retry(b, prefix+"q2", []) # configuration only - assert_missing(b, prefix+"q3") + assert not valid_address(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration @@ -195,7 +304,7 @@ class ReplicationTests(BrokerTest): # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1", primary) - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) # Create config, send messages after starting the backup, to test steady-state replication. setup(p, "2", primary) @@ -233,10 +342,10 @@ class ReplicationTests(BrokerTest): s = p.sender("q;{create:always}") for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() - backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port()) for m in [str(i) for i in range(10,20)]: s.send(m) s.sync() - backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port()) for m in [str(i) for i in range(20,30)]: s.send(m) s.sync() @@ -276,7 +385,7 @@ class ReplicationTests(BrokerTest): """Verify that backups rejects connections and that fail-over works in python client""" primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) # Check that backup rejects normal connections try: backup.connect().session() @@ -294,14 +403,15 @@ class ReplicationTests(BrokerTest): primary.kill() assert retry(lambda: not is_running(primary.pid)) backup.promote() - self.assert_browse_retry(s, "q", ["foo"]) + sender.send("bar") + self.assert_browse_retry(s, "q", ["foo", "bar"]) c.close() def test_failover_cpp(self): """Verify that failover works in the C++ client.""" primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) primary.connect().session().sender("q;{create:always}") backup.wait_backup("q") @@ -344,6 +454,7 @@ class ReplicationTests(BrokerTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary", ha_cluster=False) pc = primary.connect() ps = pc.session().sender("q;{create:always}") @@ -393,7 +504,7 @@ class ReplicationTests(BrokerTest): """Verify that we replicate to an LVQ correctly""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: @@ -410,7 +521,7 @@ class ReplicationTests(BrokerTest): """Test replication with the ring queue policy""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}") for i in range(10): s.send(Message(str(i))) backup.assert_browse_backup("q", [str(i) for i in range(5,10)]) @@ -419,18 +530,20 @@ class ReplicationTests(BrokerTest): """Test replication with the reject queue policy""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}") try: for i in range(10): s.send(Message(str(i)), sync=False) except qpid.messaging.exceptions.TargetCapacityExceeded: pass backup.assert_browse_backup("q", [str(i) for i in range(0,5)]) + # Detach, don't close as there is a broken session + s.session.connection.detach() def test_priority(self): """Verify priority queues replicate correctly""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) session = primary.connect().session() s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] @@ -445,7 +558,7 @@ class ReplicationTests(BrokerTest): """Verify priority queues replicate correctly""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) session = primary.connect().session() levels = 8 priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3] @@ -464,7 +577,7 @@ class ReplicationTests(BrokerTest): def test_priority_ring(self): primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) @@ -475,8 +588,10 @@ class ReplicationTests(BrokerTest): # correct result, the uncommented one is for the actualy buggy # result. See https://issues.apache.org/jira/browse/QPID-3866 # - # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) - backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority) + # expect = sorted(priorities,reverse=True)[0:5] + expect = [9,9,9,9,2] + primary.assert_browse("q", expect, transform=lambda m: m.priority) + backup.assert_browse_backup("q", expect, transform=lambda m: m.priority) def test_backup_acquired(self): """Verify that acquired messages are backed up, for all queue types.""" @@ -509,11 +624,11 @@ class ReplicationTests(BrokerTest): primary = HaBroker(self, name="primary") primary.promote() - backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port()) c = primary.connect() for t in tests: t.send(c) # Send messages, leave one unacknowledged. - backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port()) # Wait for backups to catch up. for t in tests: t.wait(self, backup1) @@ -538,11 +653,13 @@ class ReplicationTests(BrokerTest): self.fail("Excpected no-such-queue exception") except NotFound: pass - def test_invalid_default(self): - """Verify that a queue with an invalid qpid.replicate gets default treatment""" - cluster = HaCluster(self, 2, ha_replicate="all") - c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") - cluster[1].wait_backup("q") + def test_invalid_replication(self): + """Verify that we reject an attempt to declare a queue with invalid replication value.""" + cluster = HaCluster(self, 1, ha_replicate="all") + try: + c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") + self.fail("Expected ConnectionError") + except ConnectionError: pass def test_exclusive_queue(self): """Ensure that we can back-up exclusive queues, i.e. the replicating @@ -559,6 +676,136 @@ class ReplicationTests(BrokerTest): test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") + def test_auto_delete_exclusive(self): + """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues""" + cluster = HaCluster(self,2) + s = cluster[0].connect().session() + s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}") + s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}") + s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}") + s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}") + s.receiver("q;{create:always}") + + s = cluster[1].connect_admin().session() + cluster[1].wait_backup("q") + assert not valid_address(s, "exad") + assert valid_address(s, "ex") + assert valid_address(s, "ad") + assert valid_address(s, "time") + + def test_broker_info(self): + """Check that broker information is correctly published via management""" + cluster = HaCluster(self, 3) + + for broker in cluster: # Make sure HA system-id matches broker's + qmf = broker.agent().getHaBroker() + self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef)) + + cluster_ports = map(lambda b: b.port(), cluster) + cluster_ports.sort() + def ports(qmf): + qmf.update() + return sorted(map(lambda b: b["port"], qmf.members)) + # Check that all brokers have the same membership as the cluster + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker) + # Add a new broker, check it is updated everywhere + b = cluster.start() + cluster_ports.append(b.port()) + cluster_ports.sort() + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + + def test_auth(self): + """Verify that authentication does not interfere with replication.""" + # FIXME aconway 2012-07-09: generate test sasl config portably for cmake + sasl_config=os.path.join(self.rootdir, "sasl_config") + if not os.path.exists(sasl_config): + print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config + return + acl=os.path.join(os.getcwd(), "policy.acl") + aclf=file(acl,"w") + # Verify that replication works with auth=yes and HA user has at least the following + # privileges: + aclf.write(""" +acl allow zag@QPID access queue +acl allow zag@QPID create queue +acl allow zag@QPID consume queue +acl allow zag@QPID delete queue +acl allow zag@QPID access exchange +acl allow zag@QPID create exchange +acl allow zag@QPID bind exchange +acl allow zag@QPID publish exchange +acl allow zag@QPID delete exchange +acl allow zag@QPID access method +acl allow zag@QPID create link +acl deny all all + """) + aclf.close() + cluster = HaCluster( + self, 2, + args=["--auth", "yes", "--sasl-config", sasl_config, + "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"), + "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN" + ], + client_credentials=Credentials("zag", "zag", "PLAIN")) + s0 = cluster[0].connect(username="zag", password="zag").session(); + s0.receiver("q;{create:always}") + s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + cluster[1].wait_backup("q") + cluster[1].wait_backup("ex") + s1 = cluster[1].connect_admin().session(); # Uses Credentials above. + s1.sender("ex").send("foo"); + self.assertEqual(s1.receiver("q").fetch().content, "foo") + + def test_alternate_exchange(self): + """Verify that alternate-exchange on exchanges and queues is propagated + to new members of a cluster. """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + # altex exchange: acts as alternate exchange + s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + # altq queue bound to altex, collect re-routed messages. + s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + # 0ex exchange with alternate-exchange altex and no queues bound + s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # create queue q with alternate-exchange altex + s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") + # create a bunch of exchanges to ensure we don't clean up prematurely if the + # response comes in multiple fragments. + for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("0ex").send("foo") + altq = s.receiver("altq") + self.assertEqual("foo", altq.fetch(timeout=0).content) + s.acknowledge() + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", altq.fetch(timeout=0).content) + s.acknowledge() + + # Sanity check: alternate exchanges on original broker + verify(cluster[0]) + # Check backup that was connected during setup. + cluster[1].wait_backup("0ex") + cluster[1].wait_backup("q") + cluster.bounce(0) + verify(cluster[1]) + # Check a newly started backup. + cluster.start() + cluster[2].wait_backup("0ex") + cluster[2].wait_backup("q") + cluster.bounce(1) + verify(cluster[2]) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -601,49 +848,135 @@ class LongTests(BrokerTest): if d: return float(d)*60 else: return 3 # Default is to be quick - - def disable_test_failover(self): + def test_failover_send_receive(self): """Test failover with continuous send-receive""" - # FIXME aconway 2012-02-03: fails due to dropped messages, - # known issue: sending messages to new primary before - # backups are ready. Enable when fixed. - - # Start a cluster, all members will be killed during the test. - brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) - for name in ["ha0","ha1","ha2"] ] - url = ",".join([b.host_port() for b in brokers]) - for b in brokers: b.set_broker_url(url) - brokers[0].promote() + brokers = HaCluster(self, 3) # Start sender and receiver threads - sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False) - receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) - receiver.start() - sender.start() - # Wait for sender & receiver to get up and running - assert retry(lambda: receiver.received > 100) + n = 10 + senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False, + queue="test%s"%(i)) for i in xrange(n)] + receivers = [NumberedReceiver(brokers[0], sender=senders[i], + failover_updates=False, + queue="test%s"%(i)) for i in xrange(n)] + for r in receivers: r.start() + for s in senders: s.start() + + def wait_passed(r, n): + """Wait for receiver r to pass n""" + def check(): + r.check() # Verify no exceptions + return r.received > n + assert retry(check), "Stalled %s at %s"%(r.queue, n) + + for r in receivers: wait_passed(r, 0) + # Kill and restart brokers in a cycle: endtime = time.time() + self.duration() i = 0 - while time.time() < endtime or i < 3: # At least 3 iterations - sender.sender.assert_running() - receiver.receiver.assert_running() - port = brokers[i].port() - brokers[i].kill() - brokers.append( - HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, - expect=EXPECT_EXIT_FAIL)) - i += 1 - brokers[i].promote() - n = receiver.received # Verify we're still running - def enough(): - receiver.check() # Verify no exceptions - return receiver.received > n + 100 - assert retry(enough, timeout=5) - - sender.stop() - receiver.stop() - for b in brokers[i:]: b.kill() + try: + while time.time() < endtime or i < 3: # At least 3 iterations + for s in senders: s.sender.assert_running() + for r in receivers: r.receiver.assert_running() + checkpoint = [ r.received for r in receivers ] + # Don't kill primary till it is active and the next + # backup is ready, otherwise we can lose messages. + brokers[i%3].wait_status("active") + brokers[(i+1)%3].wait_status("ready") + brokers.bounce(i%3) + i += 1 + map(wait_passed, receivers, checkpoint) # Wait for all receivers + except: + traceback.print_exc() + raise + finally: + for s in senders: s.stop() + for r in receivers: r.stop() + dead = [] + for i in xrange(3): + if not brokers[i].is_running(): dead.append(i) + brokers.kill(i, False) + if dead: raise Exception("Brokers not running: %s"%dead) + +class RecoveryTests(BrokerTest): + """Tests for recovery after a failure.""" + + def test_queue_hold(self): + """Verify that the broker holds queues without sufficient backup, + i.e. does not complete messages sent to those queues.""" + + # We don't want backups to time out for this test, set long timeout. + cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]); + # Wait for the primary to be ready + cluster[0].wait_status("active") + # Create a queue before the failure. + s1 = cluster.connect(0).session().sender("q1;{create:always}") + for b in cluster: b.wait_backup("q1") + for i in xrange(100): s1.send(str(i)) + # Kill primary and 2 backups + for i in [0,1,2]: cluster.kill(i, False) + cluster[3].promote() # New primary, backups will be 1 and 2 + cluster[3].wait_status("recovering") + + def assertSyncTimeout(s): + try: + s.sync(timeout=.01) + self.fail("Expected Timeout exception") + except Timeout: pass + + # Create a queue after the failure + s2 = cluster.connect(3).session().sender("q2;{create:always}") + + # Verify that messages sent are not completed + for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) + assertSyncTimeout(s1) + self.assertEqual(s1.unsettled(), 100) + assertSyncTimeout(s2) + self.assertEqual(s2.unsettled(), 100) + + # Verify we can receive even if sending is on hold: + cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) + + # Restart backups, verify queues are released only when both backups are up + cluster.restart(1) + assertSyncTimeout(s1) + self.assertEqual(s1.unsettled(), 100) + assertSyncTimeout(s2) + self.assertEqual(s2.unsettled(), 100) + self.assertEqual(cluster[3].ha_status(), "recovering") + cluster.restart(2) + + # Verify everything is up to date and active + def settled(sender): sender.sync(); return sender.unsettled() == 0; + assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled()) + assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled()) + cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) + cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)]) + cluster[3].wait_status("active"), + s1.session.connection.close() + s2.session.connection.close() + + def test_expected_backup_timeout(self): + """Verify that we time-out expected backups and release held queues + after a configured interval. Verify backup is demoted to catch-up, + but can still rejoin. + """ + cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]); + cluster[0].wait_status("active") # Primary ready + for b in cluster[1:4]: b.wait_status("ready") # Backups ready + for i in [0,1]: cluster.kill(i, False) + cluster[2].promote() # New primary, backups will be 1 and 2 + cluster[2].wait_status("recovering") + # Should not go active till the expected backup connects or times out. + self.assertEqual(cluster[2].ha_status(), "recovering") + # Messages should be held expected backup times out + s = cluster[2].connect().session().sender("q;{create:always}") + for i in xrange(100): s.send(str(i), sync=False) + # Verify message held initially. + try: s.sync(timeout=.01); self.fail("Expected Timeout exception") + except Timeout: pass + s.sync(timeout=1) # And released after the timeout. + self.assertEqual(cluster[2].ha_status(), "active") if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |