summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py660
1 files changed, 455 insertions, 205 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 822e07c702..4d07d386f9 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -19,50 +19,121 @@
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
-from qpid.messaging import Message, NotFound, ConnectionError, Connection
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
+from uuid import UUID
-log = getLogger("qpid.ha-tests")
+log = getLogger(__name__)
+
+class QmfAgent(object):
+ """Access to a QMF broker agent."""
+ def __init__(self, address):
+ self._connection = Connection.establish(
+ address, client_properties={"qpid.ha-admin":1})
+ self._agent = BrokerAgent(self._connection)
+ assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+
+ def __getattr__(self, name):
+ a = getattr(self._agent, name)
+ return a
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs):
+ """Start a broker with HA enabled"""
+ def __init__(self, test, args=[], brokers_url=None, ha_cluster=True,
+ ha_replicate="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
- args.extend(["--load-module", BrokerTest.ha_lib,
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=%s"%ha_cluster])
- if broker_url: args.extend([ "--ha-brokers", broker_url ])
+ args += ["--load-module", BrokerTest.ha_lib,
+ "--log-enable=debug+:ha::",
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster]
+ if ha_replicate is not None:
+ args += [ "--ha-replicate=%s"%ha_replicate ]
+ if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
Broker.__init__(self, test, args, **kwargs)
- self.commands=os.getenv("PYTHON_COMMANDS")
- assert os.path.isdir(self.commands)
+ self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+ assert os.path.exists(self.qpid_ha_path)
+ self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+ assert os.path.exists(self.qpid_config_path)
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ self.qpid_ha_script=import_script(self.qpid_ha_path)
+ self._agent = None
+
+ def __str__(self): return Broker.__str__(self)
+
+ def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args)
+
+ def promote(self): self.qpid_ha(["promote"])
+ def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+ def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+ def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+ def agent(self):
+ if not self._agent: self._agent = QmfAgent(self.host_port())
+ return self._agent
+
+ def ha_status(self):
+ hb = self.agent().getHaBroker()
+ hb.update()
+ return hb.status
+
+ def wait_status(self, status):
+ def try_get_status():
+ # Ignore ConnectionError, the broker may not be up yet.
+ try: return self.ha_status() == status;
+ except ConnectionError: return False
+ assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
+
+ # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ def qpid_config(self, args):
+ assert subprocess.call(
+ [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
- def promote(self):
- assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
+ def config_replicate(self, from_broker, queue):
+ self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
- def set_client_url(self, url):
- assert os.system(
- "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
+ def config_declare(self, queue, replication):
+ self.qpid_config(["add", "queue", queue, "--replicate", replication])
- def set_broker_url(self, url):
- assert os.system(
- "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+ def connect_admin(self, **kwargs):
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
- def replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ def wait_backup(self, address):
+ """Wait for address to become valid on a backup broker."""
+ bs = self.connect_admin().session()
+ try: wait_address(bs, address)
+ finally: bs.connection.close()
- def config_replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ def assert_browse(self, queue, expected, **kwargs):
+ """Verify queue contents by browsing."""
+ bs = self.connect().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
- def config_declare(self, queue, replication):
- assert os.system(
- "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ def assert_browse_backup(self, queue, expected, **kwargs):
+ """Combines wait_backup and assert_browse_retry."""
+ bs = self.connect_admin().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
+ def assert_connect_fail(self):
+ try:
+ self.connect()
+ self.test.fail("Expected ConnectionError")
+ except ConnectionError: pass
+
+ def try_connect(self):
+ try: return self.connect()
+ except ConnectionError: return None
class HaCluster(object):
_cluster_count = 0
@@ -70,77 +141,82 @@ class HaCluster(object):
def __init__(self, test, n, **kwargs):
"""Start a cluster of n brokers"""
self.test = test
- self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ self.kwargs = kwargs
+ self._brokers = []
+ self.id = HaCluster._cluster_count
+ self.broker_id = 0
HaCluster._cluster_count += 1
+ for i in xrange(n): self.start(False)
+ self.update_urls()
self[0].promote()
+
+ def next_name(self):
+ name="cluster%s-%s"%(self.id, self.broker_id)
+ self.broker_id += 1
+ return name
+
+ def start(self, update_urls=True):
+ """Start a new broker in the cluster"""
+ b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ self._brokers.append(b)
+ if update_urls: self.update_urls()
+ return b
+
+ def update_urls(self):
self.url = ",".join([b.host_port() for b in self])
- for b in self: b.set_broker_url(self.url)
+ for b in self: b.set_brokers_url(self.url)
def connect(self, i):
"""Connect with reconnect_urls"""
return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
- def kill(self, i):
+ def kill(self, i, promote_next=True):
"""Kill broker i, promote broker i+1"""
- self[i].kill()
self[i].expect = EXPECT_EXIT_FAIL
- self[(i+1) % len(self)].promote()
+ self[i].kill()
+ if promote_next: self[(i+1) % len(self)].promote()
+
+ def restart(self, i):
+ """Start a broker with the same name and data directory. It will get
+ a separate log file: foo.n.log"""
+ b = self._brokers[i]
+ self._brokers[i] = HaBroker(
+ self.test, name=b.name, port=b.port(), brokers_url=self.url,
+ **self.kwargs)
- def bounce(self, i):
+ def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
- self.kill(i)
- b = self[i]
- self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+ self.kill(i, promote_next)
+ self.restart(i)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
def __getitem__(self,index): return self._brokers[index]
def __iter__(self): return self._brokers.__iter__()
-
-def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
-class ShortTests(BrokerTest):
- """Short HA functionality tests."""
-
- # Wait for an address to become valid.
- def wait(self, session, address):
- def check():
- try:
- session.sender(address)
- return True
- except NotFound: return False
- assert retry(check), "Timed out waiting for address %s"%(address)
-
- # Wait for address to become valid on a backup broker.
- def wait_backup(self, backup, address):
- bs = self.connect_admin(backup).session()
- self.wait(bs, address)
- bs.connection.close()
-
- # Combines wait_backup and assert_browse_retry
- def assert_browse_backup(self, backup, queue, expected, **kwargs):
- bs = self.connect_admin(backup).session()
- self.wait(bs, queue)
- self.assert_browse_retry(bs, queue, expected, **kwargs)
- bs.connection.close()
-
- def assert_missing(self, session, address):
+def wait_address(session, address):
+ """Wait for an address to become valid."""
+ def check():
try:
- session.receiver(address)
- self.fail("Should not have been replicated: %s"%(address))
- except NotFound: pass
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for address %s"%(address)
- def connect_admin(self, backup, **kwargs):
- """Connect to a backup broker as an admin connection"""
- return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+def valid_address(session, address):
+ """Test if an address is valid"""
+ try:
+ session.receiver(address)
+ return True
+ except NotFound: return False
+
+class ReplicationTests(BrokerTest):
+ """Correctness tests for HA replication."""
def test_replication(self):
"""Test basic replication of configuration and messages before and
after backup has connected"""
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-
def queue(name, replicate):
return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
@@ -170,9 +246,8 @@ class ShortTests(BrokerTest):
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
-
# Wait for configuration to replicate.
- self.wait(b, prefix+"x");
+ wait_address(b, prefix+"x");
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -180,7 +255,7 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- self.assert_missing(b, prefix+"q3")
+ assert not valid_address(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -195,17 +270,17 @@ class ShortTests(BrokerTest):
# Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1", primary)
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
# Create config, send messages after starting the backup, to test steady-state replication.
setup(p, "2", primary)
# Verify the data on the backup
- b = self.connect_admin(backup).session()
+ b = backup.connect_admin().session()
verify(b, "1", p)
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
s = p.sender(queue("foo","all"))
- self.wait(b, "foo")
+ wait_address(b, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
self.assert_browse_retry(p, "foo", msgs)
@@ -227,103 +302,91 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(b, "foo", msgs[i+1:])
def test_sync(self):
- def queue(name, replicate):
- return "%s;{create:always,%s}"%(name, qr_node(replicate))
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
- s = p.sender(queue("q","all"))
+ s = p.sender("q;{create:always}")
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
for m in [str(i) for i in range(10,20)]: s.send(m)
s.sync()
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
for m in [str(i) for i in range(20,30)]: s.send(m)
s.sync()
msgs = [str(i) for i in range(30)]
- b1 = self.connect_admin(backup1).session()
- self.wait(b1, "q");
+ b1 = backup1.connect_admin().session()
+ wait_address(b1, "q");
self.assert_browse_retry(b1, "q", msgs)
- b2 = self.connect_admin(backup2).session()
- self.wait(b2, "q");
+ b2 = backup2.connect_admin().session()
+ wait_address(b2, "q");
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- primary = HaBroker(self, name="primary")
- primary.promote()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ brokers = HaCluster(self, 3)
sender = self.popen(
["qpid-send",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("all")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("all")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=990",
"--timeout=10"
])
- try:
- self.assertEqual(sender.wait(), 0)
- self.assertEqual(receiver.wait(), 0)
- expect = [long(i) for i in range(991, 1001)]
- sn = lambda m: m.properties["sn"]
- self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
- self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
- except:
- print self.browse(primary.connect().session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup1).session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
- raise
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
# Check that backup rejects normal connections
try:
backup.connect().session()
self.fail("Expected connection to backup to fail")
except ConnectionError: pass
# Check that admin connections are allowed to backup.
- self.connect_admin(backup).close()
+ backup.connect_admin().close()
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- sender = s.sender("q;{create:always,%s}"%(qr_node()))
- self.wait_backup(backup, "q")
+ sender = s.sender("q;{create:always}")
+ backup.wait_backup("q")
sender.send("foo")
primary.kill()
assert retry(lambda: not is_running(primary.pid))
backup.promote()
- self.assert_browse_retry(s, "q", ["foo"])
+ sender.send("bar")
+ self.assert_browse_retry(s, "q", ["foo", "bar"])
c.close()
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
- self.wait_backup(backup, "q")
+ primary.connect().session().sender("q;{create:always}")
+ backup.wait_backup("q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
receiver.start()
sender.start()
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
primary.kill()
@@ -337,122 +400,123 @@ class ShortTests(BrokerTest):
def test_backup_failover(self):
"""Verify that a backup broker fails over and recovers queue state"""
brokers = HaCluster(self, 3)
- brokers[0].connect().session().sender(
- "q;{create:always,%s}"%(qr_node())).send("a")
- for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
+ brokers[0].connect().session().sender("q;{create:always}").send("a")
+ for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b)
brokers[0].expect = EXPECT_EXIT_FAIL
brokers.kill(0)
brokers[1].connect().session().sender("q").send("b")
- self.assert_browse_backup(brokers[2], "q", ["a","b"])
+ brokers[2].assert_browse_backup("q", ["a","b"])
s = brokers[1].connect().session()
self.assertEqual("a", s.receiver("q").fetch().content)
s.acknowledge()
- self.assert_browse_backup(brokers[2], "q", ["b"])
+ brokers[2].assert_browse_backup("q", ["b"])
def test_qpid_config_replication(self):
"""Set up replication via qpid-config"""
brokers = HaCluster(self,2)
brokers[0].config_declare("q","all")
brokers[0].connect().session().sender("q").send("foo")
- self.assert_browse_backup(brokers[1], "q", ["foo"])
+ brokers[1].assert_browse_backup("q", ["foo"])
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"])
+ primary = HaBroker(self, name="primary", ha_cluster=False)
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ backup = HaBroker(self, name="backup", ha_cluster=False)
br = backup.connect().session().receiver("q;{create:always}")
# Set up replication with qpid-ha
backup.replicate(primary.host_port(), "q")
ps.send("a")
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
ps.send("b")
- self.assert_browse_backup(backup, "q", ["a", "b"])
+ backup.assert_browse_backup("q", ["a", "b"])
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
- self.assert_browse_backup(backup, "q", ["b"])
+ backup.assert_browse_backup("q", ["b"])
# Set up replication with qpid-config
ps2 = pc.session().sender("q2;{create:always}")
backup.config_replicate(primary.host_port(), "q2");
ps2.send("x")
- self.assert_browse_backup(backup, "q2", ["x"])
+ backup.assert_browse_backup("q2", ["x"])
def test_queue_replica_failover(self):
"""Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
cluster = HaCluster(self, 2)
primary = cluster[0]
pc = cluster.connect(0)
- ps = pc.session().sender("q;{create:always,%s}"%qr_node("all"))
- pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all"))
- backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False)
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
cluster.bounce(0)
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
ps.send("b")
- self.assert_browse_backup(backup, "q", ["a", "b"])
+ backup.assert_browse_backup("q", ["a", "b"])
cluster.bounce(1)
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
- self.assert_browse_backup(backup, "q", ["b"])
+ backup.assert_browse_backup("q", ["b"])
def test_lvq(self):
"""Verify that we replicate to an LVQ correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':all}}}}")
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
send(*kv)
- self.assert_browse_backup(backup, "lvq", ["b-1", "a-3", "c-2"])
+ backup.assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
send("b","b-2")
- self.assert_browse_backup(backup, "lvq", ["a-3", "c-2", "b-2"])
+ backup.assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
send("c","c-3")
- self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3"])
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
send("d","d-1")
- self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
def test_ring(self):
+ """Test replication with the ring queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
for i in range(10): s.send(Message(str(i)))
- self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
+ backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
def test_reject(self):
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ """Test replication with the reject queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
- self.assert_browse_backup(backup, "q", [str(i) for i in range(0,5)])
+ backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+ # Detach, don't close as there is a broken session
+ s.session.connection.detach()
def test_priority(self):
"""Verify priority queues replicate correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
session = primary.connect().session()
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}")
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
- self.wait_backup(backup, "priority-queue")
- r = self.connect_admin(backup).session().receiver("priority-queue")
+ backup.wait_backup("priority-queue")
+ r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
self.assertEqual(sorted(priorities, reverse=True), received)
@@ -460,17 +524,17 @@ class ShortTests(BrokerTest):
"""Verify priority queues replicate correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
session = primary.connect().session()
levels = 8
priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':all}}}}"%(levels,limit_policy))
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
for m in messages: s.send(m)
- self.wait_backup(backup, s.target)
- r = self.connect_admin(backup).session().receiver("priority-queue")
+ backup.wait_backup(s.target)
+ r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().content for i in priorities]
sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)]
@@ -479,17 +543,144 @@ class ShortTests(BrokerTest):
def test_priority_ring(self):
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':all}}}}")
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
- # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low
- # priority message to displace a high one. The following commented-out assert_browse
- # is for the correct result, the uncommented one is for the actualy buggy result.
- # See https://issues.apache.org/jira/browse/QPID-3866
+
+ # FIXME aconway 2012-02-22: there is a bug in priority ring
+ # queues that allows a low priority message to displace a high
+ # one. The following commented-out assert_browse is for the
+ # correct result, the uncommented one is for the actualy buggy
+ # result. See https://issues.apache.org/jira/browse/QPID-3866
#
- # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
- self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+
+ def test_backup_acquired(self):
+ """Verify that acquired messages are backed up, for all queue types."""
+ class Test:
+ def __init__(self, queue, arguments, expect):
+ self.queue = queue
+ self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+ self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+ self.expect = [str(i) for i in expect]
+
+ def send(self, connection):
+ """Send messages, then acquire one but don't acknowledge"""
+ s = connection.session()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
+
+ def wait(self, brokertest, backup):
+ backup.wait_backup(self.queue)
+
+ def verify(self, brokertest, backup):
+ backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
+
+ tests = [
+ Test("plain",[],range(10)),
+ Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+ Test("priority",["'qpid.priorities':10"], range(10)),
+ Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+ Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+ ]
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
+ c = primary.connect()
+ for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+ backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
+ # Wait for backups to catch up.
+ for t in tests:
+ t.wait(self, backup1)
+ t.wait(self, backup2)
+ # Verify acquired message was replicated
+ for t in tests: t.verify(self, backup1)
+ for t in tests: t.verify(self, backup2)
+
+ def test_replicate_default(self):
+ """Make sure we don't replicate if ha-replicate is unspecified or none"""
+ cluster1 = HaCluster(self, 2, ha_replicate=None)
+ c1 = cluster1[0].connect().session().sender("q;{create:always}")
+ cluster2 = HaCluster(self, 2, ha_replicate="none")
+ cluster2[0].connect().session().sender("q;{create:always}")
+ time.sleep(.1) # Give replication a chance.
+ try:
+ cluster1[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+ try:
+ cluster2[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+
+ def test_invalid_replication(self):
+ """Verify that we reject an attempt to declare a queue with invalid replication value."""
+ cluster = HaCluster(self, 1, ha_replicate="all")
+ try:
+ c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+ self.fail("Expected ConnectionError")
+ except ConnectionError: pass
+
+ def test_exclusive_queue(self):
+ """Ensure that we can back-up exclusive queues, i.e. the replicating
+ subscriptions are exempt from the exclusivity"""
+ cluster = HaCluster(self, 2)
+ def test(addr):
+ c = cluster[0].connect()
+ q = addr.split(";")[0]
+ r = c.session().receiver(addr)
+ try: c.session().receiver(addr); self.fail("Expected exclusive exception")
+ except ReceiverError: pass
+ s = c.session().sender(q).send(q)
+ cluster[1].assert_browse_backup(q, [q])
+ test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
+ test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+
+ def test_auto_delete_exclusive(self):
+ """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
+ cluster = HaCluster(self,2)
+ s = cluster[0].connect().session()
+ s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+ s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+ s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+ s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ s.receiver("q;{create:always}")
+
+ s = cluster[1].connect_admin().session()
+ cluster[1].wait_backup("q")
+ assert not valid_address(s, "exad")
+ assert valid_address(s, "ex")
+ assert valid_address(s, "ad")
+ assert valid_address(s, "time")
+
+ def test_broker_info(self):
+ """Check that broker information is correctly published via management"""
+ cluster = HaCluster(self, 3)
+
+ for broker in cluster: # Make sure HA system-id matches broker's
+ qmf = broker.agent().getHaBroker()
+ self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+
+ cluster_ports = map(lambda b: b.port(), cluster)
+ cluster_ports.sort()
+ def ports(qmf):
+ qmf.update()
+ return sorted(map(lambda b: b["port"], qmf.members))
+ # Check that all brokers have the same membership as the cluster
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
+ # Add a new broker, check it is updated everywhere
+ b = cluster.start()
+ cluster_ports.append(b.port())
+ cluster_ports.sort()
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
def fairshare(msgs, limit, levels):
"""
@@ -533,49 +724,108 @@ class LongTests(BrokerTest):
if d: return float(d)*60
else: return 3 # Default is to be quick
-
-    def disable_test_failover(self):
+    def test_failover_send_receive(self):
         """Test failover with continuous send-receive"""
-        # FIXME aconway 2012-02-03: fails due to dropped messages,
-        # known issue: sending messages to new primary before
-        # backups are ready. Enable when fixed.
-
-        # Start a cluster, all members will be killed during the test.
-        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
-                    for name in ["ha0","ha1","ha2"] ]
-        url = ",".join([b.host_port() for b in brokers])
-        for b in brokers: b.set_broker_url(url)
-        brokers[0].promote()
+        brokers = HaCluster(self, 3)
         # Start sender and receiver threads
+        # n independent sender/receiver pairs, each on its own queue testN.
+        n = 10
-        sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
-        receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
-        receiver.start()
-        sender.start()
+        senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+                                  queue="test%s"%(i)) for i in xrange(n)]
+        receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+                                      failover_updates=False,
+                                      queue="test%s"%(i)) for i in xrange(n)]
+        for r in receivers: r.start()
+        for s in senders: s.start()
+
         # Wait for sender & receiver to get up and running
-        assert retry(lambda: receiver.received > 100)
+        assert retry(lambda: receivers[0].received > 100), "%s<=100"%receivers[0].received
         # Kill and restart brokers in a cycle:
         endtime = time.time() + self.duration()
         i = 0
-        while time.time() < endtime or i < 3: # At least 3 iterations
-            sender.sender.assert_running()
-            receiver.receiver.assert_running()
-            port = brokers[i].port()
-            brokers[i].kill()
-            brokers.append(
-                HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
-                         expect=EXPECT_EXIT_FAIL))
-            i += 1
-            brokers[i].promote()
-            n = receiver.received # Verify we're still running
-            def enough():
-                receiver.check() # Verify no exceptions
-                return receiver.received > n + 100
-            assert retry(enough, timeout=5)
-
-        sender.stop()
-        receiver.stop()
-        for b in brokers[i:]: b.kill()
+        try:
+            while time.time() < endtime or i < 3: # At least 3 iterations
+                for s in senders: s.sender.assert_running()
+                for r in receivers: r.receiver.assert_running()
+                n = receivers[0].received
+                # FIXME aconway 2012-05-01: don't kill primary till it's active
+                # and backups are ready, otherwise we can lose messages. When we
+                # implement non-promotion of catchup brokers we can make this
+                # stronger: wait only for there to be at least one ready backup.
+                brokers[i%3].wait_status("active")
+                brokers[(i+1)%3].wait_status("ready")
+                brokers[(i+2)%3].wait_status("ready")
+                # Kill and restart one broker per iteration, round-robin.
+                brokers.bounce(i%3)
+                i += 1
+                def enough(): # Verify we're still running
+                    receivers[0].check() # Verify no exceptions
+                    return receivers[0].received > n + 100
+                assert retry(enough), "Stalled: %s < %s+100"%(receivers[0].received, n)
+        except:
+            # Print the traceback now, before the finally-block cleanup below
+            # can raise and obscure the original failure.
+            traceback.print_exc()
+            raise
+        finally:
+            for s in senders: s.stop()
+            for r in receivers: r.stop()
+            # Record brokers that died unexpectedly, then kill all for cleanup.
+            dead = []
+            for i in xrange(3):
+                if not brokers[i].is_running(): dead.append(i)
+                brokers.kill(i, False)
+            if dead: raise Exception("Brokers not running: %s"%dead)
+
+class RecoveryTests(BrokerTest):
+ """Tests for recovery after a failure."""
+
+ def test_queue_hold(self):
+ """Verify that the broker holds queues without sufficient backup,
+ i.e. does not complete messages sent to those queues."""
+
+ cluster = HaCluster(self, 4);
+ # Wait for the primary to be ready
+ cluster[0].wait_status("active")
+ # Create a queue before the failure.
+ s1 = cluster.connect(0).session().sender("q1;{create:always}")
+ for b in cluster: b.wait_backup("q1")
+ for i in xrange(100): s1.send(str(i))
+ # Kill primary and 2 backups
+ for i in [0,1,2]: cluster.kill(i, False)
+ cluster[3].promote() # New primary, backups will be 1 and 2
+ cluster[3].wait_status("recovering")
+
+ def trySync(s):
+ try:
+ s.sync(timeout=.1)
+ self.fail("Expected Timeout exception")
+ except Timeout: pass
+
+ # Create a queue after the failure
+ s2 = cluster.connect(3).session().sender("q2;{create:always}")
+ # Verify that messages sent are not completed
+ for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ trySync(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ trySync(s2)
+ self.assertEqual(s2.unsettled(), 100)
+
+ # Verify we can receive even if sending is on hold:
+ cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+ # Restart backups, verify queues are released only when both backups are up
+ cluster.restart(1)
+ trySync(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ trySync(s2)
+ self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(cluster[3].ha_status(), "recovering")
+ cluster.restart(2)
+
+ def settled(sender): sender.sync(); return sender.unsettled() == 0;
+ assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
+ assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[3].wait_status("active"),
+ s1.session.connection.close()
+ s2.session.connection.close()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)