summaryrefslogtreecommitdiff
path: root/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /cpp/src/tests/ha_tests.py
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-xcpp/src/tests/ha_tests.py523
1 files changed, 412 insertions, 111 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 97de0d1f77..827cb7dca9 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -18,71 +18,125 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
-from qpid.messaging import Message, NotFound, ConnectionError, Connection
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
+from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG
+from qpidtoollibs import BrokerAgent
-
-log = getLogger("qpid.ha-tests")
+log = getLogger(__name__)
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, **kwargs):
+ def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
+ ha_replicate="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- args=["--load-module", BrokerTest.ha_lib,
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-enable=yes"]
- if broker_url: args += [ "--ha-broker-url", broker_url ]
+ args = copy(args)
+ args += ["--load-module", BrokerTest.ha_lib,
+ "--log-enable=info+", "--log-enable=debug+:ha::",
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster]
+ if ha_replicate is not None:
+ args += [ "--ha-replicate=%s"%ha_replicate ]
+ if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
+ self.commands=os.getenv("PYTHON_COMMANDS")
+ assert os.path.isdir(self.commands)
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
def promote(self):
- assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0
+ assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
def set_client_url(self, url):
assert os.system(
- "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0
+ "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
def set_broker_url(self, url):
assert os.system(
- "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0
-
-
-class ShortTests(BrokerTest):
- """Short HA functionality tests."""
-
- # Wait for an address to become valid.
- def wait(self, session, address):
- def check():
- try:
- session.sender(address)
- return True
- except NotFound: return False
- assert retry(check), "Timed out waiting for %s"%(address)
-
- # Wait for address to become valid on a backup broker.
- def wait_backup(self, backup, address):
- bs = self.connect_admin(backup).session()
- self.wait(bs, address)
- bs.connection.close()
-
- # Combines wait_backup and assert_browse_retry
- def assert_browse_backup(self, backup, queue, expected, **kwargs):
- bs = self.connect_admin(backup).session()
- self.wait(bs, queue)
- self.assert_browse_retry(bs, queue, expected, **kwargs)
- bs.connection.close()
-
- def assert_missing(self, session, address):
+ "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+
+ def replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_declare(self, queue, replication):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+
+ def connect_admin(self, **kwargs):
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+
+ def wait_backup(self, address):
+ """Wait for address to become valid on a backup broker."""
+ bs = self.connect_admin().session()
+ try: wait_address(bs, address)
+ finally: bs.connection.close()
+
+ def assert_browse_backup(self, queue, expected, **kwargs):
+ """Combines wait_backup and assert_browse_retry."""
+ bs = self.connect_admin().session()
try:
- session.receiver(address)
- self.fail("Should not have been replicated: %s"%(address))
- except NotFound: pass
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
+class HaCluster(object):
+ _cluster_count = 0
+
+ def __init__(self, test, n, **kwargs):
+ """Start a cluster of n brokers"""
+ self.test = test
+ self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ HaCluster._cluster_count += 1
+ self.url = ",".join([b.host_port() for b in self])
+ for b in self: b.set_broker_url(self.url)
+ self[0].promote()
+
+ def connect(self, i):
+ """Connect with reconnect_urls"""
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+
+ def kill(self, i):
+ """Kill broker i, promote broker i+1"""
+ self[i].kill()
+ self[i].expect = EXPECT_EXIT_FAIL
+ self[(i+1) % len(self)].promote()
+
+ def bounce(self, i):
+ """Stop and restart a broker in a cluster."""
+ self.kill(i)
+ b = self[i]
+ self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+
+ # Behave like a list of brokers.
+ def __len__(self): return len(self._brokers)
+ def __getitem__(self,index): return self._brokers[index]
+ def __iter__(self): return self._brokers.__iter__()
+
+def wait_address(session, address):
+ """Wait for an address to become valid."""
+ def check():
+ try:
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for address %s"%(address)
+
+def assert_missing(session, address):
+ """Assert that the address is _not_ valid"""
+ try:
+ session.receiver(address)
+ self.fail("Expected NotFound: %s"%(address))
+ except NotFound: pass
- def connect_admin(self, backup, **kwargs):
- """Connect to a backup broker as an admin connection"""
- return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+class ReplicationTests(BrokerTest):
+ """Correctness tests for HA replication."""
def test_replication(self):
"""Test basic replication of configuration and messages before and
@@ -95,21 +149,21 @@ class ShortTests(BrokerTest):
return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
def setup(p, prefix, primary):
"""Create config, send messages on the primary p"""
- s = p.sender(queue(prefix+"q1", "messages"))
+ s = p.sender(queue(prefix+"q1", "all"))
for m in ["a", "b", "1"]: s.send(Message(m))
# Test replication of dequeue
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
p.acknowledge()
p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
- p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4"))
- p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
# Test unbind
- p.sender(queue(prefix+"q4", "messages")).send(Message("6"))
- s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4"))
+ p.sender(queue(prefix+"q4", "all")).send(Message("6"))
+ s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
s3.send(Message("7"))
# Use old connection to unbind
- us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
+ us = primary.connect_old().session(str(uuid4()))
us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
# Need a marker so we can wait till sync is done.
@@ -117,9 +171,8 @@ class ShortTests(BrokerTest):
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
-
# Wait for configuration to replicate.
- self.wait(b, prefix+"x");
+ wait_address(b, prefix+"x");
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -127,7 +180,7 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- self.assert_missing(b, prefix+"q3")
+ assert_missing(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -147,12 +200,12 @@ class ShortTests(BrokerTest):
setup(p, "2", primary)
# Verify the data on the backup
- b = self.connect_admin(backup).session()
+ b = backup.connect_admin().session()
verify(b, "1", p)
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
- s = p.sender(queue("foo","messages"))
- self.wait(b, "foo")
+ s = p.sender(queue("foo","all"))
+ wait_address(b, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
self.assert_browse_retry(p, "foo", msgs)
@@ -173,16 +226,11 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", msgs[i+1:])
self.assert_browse_retry(b, "foo", msgs[i+1:])
- def qpid_replicate(self, value="messages"):
- return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
def test_sync(self):
- def queue(name, replicate):
- return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
- s = p.sender(queue("q","messages"))
+ s = p.sender("q;{create:always}")
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -193,49 +241,39 @@ class ShortTests(BrokerTest):
s.sync()
msgs = [str(i) for i in range(30)]
- b1 = self.connect_admin(backup1).session()
- self.wait(b1, "q");
+ b1 = backup1.connect_admin().session()
+ wait_address(b1, "q");
self.assert_browse_retry(b1, "q", msgs)
- b2 = self.connect_admin(backup2).session()
- self.wait(b2, "q");
+ b2 = backup2.connect_admin().session()
+ wait_address(b2, "q");
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
- primary = HaBroker(self, name="primary")
- primary.promote()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ brokers = HaCluster(self, 3)
sender = self.popen(
["qpid-send",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=990",
"--timeout=10"
])
- try:
- self.assertEqual(sender.wait(), 0)
- self.assertEqual(receiver.wait(), 0)
- expect = [long(i) for i in range(991, 1001)]
- sn = lambda m: m.properties["sn"]
- self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
- self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
- except:
- print self.browse(primary.connect().session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup1).session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
- raise
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
- getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
@@ -245,13 +283,13 @@ class ShortTests(BrokerTest):
self.fail("Expected connection to backup to fail")
except ConnectionError: pass
# Check that admin connections are allowed to backup.
- self.connect_admin(backup).close()
+ backup.connect_admin().close()
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
- self.wait_backup(backup, "q")
+ sender = s.sender("q;{create:always}")
+ backup.wait_backup("q")
sender.send("foo")
primary.kill()
assert retry(lambda: not is_running(primary.pid))
@@ -265,14 +303,14 @@ class ShortTests(BrokerTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
- self.wait_backup(backup, "q")
+ primary.connect().session().sender("q;{create:always}")
+ backup.wait_backup("q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
receiver.start()
sender.start()
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
primary.kill()
@@ -284,19 +322,276 @@ class ShortTests(BrokerTest):
receiver.stop()
def test_backup_failover(self):
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
- for name in ["a","b","c"] ]
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
- brokers[0].promote()
- brokers[0].connect().session().sender(
- "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
- for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
- brokers[0].kill()
- brokers[2].promote() # c must fail over to b.
- brokers[2].connect().session().sender("q").send("b")
- self.assert_browse_backup(brokers[1], "q", ["a","b"])
- for b in brokers[1:]: b.kill()
+ """Verify that a backup broker fails over and recovers queue state"""
+ brokers = HaCluster(self, 3)
+ brokers[0].connect().session().sender("q;{create:always}").send("a")
+ for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b)
+ brokers[0].expect = EXPECT_EXIT_FAIL
+ brokers.kill(0)
+ brokers[1].connect().session().sender("q").send("b")
+ brokers[2].assert_browse_backup("q", ["a","b"])
+ s = brokers[1].connect().session()
+ self.assertEqual("a", s.receiver("q").fetch().content)
+ s.acknowledge()
+ brokers[2].assert_browse_backup("q", ["b"])
+
+ def test_qpid_config_replication(self):
+ """Set up replication via qpid-config"""
+ brokers = HaCluster(self,2)
+ brokers[0].config_declare("q","all")
+ brokers[0].connect().session().sender("q").send("foo")
+ brokers[1].assert_browse_backup("q", ["foo"])
+
+ def test_standalone_queue_replica(self):
+ """Test replication of individual queues outside of cluster mode"""
+ primary = HaBroker(self, name="primary", ha_cluster=False)
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False)
+ br = backup.connect().session().receiver("q;{create:always}")
+
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ ps.send("a")
+ backup.assert_browse_backup("q", ["a"])
+ ps.send("b")
+ backup.assert_browse_backup("q", ["a", "b"])
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ backup.assert_browse_backup("q", ["b"])
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x")
+ backup.assert_browse_backup("q2", ["x"])
+
+
+ def test_queue_replica_failover(self):
+ """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
+ cluster = HaCluster(self, 2)
+ primary = cluster[0]
+ pc = cluster.connect(0)
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False)
+ br = backup.connect().session().receiver("q;{create:always}")
+ backup.replicate(cluster.url, "q")
+ ps.send("a")
+ backup.assert_browse_backup("q", ["a"])
+ cluster.bounce(0)
+ backup.assert_browse_backup("q", ["a"])
+ ps.send("b")
+ backup.assert_browse_backup("q", ["a", "b"])
+ cluster.bounce(1)
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ backup.assert_browse_backup("q", ["b"])
+
+ def test_lvq(self):
+ """Verify that we replicate to an LVQ correctly"""
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
+ def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
+ for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
+ send(*kv)
+ backup.assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
+ send("b","b-2")
+ backup.assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
+ send("c","c-3")
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
+ send("d","d-1")
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
+
+ def test_ring(self):
+ """Test replication with the ring queue policy"""
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
+ for i in range(10): s.send(Message(str(i)))
+ backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
+
+ def test_reject(self):
+ """Test replication with the reject queue policy"""
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
+ try:
+ for i in range(10): s.send(Message(str(i)), sync=False)
+ except qpid.messaging.exceptions.TargetCapacityExceeded: pass
+ backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+
+ def test_priority(self):
+ """Verify priority queues replicate correctly"""
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ session = primary.connect().session()
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
+ priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
+ for p in priorities: s.send(Message(priority=p))
+ # Can't use browse_backup as browser sees messages in delivery order not priority.
+ backup.wait_backup("priority-queue")
+ r = backup.connect_admin().session().receiver("priority-queue")
+ received = [r.fetch().priority for i in priorities]
+ self.assertEqual(sorted(priorities, reverse=True), received)
+
+ def test_priority_fairshare(self):
+ """Verify priority queues replicate correctly"""
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ session = primary.connect().session()
+ levels = 8
+ priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
+ limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
+ limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
+ messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
+ for m in messages: s.send(m)
+ backup.wait_backup(s.target)
+ r = backup.connect_admin().session().receiver("priority-queue")
+ received = [r.fetch().content for i in priorities]
+ sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
+ fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)]
+ self.assertEqual(received, fair)
+
+ def test_priority_ring(self):
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
+ priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
+ for p in priorities: s.send(Message(priority=p))
+
+ # FIXME aconway 2012-02-22: there is a bug in priority ring
+ # queues that allows a low priority message to displace a high
+ # one. The following commented-out assert_browse is for the
+ # correct result, the uncommented one is for the actualy buggy
+ # result. See https://issues.apache.org/jira/browse/QPID-3866
+ #
+ # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+
+ def test_backup_acquired(self):
+ """Verify that acquired messages are backed up, for all queue types."""
+ class Test:
+ def __init__(self, queue, arguments, expect):
+ self.queue = queue
+ self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+ self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+ self.expect = [str(i) for i in expect]
+
+ def send(self, connection):
+ """Send messages, then acquire one but don't acknowledge"""
+ s = connection.session()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
+
+ def wait(self, brokertest, backup):
+ backup.wait_backup(self.queue)
+
+ def verify(self, brokertest, backup):
+ backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
+
+ tests = [
+ Test("plain",[],range(10)),
+ Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+ Test("priority",["'qpid.priorities':10"], range(10)),
+ Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+ Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+ ]
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ c = primary.connect()
+ for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ # Wait for backups to catch up.
+ for t in tests:
+ t.wait(self, backup1)
+ t.wait(self, backup2)
+ # Verify acquired message was replicated
+ for t in tests: t.verify(self, backup1)
+ for t in tests: t.verify(self, backup2)
+
+ def test_replicate_default(self):
+ """Make sure we don't replicate if ha-replicate is unspecified or none"""
+ cluster1 = HaCluster(self, 2, ha_replicate=None)
+ c1 = cluster1[0].connect().session().sender("q;{create:always}")
+ cluster2 = HaCluster(self, 2, ha_replicate="none")
+ cluster2[0].connect().session().sender("q;{create:always}")
+ time.sleep(.1) # Give replication a chance.
+ try:
+ cluster1[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+ try:
+ cluster2[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+
+ def test_invalid_default(self):
+ """Verify that a queue with an invalid qpid.replicate gets default treatment"""
+ cluster = HaCluster(self, 2, ha_replicate="all")
+ c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+ cluster[1].wait_backup("q")
+
+ def test_exclusive_queue(self):
+ """Ensure that we can back-up exclusive queues, i.e. the replicating
+ subscriptions are exempt from the exclusivity"""
+ cluster = HaCluster(self, 2)
+ def test(addr):
+ c = cluster[0].connect()
+ q = addr.split(";")[0]
+ r = c.session().receiver(addr)
+ try: c.session().receiver(addr); self.fail("Expected exclusive exception")
+ except ReceiverError: pass
+ s = c.session().sender(q).send(q)
+ cluster[1].assert_browse_backup(q, [q])
+ test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
+ test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+
+def fairshare(msgs, limit, levels):
+ """
+ Generator to return prioritised messages in expected order for a given fairshare limit
+ """
+ count = 0
+ last_priority = None
+ postponed = []
+ while msgs or postponed:
+ if not msgs:
+ msgs = postponed
+ count = 0
+ last_priority = None
+ postponed = []
+ msg = msgs.pop(0)
+ if last_priority and priority_level(msg.priority, levels) == last_priority:
+ count += 1
+ else:
+ last_priority = priority_level(msg.priority, levels)
+ count = 1
+ l = limit(last_priority)
+ if (l and count > l):
+ postponed.append(msg)
+ else:
+ yield msg
+ return
+
+def priority_level(value, levels):
+ """
+ Method to determine which of a distinct number of priority levels
+ a given value falls into.
+ """
+ offset = 5-math.ceil(levels/2.0)
+ return min(max(value - offset, 0), levels-1)
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
@@ -311,7 +606,7 @@ class LongTests(BrokerTest):
"""Test failover with continuous send-receive"""
# FIXME aconway 2012-02-03: fails due to dropped messages,
# known issue: sending messages to new primary before
- # backups are ready.
+ # backups are ready. Enable when fixed.
# Start a cluster, all members will be killed during the test.
brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
@@ -352,4 +647,10 @@ class LongTests(BrokerTest):
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
- os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
+ qpid_ha = os.getenv("QPID_HA_EXEC")
+ if qpid_ha and os.path.exists(qpid_ha):
+ os.execvp("qpid-python-test",
+ ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
+ else:
+ print "Skipping ha_tests, qpid_ha not available"
+