Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-x  cpp/src/tests/ha_tests.py | 1097
 1 file changed, 666 insertions(+), 431 deletions(-)
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 310ef844bd..77399bf2e2 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -18,235 +18,32 @@
# under the License.
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4
from brokertest import *
+from ha_test import *
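+# HaBroker, HaCluster, QmfAgent, wait_address and valid_address are now
+# provided by ha_test; the local copies deleted below apparently moved there.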
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerAgent
+from qpidtoollibs import BrokerAgent, EventHelper
from uuid import UUID
log = getLogger(__name__)
-class QmfAgent(object):
- """Access to a QMF broker agent."""
- def __init__(self, address, **kwargs):
- self._connection = Connection.establish(
- address, client_properties={"qpid.ha-admin":1}, **kwargs)
- self._agent = BrokerAgent(self._connection)
- assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+def grep(filename, regexp):
+ for line in open(filename).readlines():
+ if (regexp.search(line)): return True
+ return False
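+
+# Example: grep(broker.get_log(), re.compile("] error|] critical")) is True
+# when the broker log contains error- or critical-level entries (see
+# HaBrokerTest.assert_log_no_errors below).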
- def __getattr__(self, name):
- a = getattr(self._agent, name)
- return a
+class HaBrokerTest(BrokerTest):
+ """Base class for HA broker tests"""
+ def assert_log_no_errors(self, broker):
+ log = broker.get_log()
+ if grep(log, re.compile("] error|] critical")):
+ self.fail("Errors in log file %s"%(log))
-class Credentials(object):
- """SASL credentials: username, password, and mechanism"""
- def __init__(self, username, password, mechanism):
- (self.username, self.password, self.mechanism) = (username, password, mechanism)
-
- def __str__(self): return "Credentials%s"%(self.tuple(),)
-
- def tuple(self): return (self.username, self.password, self.mechanism)
-
- def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
-
-class HaBroker(Broker):
- """Start a broker with HA enabled
- @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
- """
- def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
- client_credentials=None, **kwargs):
- assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- args = copy(args)
- args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=debug+:ha::",
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=%s"%ha_cluster]
- if ha_replicate is not None:
- args += [ "--ha-replicate=%s"%ha_replicate ]
- if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
- Broker.__init__(self, test, args, **kwargs)
- self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
- assert os.path.exists(self.qpid_ha_path)
- self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
- assert os.path.exists(self.qpid_config_path)
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- self.qpid_ha_script=import_script(self.qpid_ha_path)
- self._agent = None
- self.client_credentials = client_credentials
-
- def __str__(self): return Broker.__str__(self)
-
- def qpid_ha(self, args):
- cred = self.client_credentials
- url = self.host_port()
- if cred:
- url =cred.add_user(url)
- args = args + ["--sasl-mechanism", cred.mechanism]
- self.qpid_ha_script.main_except(["", "-b", url]+args)
-
- def promote(self): self.qpid_ha(["promote"])
- def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
- def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
- def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
- def agent(self):
- if not self._agent:
- cred = self.client_credentials
- if cred:
- self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
- else:
- self._agent = QmfAgent(self.host_port())
- return self._agent
-
- def ha_status(self):
- hb = self.agent().getHaBroker()
- hb.update()
- return hb.status
-
- def wait_status(self, status):
- def try_get_status():
- # Ignore ConnectionError, the broker may not be up yet.
- try:
- self._status = self.ha_status()
- return self._status == status;
- except ConnectionError: return False
- assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status)
-
- # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
- def qpid_config(self, args):
- assert subprocess.call(
- [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
-
- def config_replicate(self, from_broker, queue):
- self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
-
- def config_declare(self, queue, replication):
- self.qpid_config(["add", "queue", queue, "--replicate", replication])
-
- def connect_admin(self, **kwargs):
- cred = self.client_credentials
- if cred:
- return Broker.connect(
- self, client_properties={"qpid.ha-admin":1},
- username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
- **kwargs)
- else:
- return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
-
- def wait_backup(self, address):
- """Wait for address to become valid on a backup broker."""
- bs = self.connect_admin().session()
- try: wait_address(bs, address)
- finally: bs.connection.close()
-
- def assert_browse(self, queue, expected, **kwargs):
- """Verify queue contents by browsing."""
- bs = self.connect().session()
- try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
-
- def assert_browse_backup(self, queue, expected, **kwargs):
- """Combines wait_backup and assert_browse_retry."""
- bs = self.connect_admin().session()
- try:
- wait_address(bs, queue)
- assert_browse_retry(bs, queue, expected, **kwargs)
- finally: bs.connection.close()
-
- def assert_connect_fail(self):
- try:
- self.connect()
- self.test.fail("Expected ConnectionError")
- except ConnectionError: pass
-
- def try_connect(self):
- try: return self.connect()
- except ConnectionError: return None
-
-class HaCluster(object):
- _cluster_count = 0
-
- def __init__(self, test, n, promote=True, **kwargs):
- """Start a cluster of n brokers"""
- self.test = test
- self.kwargs = kwargs
- self._brokers = []
- self.id = HaCluster._cluster_count
- self.broker_id = 0
- HaCluster._cluster_count += 1
- for i in xrange(n): self.start(False)
- self.update_urls()
- self[0].promote()
-
- def next_name(self):
- name="cluster%s-%s"%(self.id, self.broker_id)
- self.broker_id += 1
- return name
-
- def start(self, update_urls=True, args=[]):
- """Start a new broker in the cluster"""
- b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
- self._brokers.append(b)
- if update_urls: self.update_urls()
- return b
-
- def update_urls(self):
- self.url = ",".join([b.host_port() for b in self])
- if len(self) > 1: # No failover addresses on a 1 cluster.
- for b in self: b.set_brokers_url(self.url)
-
- def connect(self, i):
- """Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-
- def kill(self, i, promote_next=True):
- """Kill broker i, promote broker i+1"""
- self[i].expect = EXPECT_EXIT_FAIL
- self[i].kill()
- if promote_next: self[(i+1) % len(self)].promote()
-
- def restart(self, i):
- """Start a broker with the same port, name and data directory. It will get
- a separate log file: foo.n.log"""
- b = self._brokers[i]
- self._brokers[i] = HaBroker(
- self.test, name=b.name, port=b.port(), brokers_url=self.url,
- **self.kwargs)
-
- def bounce(self, i, promote_next=True):
- """Stop and restart a broker in a cluster."""
- self.kill(i, promote_next)
- self.restart(i)
-
- # Behave like a list of brokers.
- def __len__(self): return len(self._brokers)
- def __getitem__(self,index): return self._brokers[index]
- def __iter__(self): return self._brokers.__iter__()
-
-def wait_address(session, address):
- """Wait for an address to become valid."""
- def check():
- try:
- session.sender(address)
- return True
- except NotFound: return False
- assert retry(check), "Timed out waiting for address %s"%(address)
-
-def valid_address(session, address):
- """Test if an address is valid"""
- try:
- session.receiver(address)
- return True
- except NotFound: return False
-
-class ReplicationTests(BrokerTest):
+class ReplicationTests(HaBrokerTest):
"""Correctness tests for HA replication."""
def test_replication(self):
@@ -256,8 +53,9 @@ class ReplicationTests(BrokerTest):
def queue(name, replicate):
return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
- def exchange(name, replicate, bindq):
- return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+ def exchange(name, replicate, bindq, key):
+ return "%s/%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'topic'},x-bindings:[{exchange:'%s',queue:'%s',key:'%s'}]}}"%(name, key, replicate, name, bindq, key)
+
def setup(p, prefix, primary):
"""Create config, send messages on the primary p"""
s = p.sender(queue(prefix+"q1", "all"))
@@ -267,16 +65,21 @@ class ReplicationTests(BrokerTest):
p.acknowledge()
p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
- p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
- p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(Message("5"))
# Test unbind
p.sender(queue(prefix+"q4", "all")).send(Message("6"))
- s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+ s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4", "key4"))
s3.send(Message("7"))
# Use old connection to unbind
us = primary.connect_old().session(str(uuid4()))
- us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
+ us.exchange_unbind(exchange=prefix+"e4", binding_key="key4", queue=prefix+"q4")
p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
+ # Test replication of deletes
+ p.sender(queue(prefix+"dq", "all"))
+ p.sender(exchange(prefix+"de", "all", prefix+"dq", ""))
+ p.sender(prefix+"dq;{delete:always}").close()
+ p.sender(prefix+"de;{delete:always}").close()
# Need a marker so we can wait till sync is done.
p.sender(queue(prefix+"x", "configuration"))
@@ -292,50 +95,61 @@ class ReplicationTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
assert not valid_address(b, prefix+"q3")
- b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+
+ # Verify exchange with replicate=all
+ b.sender(prefix+"e1/key1").send(Message(prefix+"e1"))
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
- b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
+
+ # Verify exchange with replicate=configuration
+ b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
- b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
+ b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
- primary = HaBroker(self, name="primary")
- primary.promote()
- p = primary.connect().session()
+ # Verify deletes
+ assert not valid_address(b, prefix+"dq")
+ assert not valid_address(b, prefix+"de")
- # Create config, send messages before starting the backup, to test catch-up replication.
- setup(p, "1", primary)
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
- # Create config, send messages after starting the backup, to test steady-state replication.
- setup(p, "2", primary)
-
- # Verify the data on the backup
- b = backup.connect_admin().session()
- verify(b, "1", p)
- verify(b, "2", p)
- # Test a series of messages, enqueue all then dequeue all.
- s = p.sender(queue("foo","all"))
- wait_address(b, "foo")
- msgs = [str(i) for i in range(10)]
- for m in msgs: s.send(Message(m))
- self.assert_browse_retry(p, "foo", msgs)
- self.assert_browse_retry(b, "foo", msgs)
- r = p.receiver("foo")
- for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
- p.acknowledge()
- self.assert_browse_retry(p, "foo", [])
- self.assert_browse_retry(b, "foo", [])
-
- # Another series, this time verify each dequeue individually.
- for m in msgs: s.send(Message(m))
- self.assert_browse_retry(p, "foo", msgs)
- self.assert_browse_retry(b, "foo", msgs)
- for i in range(len(msgs)):
- self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ p = primary.connect().session()
+
+ # Create config, send messages before starting the backup, to test catch-up replication.
+ setup(p, "1", primary)
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ # Create config, send messages after starting the backup, to test steady-state replication.
+ setup(p, "2", primary)
+
+ # Verify the data on the backup
+ b = backup.connect_admin().session()
+ verify(b, "1", p)
+ verify(b, "2", p)
+ # Test a series of messages, enqueue all then dequeue all.
+ s = p.sender(queue("foo","all"))
+ wait_address(b, "foo")
+ msgs = [str(i) for i in range(10)]
+ for m in msgs: s.send(Message(m))
+ self.assert_browse_retry(p, "foo", msgs)
+ self.assert_browse_retry(b, "foo", msgs)
+ r = p.receiver("foo")
+ for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
p.acknowledge()
- self.assert_browse_retry(p, "foo", msgs[i+1:])
- self.assert_browse_retry(b, "foo", msgs[i+1:])
+ self.assert_browse_retry(p, "foo", [])
+ self.assert_browse_retry(b, "foo", [])
+
+ # Another series, this time verify each dequeue individually.
+ for m in msgs: s.send(Message(m))
+ self.assert_browse_retry(p, "foo", msgs)
+ self.assert_browse_retry(b, "foo", msgs)
+ for i in range(len(msgs)):
+ self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+ p.acknowledge()
+ self.assert_browse_retry(p, "foo", msgs[i+1:])
+ self.assert_browse_retry(b, "foo", msgs[i+1:])
+ finally: l.restore()
def test_sync(self):
primary = HaBroker(self, name="primary")
@@ -361,53 +175,59 @@ class ReplicationTests(BrokerTest):
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
- brokers = HaCluster(self, 3)
- sender = self.popen(
- ["qpid-send",
- "--broker", brokers[0].host_port(),
- "--address", "q;{create:always}",
- "--messages=1000",
- "--content-string=x"
- ])
- receiver = self.popen(
- ["qpid-receive",
- "--broker", brokers[0].host_port(),
- "--address", "q;{create:always}",
- "--messages=990",
- "--timeout=10"
- ])
- self.assertEqual(sender.wait(), 0)
- self.assertEqual(receiver.wait(), 0)
- expect = [long(i) for i in range(991, 1001)]
- sn = lambda m: m.properties["sn"]
- brokers[1].assert_browse_backup("q", expect, transform=sn)
- brokers[2].assert_browse_backup("q", expect, transform=sn)
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ brokers = HaCluster(self, 3)
+ sender = self.popen(
+ ["qpid-send",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=1000",
+ "--content-string=x"
+ ])
+ receiver = self.popen(
+ ["qpid-receive",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=990",
+ "--timeout=10"
+ ])
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
+ finally: l.restore()
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
- primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
- # Check that backup rejects normal connections
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
- backup.connect().session()
- self.fail("Expected connection to backup to fail")
- except ConnectionError: pass
- # Check that admin connections are allowed to backup.
- backup.connect_admin().close()
-
- # Test discovery: should connect to primary after reject by backup
- c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
- s = c.session()
- sender = s.sender("q;{create:always}")
- backup.wait_backup("q")
- sender.send("foo")
- primary.kill()
- assert retry(lambda: not is_running(primary.pid))
- backup.promote()
- sender.send("bar")
- self.assert_browse_retry(s, "q", ["foo", "bar"])
- c.close()
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ # Check that backup rejects normal connections
+ try:
+ backup.connect().session()
+ self.fail("Expected connection to backup to fail")
+ except ConnectionError: pass
+ # Check that admin connections are allowed to backup.
+ backup.connect_admin().close()
+
+ # Test discovery: should connect to primary after reject by backup
+ c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+ s = c.session()
+ sender = s.sender("q;{create:always}")
+ backup.wait_backup("q")
+ sender.send("foo")
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid))
+ backup.promote()
+ sender.send("bar")
+ self.assert_browse_retry(s, "q", ["foo", "bar"])
+ c.close()
+ finally: l.restore()
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
@@ -456,51 +276,61 @@ class ReplicationTests(BrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
- getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- primary = HaBroker(self, name="primary", ha_cluster=False)
- pc = primary.connect()
- ps = pc.session().sender("q;{create:always}")
- pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False)
- br = backup.connect().session().receiver("q;{create:always}")
-
- # Set up replication with qpid-ha
- backup.replicate(primary.host_port(), "q")
- ps.send("a")
- backup.assert_browse_backup("q", ["a"])
- ps.send("b")
- backup.assert_browse_backup("q", ["a", "b"])
- self.assertEqual("a", pr.fetch().content)
- pr.session.acknowledge()
- backup.assert_browse_backup("q", ["b"])
-
- # Set up replication with qpid-config
- ps2 = pc.session().sender("q2;{create:always}")
- backup.config_replicate(primary.host_port(), "q2");
- ps2.send("x")
- backup.assert_browse_backup("q2", ["x"])
-
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ primary = HaBroker(self, name="primary", ha_cluster=False,
+ args=["--ha-queue-replication=yes"]);
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
+ br = backup.connect().session().receiver("q;{create:always}")
+
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ ps.send("a")
+ backup.assert_browse_backup("q", ["a"])
+ ps.send("b")
+ backup.assert_browse_backup("q", ["a", "b"])
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ backup.assert_browse_backup("q", ["b"])
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x")
+ backup.assert_browse_backup("q2", ["x"])
+ finally: l.restore()
def test_queue_replica_failover(self):
- """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
- cluster = HaCluster(self, 2)
- primary = cluster[0]
- pc = cluster.connect(0)
- ps = pc.session().sender("q;{create:always}")
- pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False)
- br = backup.connect().session().receiver("q;{create:always}")
- backup.replicate(cluster.url, "q")
- ps.send("a")
- backup.assert_browse_backup("q", ["a"])
- cluster.bounce(0)
- backup.assert_browse_backup("q", ["a"])
- ps.send("b")
- backup.assert_browse_backup("q", ["a", "b"])
- cluster.bounce(1)
- self.assertEqual("a", pr.fetch().content)
- pr.session.acknowledge()
- backup.assert_browse_backup("q", ["b"])
+ """Test individual queue replication from a cluster to a standalone
+ backup broker, verify it fails over."""
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ cluster = HaCluster(self, 2)
+ primary = cluster[0]
+ pc = cluster.connect(0)
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
+ br = backup.connect().session().receiver("q;{create:always}")
+ backup.replicate(cluster.url, "q")
+ ps.send("a")
+ backup.assert_browse_backup("q", ["a"])
+ cluster.bounce(0)
+ backup.assert_browse_backup("q", ["a"])
+ ps.send("b")
+ backup.assert_browse_backup("q", ["a", "b"])
+ cluster.bounce(1)
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ backup.assert_browse_backup("q", ["b"])
+ pc.close()
+ br.close()
+ finally: l.restore()
def test_lvq(self):
"""Verify that we replicate to an LVQ correctly"""
@@ -634,8 +464,10 @@ class ReplicationTests(BrokerTest):
def test_replicate_default(self):
"""Make sure we don't replicate if ha-replicate is unspecified or none"""
cluster1 = HaCluster(self, 2, ha_replicate=None)
+ cluster1[1].wait_status("ready")
c1 = cluster1[0].connect().session().sender("q;{create:always}")
cluster2 = HaCluster(self, 2, ha_replicate="none")
+ cluster2[1].wait_status("ready")
cluster2[0].connect().session().sender("q;{create:always}")
time.sleep(.1) # Give replication a chance.
try:
@@ -647,6 +479,23 @@ class ReplicationTests(BrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
+ def test_replicate_binding(self):
+ """Verify that binding replication can be disabled"""
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ ps = primary.connect().session()
+ ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+ ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+ backup.wait_backup("q")
+
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
+ backup.promote()
+ bs = backup.connect_admin().session()
+ bs.sender("ex").send(Message("msg"))
+ self.assert_browse_retry(bs, "q", [])
+
def test_invalid_replication(self):
"""Verify that we reject an attempt to declare a queue with invalid replication value."""
cluster = HaCluster(self, 1, ha_replicate="all")
@@ -672,20 +521,26 @@ class ReplicationTests(BrokerTest):
def test_auto_delete_exclusive(self):
"""Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
- cluster = HaCluster(self,2)
- s = cluster[0].connect().session()
- s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
- s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
- s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
- s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
- s.receiver("q;{create:always}")
+ cluster = HaCluster(self, 2)
+ s0 = cluster[0].connect().session()
+ s0.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+ s0.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+ ad = s0.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+ s0.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ s0.receiver("q;{create:always}")
- s = cluster[1].connect_admin().session()
+ s1 = cluster[1].connect_admin().session()
cluster[1].wait_backup("q")
- assert not valid_address(s, "exad")
- assert valid_address(s, "ex")
- assert valid_address(s, "ad")
- assert valid_address(s, "time")
+ assert not valid_address(s1, "exad")
+ assert valid_address(s1, "ex")
+ assert valid_address(s1, "ad")
+ assert valid_address(s1, "time")
+
+ # Verify that auto-delete queues are not kept alive by
+ # replicating subscriptions
+ ad.close()
+ s0.sync()
+ assert not valid_address(s0, "ad")
def test_broker_info(self):
"""Check that broker information is correctly published via management"""
@@ -763,18 +618,18 @@ acl deny all all
s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
# altq queue bound to altex, collect re-routed messages.
s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
- # 0ex exchange with alternate-exchange altex and no queues bound
- s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # ex exchange with alternate-exchange altex and no queues bound
+ s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
# create queue q with alternate-exchange altex
s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
# create a bunch of exchanges to ensure we don't clean up prematurely if the
# response comes in multiple fragments.
- for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+ for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
def verify(broker):
s = broker.connect().session()
# Verify unmatched message goes to ex's alternate.
- s.sender("0ex").send("foo")
+ s.sender("ex").send("foo")
altq = s.receiver("altq")
self.assertEqual("foo", altq.fetch(timeout=0).content)
s.acknowledge()
@@ -786,20 +641,265 @@ acl deny all all
self.assertEqual("bar", altq.fetch(timeout=0).content)
s.acknowledge()
+ def ss(n): return cluster[n].connect().session()
+
# Sanity check: alternate exchanges on original broker
verify(cluster[0])
+ # Altex is in use as an alternate exchange.
+ self.assertRaises(SessionError,
+ lambda:ss(0).sender("altex;{delete:always}").close())
# Check backup that was connected during setup.
- cluster[1].wait_backup("0ex")
+ cluster[1].wait_status("ready")
+ cluster[1].wait_backup("ex")
cluster[1].wait_backup("q")
cluster.bounce(0)
verify(cluster[1])
+
# Check a newly started backup.
cluster.start()
- cluster[2].wait_backup("0ex")
+ cluster[2].wait_status("ready")
+ cluster[2].wait_backup("ex")
cluster[2].wait_backup("q")
cluster.bounce(1)
verify(cluster[2])
+ # Check that alt-exchange in-use count is replicated
+ s = cluster[2].connect().session();
+
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("q;{delete:always}").close()
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("ex;{delete:always}").close()
+ s.sender("altex;{delete:always}").close()
+
+ def test_priority_reroute(self):
+ """Regression test for QPID-4262, rerouting messages from a priority queue
+ to itself causes a crash"""
+ cluster = HaCluster(self, 2)
+ primary = cluster[0]
+ session = primary.connect().session()
+ s = session.sender("pq; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}},x-bindings:[{exchange:'amq.fanout',queue:pq}]}}")
+ for m in xrange(100): s.send(Message(str(m), priority=m%10))
+ pq = QmfAgent(primary.host_port()).getQueue("pq")
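+ # request=0 re-routes every message (all 100 are consumed below); they
+ # pass through amq.fanout, which is bound back to pq itself.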
+ pq.reroute(request=0, useAltExchange=False, exchange="amq.fanout")
+ # Verify that consuming is in priority order
+ expect = [str(10*i+p) for p in xrange(9,-1,-1) for i in xrange(0,10) ]
+ actual = [m.content for m in primary.get_messages("pq", 100)]
+ self.assertEqual(expect, actual)
+
+ def test_delete_missing_response(self):
+ """Check that a backup correctly deletes leftover queues and exchanges that are
+ missing from the initial response set."""
+ # This test is a bit contrived, we set up the situation on backup brokers
+ # and then promote one.
+ cluster = HaCluster(self, 2, promote=False)
+
+ # cluster[0] Will be the primary
+ s = cluster[0].connect_admin().session()
+ s.sender("q1;{create:always}")
+ s.sender("e1;{create:always, node:{type:topic}}")
+
+ # cluster[1] will be the backup, has extra queues/exchanges
+ xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+ node = "node:{%s}"%(xdecl)
+ s = cluster[1].connect_admin().session()
+ s.sender("q1;{create:always, %s}"%(node))
+ s.sender("q2;{create:always, %s}"%(node))
+ s.sender("e1;{create:always, node:{type:topic, %s}}"%(xdecl))
+ s.sender("e2;{create:always, node:{type:topic, %s}}"%(xdecl))
+ for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
+
+ cluster[0].promote()
+ # Verify the backup deletes the surplus queue and exchange
+ cluster[1].wait_status("ready")
+ s = cluster[1].connect_admin().session()
+ self.assertRaises(NotFound, s.receiver, "q2")
+ self.assertRaises(NotFound, s.receiver, "e2")
+
+
+ def test_delete_qpid_4285(self):
+ """Regression test for QPID-4285: on deleting a queue it gets stuck in a
+ partially deleted state and causes replication errors."""
+ cluster = HaCluster(self,2)
+ s = cluster[0].connect().session()
+ s.receiver("q;{create:always}")
+ cluster[1].wait_backup("q")
+ cluster.kill(0) # Make the backup take over.
+ s = cluster[1].connect().session()
+ s.receiver("q;{delete:always}").close() # Delete q on new primary
+ try:
+ s.receiver("q")
+ self.fail("Expected NotFound exception") # Should not be avaliable
+ except NotFound: pass
+ assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+
+ def alt_setup(self, session, suffix):
+ # Create exchange to use as alternate and a queue bound to it.
+ # altex exchange: acts as alternate exchange
+ session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+ # altq queue bound to altex, collect re-routed messages.
+ session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+ def test_auto_delete_close(self):
+ """Verify auto-delete queues are deleted on backup if auto-deleted
+ on primary"""
+ cluster=HaCluster(self, 2)
+ p = cluster[0].connect().session()
+ self.alt_setup(p, "1")
+ r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+ s = p.sender("adq1")
+ for m in ["aa","bb","cc"]: s.send(m)
+ p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+ cluster[1].wait_queue("adq1")
+ cluster[1].wait_queue("adq2")
+ r.close() # trigger auto-delete of adq1
+ cluster[1].wait_no_queue("adq1")
+ cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+ cluster[1].wait_queue("adq2")
+
+ def test_auto_delete_crash(self):
+ """Verify auto-delete queues are deleted on backup if the primary crashes"""
+ cluster=HaCluster(self, 2)
+ p = cluster[0].connect().session()
+ self.alt_setup(p,"1")
+
+ # adq1 is subscribed so will be auto-deleted.
+ r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+ s = p.sender("adq1")
+ for m in ["aa","bb","cc"]: s.send(m)
+ # adq2 is subscribed after cluster[2] starts.
+ p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+ # adq3 is never subscribed.
+ p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}")
+
+ cluster.start()
+ cluster[2].wait_status("ready")
+
+ p.receiver("adq2") # Subscribed after cluster[2] joined
+
+ for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q)
+ for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q)
+ cluster[0].kill()
+
+ cluster[1].wait_no_queue("adq1")
+ cluster[1].wait_no_queue("adq2")
+ cluster[1].wait_queue("adq3")
+
+ cluster[2].wait_no_queue("adq1")
+ cluster[2].wait_no_queue("adq2")
+ cluster[2].wait_queue("adq3")
+
+ cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+ cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"])
+
+ def test_auto_delete_timeout(self):
+ cluster = HaCluster(self, 2)
+ # Test timeout
+ r1 = cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ # Test special case of timeout = 0
+ r0 = cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}")
+ cluster[1].wait_queue("q0")
+ cluster[1].wait_queue("q1")
+ cluster[0].kill()
+ cluster[1].wait_queue("q1") # Not timed out yet
+ cluster[1].wait_no_queue("q1", timeout=5) # Wait for timeout
+ cluster[1].wait_no_queue("q0", timeout=5) # Wait for timeout
+
+ def test_alt_exchange_dup(self):
+ """QPID-4349: if a queue has an alterante exchange and is deleted the
+ messages appear twice on the alternate, they are rerouted once by the
+ primary and again by the backup."""
+ cluster = HaCluster(self,2)
+
+ # Set up q with alternate exchange altex bound to altq.
+ s = cluster[0].connect().session()
+ s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}")
+ messages = [ str(n) for n in xrange(10) ]
+ for m in messages: snd.send(m)
+ cluster[1].assert_browse_backup("q", messages)
+ s.sender("q;{delete:always}").close()
+ cluster[1].assert_browse_backup("altq", messages)
+
+ def test_expired(self):
+ """Regression test for QPID-4379: HA does not properly handle expired messages"""
+ # Race between messages expiring and HA replicating consumer.
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session().sender("q;{create:always}", capacity=2)
+ def send_ttl_messages():
+ for i in xrange(100): s.send(Message(str(i), ttl=0.001), timeout=1)
+ send_ttl_messages()
+ cluster.start()
+ send_ttl_messages()
+
+ def test_stale_response(self):
+ """Check for race condition where a stale response is processed after an
+ event for the same queue/exchange."""
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session()
+ s.sender("keep;{create:always}") # Leave this queue in place.
+ for i in xrange(1000):
+ s.sender("deleteme%s;{create:always,delete:always}"%(i)).close()
+ # It is possible for the backup to attempt to subscribe after the queue
+ # is deleted. This is not an error, but is logged as an error on the primary.
+ # The backup does not log this as an error so we only check the backup log for errors.
+ self.assert_log_no_errors(cluster[1])
+
+ def test_missed_recreate(self):
+ """If a queue or exchange is destroyed and one with the same name re-created
+ while a backup is disconnected, the backup should also delete/recreate
+ the object when it re-connects"""
+ cluster = HaCluster(self, 3)
+ sn = cluster[0].connect().session()
+ # Create a queue with messages
+ s = sn.sender("qq;{create:always}")
+ msgs = [str(i) for i in xrange(3)]
+ for m in msgs: s.send(m)
+ cluster[1].assert_browse_backup("qq", msgs)
+ cluster[2].assert_browse_backup("qq", msgs)
+ # Set up an exchange with a binding.
+ sn.sender("xx;{create:always,node:{type:topic}}")
+ sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}")
+ cluster[1].wait_address("xx")
+ self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1)
+ cluster[2].wait_address("xx")
+ self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
+
+ # Simulate the race by re-creating the objects before promoting the new primary
+ cluster.kill(0, False)
+ xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+ node = "node:{%s}"%(xdecl)
+ sn = cluster[1].connect_admin().session()
+ sn.sender("qq;{delete:always}").close()
+ s = sn.sender("qq;{create:always, %s}"%(node))
+ s.send("foo")
+ sn.sender("xx;{delete:always}").close()
+ sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl))
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ # Verify we are not still using the old objects on cluster[2]
+ cluster[2].assert_browse_backup("qq", ["foo"])
+ cluster[2].wait_address("xx")
+ self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0)
+
+ def test_redeclare_exchange(self):
+ """Ensure that re-declaring an exchange is an HA no-op"""
+ cluster = HaCluster(self, 2)
+ ps = cluster[0].connect().session()
+ ps.sender("ex1;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+ ps.sender("ex2;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout', alternate-exchange:'ex1'}}}")
+ cluster[1].wait_backup("ex1")
+ cluster[1].wait_backup("ex2")
+
+ # Use old API to re-declare the exchange
+ old_conn = cluster[0].connect_old()
+ old_sess = old_conn.session(str(uuid4()))
+ old_sess.exchange_declare(exchange='ex1', type='fanout')
+ cluster[1].wait_backup("ex1")
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -812,7 +912,7 @@ def fairshare(msgs, limit, levels):
msgs = postponed
count = 0
last_priority = None
- postponed = []
+ postponed = [ ]
msg = msgs.pop(0)
if last_priority and priority_level(msg.priority, levels) == last_priority:
count += 1
@@ -834,7 +934,7 @@ def priority_level(value, levels):
offset = 5-math.ceil(levels/2.0)
return min(max(value - offset, 0), levels-1)
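
Worked example for the priority_level formula above (values follow directly
from offset = 5 - ceil(levels/2.0) and the clamp):

    # levels=10: offset = 5 - ceil(5.0) = 0, so the mapping is the identity.
    assert priority_level(7, 10) == 7
    # levels=4: offset = 5 - ceil(2.0) = 3; low values clamp to level 0,
    # high values to level 3.
    assert priority_level(2, 4) == 0
    assert priority_level(9, 4) == 3
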
-class LongTests(BrokerTest):
+class LongTests(HaBrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -860,7 +960,7 @@ class LongTests(BrokerTest):
"""Wait for receiver r to pass n"""
def check():
r.check() # Verify no exceptions
- return r.received > n
+ return r.received > n + 100
assert retry(check), "Stalled %s at %s"%(r.queue, n)
for r in receivers: wait_passed(r, 0)
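
For reference, retry() comes from brokertest and is used throughout as
"assert retry(check, timeout=...)". Its implementation is not shown in this
diff; a minimal stand-in with the same calling shape (an assumption, not
brokertest's actual code) would be:

    import time

    def retry(predicate, timeout=10, delay=0.05):
        # Poll predicate until it returns a true value; give up after timeout.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if predicate(): return True
            time.sleep(delay)
        return False
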
@@ -868,87 +968,176 @@ class LongTests(BrokerTest):
# Kill and restart brokers in a cycle:
endtime = time.time() + self.duration()
i = 0
+ primary = 0
try:
while time.time() < endtime or i < 3: # At least 3 iterations
+ # Precondition: all 3 brokers are running;
+ # primary = index of the promoted primary.
for s in senders: s.sender.assert_running()
for r in receivers: r.receiver.assert_running()
- checkpoint = [ r.received for r in receivers ]
- # Don't kill primary till it is active and the next
- # backup is ready, otherwise we can lose messages.
- brokers[i%3].wait_status("active")
- brokers[(i+1)%3].wait_status("ready")
- brokers.bounce(i%3)
+ checkpoint = [ r.received+100 for r in receivers ]
+ dead = None
+ victim = random.randint(0,2)
+ if victim == primary:
+ # Don't kill primary till it is active and the next
+ # backup is ready, otherwise we can lose messages.
+ brokers[victim].wait_status("active")
+ next = (victim+1)%3
+ brokers[next].wait_status("ready")
+ brokers.bounce(victim) # Next one is promoted
+ primary = next
+ else:
+ brokers.kill(victim, False)
+ dead = victim
+
+ # At this point the primary is running with 1 or 2 backups
+ # Make sure we are not stalled
+ map(wait_passed, receivers, checkpoint)
+ # Run another checkpoint to ensure things work in this configuration
+ checkpoint = [ r.received+100 for r in receivers ]
+ map(wait_passed, receivers, checkpoint)
+
+ if dead is not None:
+ brokers.restart(dead) # Restart backup
+ brokers[dead].ready()
+ dead = None
i += 1
- map(wait_passed, receivers, checkpoint) # Wait for all receivers
except:
traceback.print_exc()
raise
finally:
for s in senders: s.stop()
for r in receivers: r.stop()
- dead = []
+ unexpected_dead = []
for i in xrange(3):
- if not brokers[i].is_running(): dead.append(i)
- brokers.kill(i, False)
- if dead: raise Exception("Brokers not running: %s"%dead)
+ if not brokers[i].is_running() and i != dead:
+ unexpected_dead.append(i)
+ if brokers[i].is_running(): brokers.kill(i, False)
+ if unexpected_dead:
+ raise Exception("Brokers not running: %s"%unexpected_dead)
+
+ def test_qmf_order(self):
+ """QPID 4402: HA QMF events can be out of order.
+ This test mimics the test described in the JIRA. Two threads repeatedly
+ declare the same auto-delete queue and close their connection.
+ """
+ broker = Broker(self)
+ class Receiver(Thread):
+ def __init__(self, qname):
+ Thread.__init__(self)
+ self.qname = qname
+ self.stopped = False
+
+ def run(self):
+ while not self.stopped:
+ self.connection = broker.connect()
+ try:
+ self.connection.session().receiver(
+ self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}")
+ except NotFound: pass # Can occur occasionally, not an error.
+ try: self.connection.close()
+ except: pass
+
+ class QmfObject(object):
+ """Track existance of an object and validate QMF events"""
+ def __init__(self, type_name, name_field, name):
+ self.type_name, self.name_field, self.name = type_name, name_field, name
+ self.exists = False
+
+ def qmf_event(self, event):
+ content = event.content[0]
+ event_type = content['_schema_id']['_class_name']
+ values = content['_values']
+ if event_type == self.type_name+"Declare" and values[self.name_field] == self.name:
+ disp = values['disp']
+ log.debug("Event %s: disp=%s exists=%s"%(
+ event_type, values['disp'], self.exists))
+ if self.exists: assert values['disp'] == 'existing'
+ else: assert values['disp'] == 'created'
+ self.exists = True
+ elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name:
+ log.debug("Event %s: exists=%s"%(event_type, self.exists))
+ assert self.exists
+ self.exists = False
+
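+ # For illustration, a queueDeclare event content (event.content[0]) has
+ # roughly this shape; the field values here are examples only:
+ #   {'_schema_id': {'_class_name': 'queueDeclare'},
+ #    '_values': {'qName': 'qq', 'disp': 'created'}}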
+ # Verify order of QMF events.
+ helper = EventHelper()
+ r = broker.connect().session().receiver(helper.eventAddress())
+ threads = [Receiver("qq"), Receiver("qq")]
+ for t in threads: t.start()
+ queue = QmfObject("queue", "qName", "qq")
+ finish = time.time() + self.duration()
+ try:
+ while time.time() < finish:
+ queue.qmf_event(r.fetch())
+ finally:
+ for t in threads: t.stopped = True; t.join()
-class RecoveryTests(BrokerTest):
+class RecoveryTests(HaBrokerTest):
"""Tests for recovery after a failure."""
def test_queue_hold(self):
"""Verify that the broker holds queues without sufficient backup,
i.e. does not complete messages sent to those queues."""
- # We don't want backups to time out for this test, set long timeout.
- cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
- # Wait for the primary to be ready
- cluster[0].wait_status("active")
- # Create a queue before the failure.
- s1 = cluster.connect(0).session().sender("q1;{create:always}")
- for b in cluster: b.wait_backup("q1")
- for i in xrange(100): s1.send(str(i))
- # Kill primary and 2 backups
- for i in [0,1,2]: cluster.kill(i, False)
- cluster[3].promote() # New primary, backups will be 1 and 2
- cluster[3].wait_status("recovering")
-
- def assertSyncTimeout(s):
- try:
- s.sync(timeout=.01)
- self.fail("Expected Timeout exception")
- except Timeout: pass
-
- # Create a queue after the failure
- s2 = cluster.connect(3).session().sender("q2;{create:always}")
-
- # Verify that messages sent are not completed
- for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
- assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 100)
- assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 100)
-
- # Verify we can receive even if sending is on hold:
- cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
-
- # Restart backups, verify queues are released only when both backups are up
- cluster.restart(1)
- assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 100)
- assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 100)
- self.assertEqual(cluster[3].ha_status(), "recovering")
- cluster.restart(2)
-
- # Verify everything is up to date and active
- def settled(sender): sender.sync(); return sender.unsettled() == 0;
- assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
- assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
- cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
- cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
- cluster[3].wait_status("active"),
- s1.session.connection.close()
- s2.session.connection.close()
+ l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+ try:
+ # We don't want backups to time out for this test, set long timeout.
+ cluster = HaCluster(self, 4, args=["--ha-backup-timeout=120"]);
+ # Wait for the primary to be ready
+ cluster[0].wait_status("active")
+ for b in cluster[1:4]: b.wait_status("ready")
+ # Create a queue before the failure.
+ s1 = cluster.connect(0).session().sender("q1;{create:always}")
+ for b in cluster: b.wait_backup("q1")
+ for i in xrange(100): s1.send(str(i))
+
+ # Kill primary and 2 backups
+ cluster[3].wait_status("ready")
+ for i in [0,1,2]: cluster.kill(i, False)
+ cluster[3].promote() # New primary, backups will be 1 and 2
+ cluster[3].wait_status("recovering")
+
+ def assertSyncTimeout(s):
+ try:
+ s.sync(timeout=.01)
+ self.fail("Expected Timeout exception")
+ except Timeout: pass
+
+ # Create a queue after the failure
+ s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+ # Verify that messages sent are not completed
+ for i in xrange(100,200):
+ s1.send(str(i), sync=False);
+ s2.send(str(i), sync=False)
+ assertSyncTimeout(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ assertSyncTimeout(s2)
+ self.assertEqual(s2.unsettled(), 100)
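+ # s1 and s2 each still hold 100 unsettled messages: the recovering
+ # primary does not complete sends until its expected backups are ready.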
+
+ # Verify we can receive even if sending is on hold:
+ cluster[3].assert_browse("q1", [str(i) for i in range(200)])
+
+ # Restart backups, verify queues are released only when both backups are up
+ cluster.restart(1)
+ assertSyncTimeout(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ assertSyncTimeout(s2)
+ self.assertEqual(s2.unsettled(), 100)
+ cluster.restart(2)
+
+ # Verify everything is up to date and active
+ def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0
+ assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
+ assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(200)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[3].wait_status("active"),
+ s1.session.connection.close()
+ s2.session.connection.close()
+ finally: l.restore()
def test_expected_backup_timeout(self):
"""Verify that we time-out expected backups and release held queues
@@ -972,6 +1161,52 @@ class RecoveryTests(BrokerTest):
s.sync(timeout=1) # And released after the timeout.
self.assertEqual(cluster[2].ha_status(), "active")
+ def test_join_ready_cluster(self):
+ """If we join a cluster where the primary is dead, the new primary is
+ not yet promoted and there are ready backups then we should refuse
+ promotion so that one of the ready backups can be chosen."""
+ # FIXME aconway 2012-10-05: smaller timeout
+ cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1])
+ cluster[0].wait_status("active")
+ cluster[1].wait_status("ready")
+ cluster.bounce(0, promote_next=False)
+ self.assertRaises(Exception, cluster[0].promote)
+ os.kill(cluster[1].pid, signal.SIGSTOP) # Test for timeout if unresponsive.
+ cluster.bounce(0, promote_next=False)
+ cluster[0].promote()
+
+
+class ConfigurationTests(HaBrokerTest):
+ """Tests for configuration settings."""
+
+ def test_client_broker_url(self):
+ """Check that setting of broker and public URLs obeys correct defaulting
+ and precedence"""
+
+ def check(broker, brokers, public):
+ qmf = broker.qmf()
+ self.assertEqual(brokers, qmf.brokersUrl)
+ self.assertEqual(public, qmf.publicUrl)
+
+ def start(brokers, public, known=None):
+ args=[]
+ if brokers: args.append("--ha-brokers-url="+brokers)
+ if public: args.append("--ha-public-url="+public)
+ if known: args.append("--known-hosts-url="+known)
+ return HaBroker(self, args=args)
+
+ # Both set explicitly, no defaulting
+ b = start("foo:123", "bar:456")
+ check(b, "amqp:tcp:foo:123", "amqp:tcp:bar:456")
+ b.set_brokers_url("foo:999")
+ check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:456")
+ b.set_public_url("bar:999")
+ check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:999")
+
+ # Allow "none" to mean "not set"
+ b = start("none", "none")
+ check(b, "", "")
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")
@@ -979,5 +1214,5 @@ if __name__ == "__main__":
os.execvp("qpid-python-test",
["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
else:
- print "Skipping ha_tests, qpid_ha not available"
+ print "Skipping ha_tests, %s not available"%(qpid_ha)