Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py  355
1 files changed, 355 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
new file mode 100755
index 0000000000..97de0d1f77
--- /dev/null
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -0,0 +1,355 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
+import qpid.datatypes
+from qpid.messaging import Message, NotFound, ConnectionError, Connection
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger, WARN, ERROR, DEBUG
+
+
+log = getLogger("qpid.ha-tests")
+
+class HaBroker(Broker):
+ def __init__(self, test, args=[], broker_url=None, **kwargs):
+ assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+        args = args + ["--load-module", BrokerTest.ha_lib,
+                       # FIXME aconway 2012-02-13: workaround slow link failover.
+                       "--link-maintenace-interval=0.1",
+                       "--ha-enable=yes"]
+ if broker_url: args += [ "--ha-broker-url", broker_url ]
+ Broker.__init__(self, test, args, **kwargs)
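+    # Typical use in the tests below (sketch): start and promote a primary,
+    # then point one or more backups at it:
+    #   primary = HaBroker(self, name="primary"); primary.promote()
+    #   backup  = HaBroker(self, name="backup", broker_url=primary.host_port())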
+
+ def promote(self):
+ assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0
+
+ def set_client_url(self, url):
+ assert os.system(
+ "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0
+
+ def set_broker_url(self, url):
+ assert os.system(
+ "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0
+
+
+class ShortTests(BrokerTest):
+ """Short HA functionality tests."""
+
+ # Wait for an address to become valid.
+ def wait(self, session, address):
+ def check():
+ try:
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for %s"%(address)
+
+ # Wait for address to become valid on a backup broker.
+ def wait_backup(self, backup, address):
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, address)
+ bs.connection.close()
+
+ # Combines wait_backup and assert_browse_retry
+ def assert_browse_backup(self, backup, queue, expected, **kwargs):
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, queue)
+ self.assert_browse_retry(bs, queue, expected, **kwargs)
+ bs.connection.close()
+
+ def assert_missing(self, session, address):
+ try:
+ session.receiver(address)
+ self.fail("Should not have been replicated: %s"%(address))
+ except NotFound: pass
+
+    def connect_admin(self, backup, **kwargs):
+        """Connect to a backup broker as an admin connection. Backups reject
+        normal client connections; the qpid.ha-admin client property lets the
+        tests connect to a backup to verify its state."""
+        return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+
+    def test_replication(self):
+        """Test basic replication of configuration and messages before and
+        after the backup has connected."""
+
+ def queue(name, replicate):
+ return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
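+        # For example, queue("q1", "messages") expands to the address
+        #   q1;{create:always,node:{x-declare:{arguments:{'qpid.replicate':messages}}}}
+        # i.e. each queue is tagged with the qpid.replicate level it needs.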
+
+ def exchange(name, replicate, bindq):
+            return "%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+ def setup(p, prefix, primary):
+ """Create config, send messages on the primary p"""
+ s = p.sender(queue(prefix+"q1", "messages"))
+ for m in ["a", "b", "1"]: s.send(Message(m))
+ # Test replication of dequeue
+ self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
+ p.acknowledge()
+ p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
+ p.sender(queue(prefix+"q3", "none")).send(Message("3"))
+ p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5"))
+ # Test unbind
+ p.sender(queue(prefix+"q4", "messages")).send(Message("6"))
+ s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4"))
+ s3.send(Message("7"))
+ # Use old connection to unbind
+ us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
+ us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
+ p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
+ # Need a marker so we can wait till sync is done.
+ p.sender(queue(prefix+"x", "configuration"))
+
+ def verify(b, prefix, p):
+ """Verify setup was replicated to backup b"""
+
+ # Wait for configuration to replicate.
+            self.wait(b, prefix+"x")
+ self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
+
+ self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
+ p.acknowledge()
+ self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
+
+ self.assert_browse_retry(b, prefix+"q2", []) # configuration only
+ self.assert_missing(b, prefix+"q3")
+            b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=messages
+            self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
+            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
+ self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
+
+ b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
+ self.assert_browse_retry(b, prefix+"q4", ["6","7"])
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ p = primary.connect().session()
+
+ # Create config, send messages before starting the backup, to test catch-up replication.
+ setup(p, "1", primary)
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ # Create config, send messages after starting the backup, to test steady-state replication.
+ setup(p, "2", primary)
+
+ # Verify the data on the backup
+ b = self.connect_admin(backup).session()
+ verify(b, "1", p)
+ verify(b, "2", p)
+ # Test a series of messages, enqueue all then dequeue all.
+ s = p.sender(queue("foo","messages"))
+ self.wait(b, "foo")
+ msgs = [str(i) for i in range(10)]
+ for m in msgs: s.send(Message(m))
+ self.assert_browse_retry(p, "foo", msgs)
+ self.assert_browse_retry(b, "foo", msgs)
+ r = p.receiver("foo")
+ for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
+ p.acknowledge()
+ self.assert_browse_retry(p, "foo", [])
+ self.assert_browse_retry(b, "foo", [])
+
+ # Another series, this time verify each dequeue individually.
+ for m in msgs: s.send(Message(m))
+ self.assert_browse_retry(p, "foo", msgs)
+ self.assert_browse_retry(b, "foo", msgs)
+ for i in range(len(msgs)):
+ self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+ p.acknowledge()
+ self.assert_browse_retry(p, "foo", msgs[i+1:])
+ self.assert_browse_retry(b, "foo", msgs[i+1:])
+
+ def qpid_replicate(self, value="messages"):
+ return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
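+    # For example, qpid_replicate("messages") yields
+    #   node:{x-declare:{arguments:{'qpid.replicate':messages}}}
+    # which the tests splice into "q;{create:always,...}" addresses.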
+
+ def test_sync(self):
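+        """Verify that backups started at different times all catch up: send a
+        batch of messages before starting each backup, then check that both
+        backups hold every message sent."""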
+ def queue(name, replicate):
+ return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ p = primary.connect().session()
+ s = p.sender(queue("q","messages"))
+ for m in [str(i) for i in range(0,10)]: s.send(m)
+ s.sync()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ for m in [str(i) for i in range(10,20)]: s.send(m)
+ s.sync()
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ for m in [str(i) for i in range(20,30)]: s.send(m)
+ s.sync()
+
+ msgs = [str(i) for i in range(30)]
+ b1 = self.connect_admin(backup1).session()
+        self.wait(b1, "q")
+        self.assert_browse_retry(b1, "q", msgs)
+        b2 = self.connect_admin(backup2).session()
+        self.wait(b2, "q")
+ self.assert_browse_retry(b2, "q", msgs)
+
+ def test_send_receive(self):
+ """Verify sequence numbers of messages sent by qpid-send"""
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ sender = self.popen(
+ ["qpid-send",
+ "--broker", primary.host_port(),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--messages=1000",
+ "--content-string=x"
+ ])
+ receiver = self.popen(
+ ["qpid-receive",
+ "--broker", primary.host_port(),
+ "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--messages=990",
+ "--timeout=10"
+ ])
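+        # qpid-send publishes 1000 messages that carry a sequence-number
+        # property "sn"; qpid-receive consumes the first 990, so sequence
+        # numbers 991-1000 are what should remain on the queue and its backups.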
+        sn = lambda m: m.properties["sn"]
+        try:
+            self.assertEqual(sender.wait(), 0)
+            self.assertEqual(receiver.wait(), 0)
+            expect = [long(i) for i in range(991, 1001)]
+ self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
+ self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
+ except:
+ print self.browse(primary.connect().session(), "q", transform=sn)
+ print self.browse(self.connect_admin(backup1).session(), "q", transform=sn)
+ print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
+ raise
+
+    def test_failover_python(self):
+        """Verify that backups reject connections and that fail-over works in
+        the Python client."""
+ getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ # Check that backup rejects normal connections
+ try:
+ backup.connect().session()
+ self.fail("Expected connection to backup to fail")
+ except ConnectionError: pass
+ # Check that admin connections are allowed to backup.
+ self.connect_admin(backup).close()
+
+ # Test discovery: should connect to primary after reject by backup
+ c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+ s = c.session()
+ sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ self.wait_backup(backup, "q")
+ sender.send("foo")
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid))
+ backup.promote()
+ self.assert_browse_retry(s, "q", ["foo"])
+ c.close()
+
+ def test_failover_cpp(self):
+ """Verify that failover works in the C++ client."""
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ url="%s,%s"%(primary.host_port(), backup.host_port())
+ primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ self.wait_backup(backup, "q")
+
+ sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
+ receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
+ receiver.start()
+ sender.start()
+ self.wait_backup(backup, "q")
+        assert retry(lambda: receiver.received > 10) # Wait for some messages to get through
+
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
+ backup.promote()
+ n = receiver.received # Make sure we are still running
+ assert retry(lambda: receiver.received > n + 10)
+ sender.stop()
+ receiver.stop()
+
+ def test_backup_failover(self):
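+        """Kill the primary in a three-broker cluster, promote one of the
+        backups and verify that the remaining backup fails over to the new
+        primary and keeps replicating."""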
+ brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+ for name in ["a","b","c"] ]
+ url = ",".join([b.host_port() for b in brokers])
+ for b in brokers: b.set_broker_url(url)
+ brokers[0].promote()
+ brokers[0].connect().session().sender(
+ "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
+ for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
+ brokers[0].kill()
+ brokers[2].promote() # c must fail over to b.
+ brokers[2].connect().session().sender("q").send("b")
+ self.assert_browse_backup(brokers[1], "q", ["a","b"])
+ for b in brokers[1:]: b.kill()
+
+class LongTests(BrokerTest):
+ """Tests that can run for a long time if -DDURATION=<minutes> is set"""
+
+ def duration(self):
+ d = self.config.defines.get("DURATION")
+ if d: return float(d)*60
+ else: return 3 # Default is to be quick
+
+
+ def disable_test_failover(self):
+ """Test failover with continuous send-receive"""
+ # FIXME aconway 2012-02-03: fails due to dropped messages,
+ # known issue: sending messages to new primary before
+ # backups are ready.
+
+ # Start a cluster, all members will be killed during the test.
+ brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+ for name in ["ha0","ha1","ha2"] ]
+ url = ",".join([b.host_port() for b in brokers])
+ for b in brokers: b.set_broker_url(url)
+ brokers[0].promote()
+
+ # Start sender and receiver threads
+ sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
+ receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
+ receiver.start()
+ sender.start()
+ # Wait for sender & receiver to get up and running
+ assert retry(lambda: receiver.received > 100)
+ # Kill and restart brokers in a cycle:
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime or i < 3: # At least 3 iterations
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
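+            # Kill the current primary, replace it with a fresh backup on the
+            # same port so the failover URL stays valid, then promote the next
+            # broker in line.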
+ port = brokers[i].port()
+ brokers[i].kill()
+ brokers.append(
+ HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
+ expect=EXPECT_EXIT_FAIL))
+ i += 1
+ brokers[i].promote()
+ n = receiver.received # Verify we're still running
+ def enough():
+ receiver.check() # Verify no exceptions
+ return receiver.received > n + 100
+ assert retry(enough, timeout=5)
+
+ sender.stop()
+ receiver.stop()
+ for b in brokers[i:]: b.kill()
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])