Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-x  cpp/src/tests/ha_tests.py | 539
1 file changed, 436 insertions(+), 103 deletions(-)
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 827cb7dca9..d25281eed5 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -18,59 +18,123 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
+from uuid import UUID
log = getLogger(__name__)
+class QmfAgent(object):
+ """Access to a QMF broker agent."""
+ def __init__(self, address, **kwargs):
+ self._connection = Connection.establish(
+ address, client_properties={"qpid.ha-admin":1}, **kwargs)
+ self._agent = BrokerAgent(self._connection)
+ assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+
+ def __getattr__(self, name):
+ a = getattr(self._agent, name)
+ return a
+
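Since QmfAgent forwards attribute access to the underlying BrokerAgent, QMF calls can be made directly on the wrapper. A minimal usage sketch (the address and the printed field are illustrative, not part of the patch):
# Illustrative only: connect as an HA admin client and query HA status via QMF.
agent = QmfAgent("localhost:5672")   # hypothetical broker address
hb = agent.getHaBroker()             # delegated to BrokerAgent via __getattr__
hb.update()
print hb.status                      # e.g. "active" or "ready"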
+class Credentials(object):
+ """SASL credentials: username, password, and mechanism"""
+ def __init__(self, username, password, mechanism):
+ (self.username, self.password, self.mechanism) = (username, password, mechanism)
+
+ def __str__(self): return "Credentials%s"%(self.tuple(),)
+
+ def tuple(self): return (self.username, self.password, self.mechanism)
+
+ def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+
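The add_user() helper simply prefixes a broker URL with "user/password@", which is how the qpid-ha tool and QmfAgent below receive SASL credentials. A small sketch of the expected composition (values are illustrative):
# Illustrative only: how credentials are folded into a broker URL.
cred = Credentials("zag", "zag", "PLAIN")
assert cred.add_user("localhost:5672") == "zag/zag@localhost:5672"
assert cred.tuple() == ("zag", "zag", "PLAIN")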
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
- ha_replicate="all", **kwargs):
+ """Start a broker with HA enabled
+ @param client_credentials: Credentials (user, password, mechanism) for admin clients started by the HaBroker.
+ """
+ def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
+ client_credentials=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=info+", "--log-enable=debug+:ha::",
+ "--log-enable=debug+:ha::",
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
if ha_replicate is not None:
args += [ "--ha-replicate=%s"%ha_replicate ]
- if broker_url: args.extend([ "--ha-brokers", broker_url ])
+ if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
Broker.__init__(self, test, args, **kwargs)
- self.commands=os.getenv("PYTHON_COMMANDS")
- assert os.path.isdir(self.commands)
+ self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+ assert os.path.exists(self.qpid_ha_path)
+ self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+ assert os.path.exists(self.qpid_config_path)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-
- def promote(self):
- assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
-
- def set_client_url(self, url):
- assert os.system(
- "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
-
- def set_broker_url(self, url):
- assert os.system(
- "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
-
- def replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ self.qpid_ha_script=import_script(self.qpid_ha_path)
+ self._agent = None
+ self.client_credentials = client_credentials
+
+ def __str__(self): return Broker.__str__(self)
+
+ def qpid_ha(self, args):
+ cred = self.client_credentials
+ url = self.host_port()
+ if cred:
+ url = cred.add_user(url)
+ args = args + ["--sasl-mechanism", cred.mechanism]
+ self.qpid_ha_script.main_except(["", "-b", url]+args)
+
+ def promote(self): self.qpid_ha(["promote"])
+ def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+ def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+ def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+
+ def agent(self):
+ if not self._agent:
+ cred = self.client_credentials
+ if cred:
+ self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
+ else:
+ self._agent = QmfAgent(self.host_port())
+ return self._agent
+
+ def ha_status(self):
+ hb = self.agent().getHaBroker()
+ hb.update()
+ return hb.status
+
+ def wait_status(self, status):
+ def try_get_status():
+ # Ignore ConnectionError, the broker may not be up yet.
+ try: return self.ha_status() == status
+ except ConnectionError: return False
+ assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
+
+ # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ def qpid_config(self, args):
+ assert subprocess.call(
+ [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
def config_replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
def config_declare(self, queue, replication):
- assert os.system(
- "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ self.qpid_config(["add", "queue", queue, "--replicate", replication])
def connect_admin(self, **kwargs):
- return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+ cred = self.client_credentials
+ if cred:
+ return Broker.connect(
+ self, client_properties={"qpid.ha-admin":1},
+ username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
+ **kwargs)
+ else:
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
def wait_backup(self, address):
"""Wait for address to become valid on a backup broker."""
@@ -78,6 +142,14 @@ class HaBroker(Broker):
try: wait_address(bs, address)
finally: bs.connection.close()
+ def assert_browse(self, queue, expected, **kwargs):
+ """Verify queue contents by browsing."""
+ bs = self.connect().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
def assert_browse_backup(self, queue, expected, **kwargs):
"""Combines wait_backup and assert_browse_retry."""
bs = self.connect_admin().session()
@@ -86,33 +158,70 @@ class HaBroker(Broker):
assert_browse_retry(bs, queue, expected, **kwargs)
finally: bs.connection.close()
+ def assert_connect_fail(self):
+ try:
+ self.connect()
+ self.test.fail("Expected ConnectionError")
+ except ConnectionError: pass
+
+ def try_connect(self):
+ try: return self.connect()
+ except ConnectionError: return None
+
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, **kwargs):
+ def __init__(self, test, n, promote=True, **kwargs):
"""Start a cluster of n brokers"""
self.test = test
- self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ self.kwargs = kwargs
+ self._brokers = []
+ self.id = HaCluster._cluster_count
+ self.broker_id = 0
HaCluster._cluster_count += 1
- self.url = ",".join([b.host_port() for b in self])
- for b in self: b.set_broker_url(self.url)
+ for i in xrange(n): self.start(False)
+ self.update_urls()
self[0].promote()
+ def next_name(self):
+ name="cluster%s-%s"%(self.id, self.broker_id)
+ self.broker_id += 1
+ return name
+
+ def start(self, update_urls=True, args=[]):
+ """Start a new broker in the cluster"""
+ b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ self._brokers.append(b)
+ if update_urls: self.update_urls()
+ return b
+
+ def update_urls(self):
+ self.url = ",".join([b.host_port() for b in self])
+ if len(self) > 1: # No failover addresses on a single-broker cluster.
+ for b in self: b.set_brokers_url(self.url)
+
def connect(self, i):
"""Connect with reconnect_urls"""
return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
- def kill(self, i):
+ def kill(self, i, promote_next=True):
"""Kill broker i, promote broker i+1"""
- self[i].kill()
self[i].expect = EXPECT_EXIT_FAIL
- self[(i+1) % len(self)].promote()
+ self[i].kill()
+ if promote_next: self[(i+1) % len(self)].promote()
+
+ def restart(self, i):
+ """Start a broker with the same port, name and data directory. It will get
+ a separate log file: foo.n.log"""
+ b = self._brokers[i]
+ self._brokers[i] = HaBroker(
+ self.test, name=b.name, port=b.port(), brokers_url=self.url,
+ **self.kwargs)
- def bounce(self, i):
+ def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
- self.kill(i)
- b = self[i]
- self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+ self.kill(i, promote_next)
+ self.restart(i)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
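Taken together, HaCluster gives the tests a list-like handle on a set of HA brokers plus failover helpers. A minimal sketch of the intended usage, assuming the usual BrokerTest fixture (it mirrors test_failover_python further down):
# Sketch only: replicate a queue, kill the primary, verify on the promoted backup.
cluster = HaCluster(self, 3)                 # starts 3 brokers, promotes cluster[0]
s = cluster.connect(0).session().sender("q;{create:always}")
s.send("foo")
cluster[1].wait_backup("q")                  # backup has caught up
cluster.kill(0)                              # kill primary, promote cluster[1]
cluster[1].assert_browse("q", ["foo"])       # queue survives on the new primary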
@@ -128,12 +237,12 @@ def wait_address(session, address):
except NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
-def assert_missing(session, address):
- """Assert that the address is _not_ valid"""
+def valid_address(session, address):
+ """Test if an address is valid"""
try:
session.receiver(address)
- self.fail("Expected NotFound: %s"%(address))
- except NotFound: pass
+ return True
+ except NotFound: return False
class ReplicationTests(BrokerTest):
"""Correctness tests for HA replication."""
@@ -180,7 +289,7 @@ class ReplicationTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- assert_missing(b, prefix+"q3")
+ assert not valid_address(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -195,7 +304,7 @@ class ReplicationTests(BrokerTest):
# Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1", primary)
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
# Create config, send messages after starting the backup, to test steady-state replication.
setup(p, "2", primary)
@@ -233,10 +342,10 @@ class ReplicationTests(BrokerTest):
s = p.sender("q;{create:always}")
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
for m in [str(i) for i in range(10,20)]: s.send(m)
s.sync()
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
for m in [str(i) for i in range(20,30)]: s.send(m)
s.sync()
@@ -276,7 +385,7 @@ class ReplicationTests(BrokerTest):
"""Verify that backups rejects connections and that fail-over works in python client"""
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
# Check that backup rejects normal connections
try:
backup.connect().session()
@@ -294,14 +403,15 @@ class ReplicationTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid))
backup.promote()
- self.assert_browse_retry(s, "q", ["foo"])
+ sender.send("bar")
+ self.assert_browse_retry(s, "q", ["foo", "bar"])
c.close()
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
primary.connect().session().sender("q;{create:always}")
backup.wait_backup("q")
@@ -344,6 +454,7 @@ class ReplicationTests(BrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", ha_cluster=False)
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
@@ -393,7 +504,7 @@ class ReplicationTests(BrokerTest):
"""Verify that we replicate to an LVQ correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
@@ -410,7 +521,7 @@ class ReplicationTests(BrokerTest):
"""Test replication with the ring queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
for i in range(10): s.send(Message(str(i)))
backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
@@ -419,18 +530,20 @@ class ReplicationTests(BrokerTest):
"""Test replication with the reject queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+ # Detach, don't close: the session is broken after the capacity error
+ s.session.connection.detach()
def test_priority(self):
"""Verify priority queues replicate correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
session = primary.connect().session()
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
@@ -445,7 +558,7 @@ class ReplicationTests(BrokerTest):
"""Verify priority queues replicate correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
session = primary.connect().session()
levels = 8
priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
@@ -464,7 +577,7 @@ class ReplicationTests(BrokerTest):
def test_priority_ring(self):
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
@@ -475,8 +588,10 @@ class ReplicationTests(BrokerTest):
# correct result, the uncommented one is for the actually buggy
# result. See https://issues.apache.org/jira/browse/QPID-3866
#
- # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
- backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+ # expect = sorted(priorities,reverse=True)[0:5]
+ expect = [9,9,9,9,2]
+ primary.assert_browse("q", expect, transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
def test_backup_acquired(self):
"""Verify that acquired messages are backed up, for all queue types."""
@@ -509,11 +624,11 @@ class ReplicationTests(BrokerTest):
primary = HaBroker(self, name="primary")
primary.promote()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
c = primary.connect()
for t in tests: t.send(c) # Send messages, leave one unacknowledged.
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
# Wait for backups to catch up.
for t in tests:
t.wait(self, backup1)
@@ -538,11 +653,13 @@ class ReplicationTests(BrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
- def test_invalid_default(self):
- """Verify that a queue with an invalid qpid.replicate gets default treatment"""
- cluster = HaCluster(self, 2, ha_replicate="all")
- c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
- cluster[1].wait_backup("q")
+ def test_invalid_replication(self):
+ """Verify that we reject an attempt to declare a queue with invalid replication value."""
+ cluster = HaCluster(self, 1, ha_replicate="all")
+ try:
+ c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+ self.fail("Expected ConnectionError")
+ except ConnectionError: pass
def test_exclusive_queue(self):
"""Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -559,6 +676,136 @@ class ReplicationTests(BrokerTest):
test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+ def test_auto_delete_exclusive(self):
+ """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
+ cluster = HaCluster(self,2)
+ s = cluster[0].connect().session()
+ s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+ s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+ s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+ s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ s.receiver("q;{create:always}")
+
+ s = cluster[1].connect_admin().session()
+ cluster[1].wait_backup("q")
+ assert not valid_address(s, "exad")
+ assert valid_address(s, "ex")
+ assert valid_address(s, "ad")
+ assert valid_address(s, "time")
+
+ def test_broker_info(self):
+ """Check that broker information is correctly published via management"""
+ cluster = HaCluster(self, 3)
+
+ for broker in cluster: # Make sure HA system-id matches broker's
+ qmf = broker.agent().getHaBroker()
+ self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+
+ cluster_ports = map(lambda b: b.port(), cluster)
+ cluster_ports.sort()
+ def ports(qmf):
+ qmf.update()
+ return sorted(map(lambda b: b["port"], qmf.members))
+ # Check that all brokers have the same membership as the cluster
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
+ # Add a new broker, check it is updated everywhere
+ b = cluster.start()
+ cluster_ports.append(b.port())
+ cluster_ports.sort()
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+
+ def test_auth(self):
+ """Verify that authentication does not interfere with replication."""
+ # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ if not os.path.exists(sasl_config):
+ print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
+ return
+ acl=os.path.join(os.getcwd(), "policy.acl")
+ aclf=file(acl,"w")
+ # Verify that replication works with auth=yes and HA user has at least the following
+ # privileges:
+ aclf.write("""
+acl allow zag@QPID access queue
+acl allow zag@QPID create queue
+acl allow zag@QPID consume queue
+acl allow zag@QPID delete queue
+acl allow zag@QPID access exchange
+acl allow zag@QPID create exchange
+acl allow zag@QPID bind exchange
+acl allow zag@QPID publish exchange
+acl allow zag@QPID delete exchange
+acl allow zag@QPID access method
+acl allow zag@QPID create link
+acl deny all all
+ """)
+ aclf.close()
+ cluster = HaCluster(
+ self, 2,
+ args=["--auth", "yes", "--sasl-config", sasl_config,
+ "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"),
+ "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
+ ],
+ client_credentials=Credentials("zag", "zag", "PLAIN"))
+ s0 = cluster[0].connect(username="zag", password="zag").session();
+ s0.receiver("q;{create:always}")
+ s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ cluster[1].wait_backup("q")
+ cluster[1].wait_backup("ex")
+ s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
+ s1.sender("ex").send("foo");
+ self.assertEqual(s1.receiver("q").fetch().content, "foo")
+
+ def test_alternate_exchange(self):
+ """Verify that alternate-exchange on exchanges and queues is propagated
+ to new members of a cluster. """
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session()
+ # altex exchange: acts as alternate exchange
+ s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ # altq queue bound to altex, collect re-routed messages.
+ s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ # 0ex exchange with alternate-exchange altex and no queues bound
+ s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # create queue q with alternate-exchange altex
+ s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+ # create a bunch of exchanges to ensure we don't clean up prematurely if the
+ # response comes in multiple fragments.
+ for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+
+ def verify(broker):
+ s = broker.connect().session()
+ # Verify unmatched message goes to ex's alternate.
+ s.sender("0ex").send("foo")
+ altq = s.receiver("altq")
+ self.assertEqual("foo", altq.fetch(timeout=0).content)
+ s.acknowledge()
+ # Verify rejected message goes to q's alternate.
+ s.sender("q").send("bar")
+ msg = s.receiver("q").fetch(timeout=0)
+ self.assertEqual("bar", msg.content)
+ s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ self.assertEqual("bar", altq.fetch(timeout=0).content)
+ s.acknowledge()
+
+ # Sanity check: alternate exchanges on original broker
+ verify(cluster[0])
+ # Check backup that was connected during setup.
+ cluster[1].wait_backup("0ex")
+ cluster[1].wait_backup("q")
+ cluster.bounce(0)
+ verify(cluster[1])
+ # Check a newly started backup.
+ cluster.start()
+ cluster[2].wait_backup("0ex")
+ cluster[2].wait_backup("q")
+ cluster.bounce(1)
+ verify(cluster[2])
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -601,49 +848,135 @@ class LongTests(BrokerTest):
if d: return float(d)*60
else: return 3 # Default is to be quick
-
- def disable_test_failover(self):
+ def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
- # FIXME aconway 2012-02-03: fails due to dropped messages,
- # known issue: sending messages to new primary before
- # backups are ready. Enable when fixed.
-
- # Start a cluster, all members will be killed during the test.
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
- for name in ["ha0","ha1","ha2"] ]
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
- brokers[0].promote()
+ brokers = HaCluster(self, 3)
# Start sender and receiver threads
- sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
- receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
- receiver.start()
- sender.start()
- # Wait for sender & receiver to get up and running
- assert retry(lambda: receiver.received > 100)
+ n = 10
+ senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(n)]
+ receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+ failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(n)]
+ for r in receivers: r.start()
+ for s in senders: s.start()
+
+ def wait_passed(r, n):
+ """Wait for receiver r to pass n"""
+ def check():
+ r.check() # Verify no exceptions
+ return r.received > n
+ assert retry(check), "Stalled %s at %s"%(r.queue, n)
+
+ for r in receivers: wait_passed(r, 0)
+
# Kill and restart brokers in a cycle:
endtime = time.time() + self.duration()
i = 0
- while time.time() < endtime or i < 3: # At least 3 iterations
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- port = brokers[i].port()
- brokers[i].kill()
- brokers.append(
- HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
- expect=EXPECT_EXIT_FAIL))
- i += 1
- brokers[i].promote()
- n = receiver.received # Verify we're still running
- def enough():
- receiver.check() # Verify no exceptions
- return receiver.received > n + 100
- assert retry(enough, timeout=5)
-
- sender.stop()
- receiver.stop()
- for b in brokers[i:]: b.kill()
+ try:
+ while time.time() < endtime or i < 3: # At least 3 iterations
+ for s in senders: s.sender.assert_running()
+ for r in receivers: r.receiver.assert_running()
+ checkpoint = [ r.received for r in receivers ]
+ # Don't kill primary till it is active and the next
+ # backup is ready, otherwise we can lose messages.
+ brokers[i%3].wait_status("active")
+ brokers[(i+1)%3].wait_status("ready")
+ brokers.bounce(i%3)
+ i += 1
+ map(wait_passed, receivers, checkpoint) # Wait for all receivers
+ except:
+ traceback.print_exc()
+ raise
+ finally:
+ for s in senders: s.stop()
+ for r in receivers: r.stop()
+ dead = []
+ for i in xrange(3):
+ if not brokers[i].is_running(): dead.append(i)
+ brokers.kill(i, False)
+ if dead: raise Exception("Brokers not running: %s"%dead)
+
+class RecoveryTests(BrokerTest):
+ """Tests for recovery after a failure."""
+
+ def test_queue_hold(self):
+ """Verify that the broker holds queues without sufficient backup,
+ i.e. does not complete messages sent to those queues."""
+
+ # We don't want backups to time out for this test, set long timeout.
+ cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
+ # Wait for the primary to be ready
+ cluster[0].wait_status("active")
+ # Create a queue before the failure.
+ s1 = cluster.connect(0).session().sender("q1;{create:always}")
+ for b in cluster: b.wait_backup("q1")
+ for i in xrange(100): s1.send(str(i))
+ # Kill primary and 2 backups
+ for i in [0,1,2]: cluster.kill(i, False)
+ cluster[3].promote() # New primary, backups will be 1 and 2
+ cluster[3].wait_status("recovering")
+
+ def assertSyncTimeout(s):
+ try:
+ s.sync(timeout=.01)
+ self.fail("Expected Timeout exception")
+ except Timeout: pass
+
+ # Create a queue after the failure
+ s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+ # Verify that messages sent are not completed
+ for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ assertSyncTimeout(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ assertSyncTimeout(s2)
+ self.assertEqual(s2.unsettled(), 100)
+
+ # Verify we can receive even if sending is on hold:
+ cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+
+ # Restart backups, verify queues are released only when both backups are up
+ cluster.restart(1)
+ assertSyncTimeout(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ assertSyncTimeout(s2)
+ self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(cluster[3].ha_status(), "recovering")
+ cluster.restart(2)
+
+ # Verify everything is up to date and active
+ def settled(sender): sender.sync(); return sender.unsettled() == 0;
+ assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
+ assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[3].wait_status("active")
+ s1.session.connection.close()
+ s2.session.connection.close()
+
+ def test_expected_backup_timeout(self):
+ """Verify that we time-out expected backups and release held queues
+ after a configured interval. Verify backup is demoted to catch-up,
+ but can still rejoin.
+ """
+ cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
+ cluster[0].wait_status("active") # Primary ready
+ for b in cluster[1:4]: b.wait_status("ready") # Backups ready
+ for i in [0,1]: cluster.kill(i, False)
+ cluster[2].promote() # New primary; the killed brokers are its expected backups
+ cluster[2].wait_status("recovering")
+ # Should not go active till the expected backup connects or times out.
+ self.assertEqual(cluster[2].ha_status(), "recovering")
+ # Messages should be held until the expected backup times out
+ s = cluster[2].connect().session().sender("q;{create:always}")
+ for i in xrange(100): s.send(str(i), sync=False)
+ # Verify message held initially.
+ try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
+ except Timeout: pass
+ s.sync(timeout=1) # And released after the timeout.
+ self.assertEqual(cluster[2].ha_status(), "active")
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)