summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-05-15 15:03:44 +0000
committerAlan Conway <aconway@apache.org>2013-05-15 15:03:44 +0000
commitdf861e2e13c9fbc92a6ac3751e80037f632651b6 (patch)
treea8417e29086c1cdb96318c8144038ecf295a5e1e /qpid/cpp/src/tests
parentfd41668d683923894787ddc639a2829de2b7b414 (diff)
downloadqpid-python-df861e2e13c9fbc92a6ac3751e80037f632651b6.tar.gz
QPID-4745: HA safe port allocation for brokers in HA tests.
Many HA tests use --port=0 to start a broker on an available port, but then need to shutdown and restart the broker on the same port. This is not safe, on a busy system it is possible for another process to take the port between the time the broker is shut down and the time it is restarted. The solution is to do bind(0) and listen in the python test framework (class HaPort) and let the broker use the socket using qpidd --socket-fd. When the broker is shut down the port remains bound by the python process. When the broker is re-started it again is given access to the socket via --socket-fd. Other changes - move ha_store_tests into ha_tests. - add heartbeats to avoid stalling. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1482881 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py11
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py104
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py94
3 files changed, 123 insertions, 86 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 7fc3e7f0eb..290a1dde2c 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -134,14 +134,8 @@ class Popen(subprocess.Popen):
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
if stdout == FILE: stdout = open(self.outfile("out"), "w")
if stderr == FILE: stderr = open(self.outfile("err"), "w")
- try:
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr,
- close_fds=True)
- except ValueError: # Windows can't do close_fds
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr)
-
+ subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+ stdin=stdin, stdout=stdout, stderr=stderr)
f = open(self.outfile("cmd"), "w")
try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
finally: f.close()
@@ -180,6 +174,7 @@ class Popen(subprocess.Popen):
finally:
self.wait() # Clean up the process.
+
def communicate(self, input=None):
ret = subprocess.Popen.communicate(self, input)
self.cleanup()
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 4e27675438..75393fbdf1 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -63,18 +63,57 @@ class Credentials(object):
def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+class HaPort:
+ """Many HA tests need to allocate a broker port dynamically and then kill
+ and restart a broker on that same port multiple times. qpidd --port=0 only
+ ensures the port for the initial broker process, subsequent brokers re-using
+ the same port may fail with "address already in use".
+
+ HaPort binds and listens to the port and returns a file descriptor to pass
+ to qpidd --socket-fd. It holds on to the port untill the end of the test so
+ the broker can restart multiple times.
+ """
+
+ def __init__(self, test, port=0):
+ """Bind and listen to port. port=0 allocates a port dynamically.
+ self.port is the allocated port, self.fileno is the file descriptor for
+ qpid --socket-fd."""
+
+ self.test = test
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.bind(("", port))
+ self.socket.listen(5)
+ self.port = self.socket.getsockname()[1]
+ self.fileno = self.socket.fileno()
+ self.stopped = False
+ test.cleanup_stop(self) # Stop during test.tearDown
+
+ def stop(self): # Called in tearDown
+ if not self.stopped:
+ self.stopped = True
+ self.socket.shutdown(socket.SHUT_RDWR)
+ self.socket.close()
+
+ def __str__(self): return "HaPort<port:%s, fileno:%s>"%(self.port, self.fileno)
+
+
class HaBroker(Broker):
"""Start a broker with HA enabled
@param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
"""
- def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
- client_credentials=None, **kwargs):
+ def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True,
+ ha_replicate="all", client_credentials=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
"--log-enable=debug+:ha::",
- # FIXME aconway 2012-02-13: workaround slow link failover.
+ # Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
+ # Heartbeat and negotiate time are needed so that a broker wont
+ # stall on an address that doesn't currently have a broker running.
+ "--link-heartbeat-interval=1",
+ "--max-negotiate-time=1000",
"--ha-cluster=%s"%ha_cluster]
if ha_replicate is not None:
args += [ "--ha-replicate=%s"%ha_replicate ]
@@ -89,7 +128,8 @@ acl allow all all
aclf.close()
if not "--acl-file" in args:
args += [ "--acl-file", acl, "--load-module", os.getenv("ACL_LIB") ]
- Broker.__init__(self, test, args, **kwargs)
+ args += ["--socket-fd=%s"%ha_port.fileno, "--listen-disable=tcp"]
+ Broker.__init__(self, test, args, port=ha_port.port, **kwargs)
self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
assert os.path.exists(self.qpid_ha_path)
self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
@@ -97,6 +137,7 @@ acl allow all all
self.qpid_ha_script=import_script(self.qpid_ha_path)
self._agent = None
self.client_credentials = client_credentials
+ self.ha_port = ha_port
def __str__(self): return Broker.__str__(self)
@@ -108,8 +149,7 @@ acl allow all all
args = args + ["--sasl-mechanism", cred.mechanism]
self.qpid_ha_script.main_except(["", "-b", url]+args)
- def promote(self):
- self.ready(); self.qpid_ha(["promote"])
+ def promote(self): self.ready(); self.qpid_ha(["promote"])
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
@@ -211,14 +251,16 @@ acl allow all all
def ready(self):
return Broker.ready(self, client_properties={"qpid.ha-admin":1})
- def kill(self):
+ def kill(self, final=True):
+ if final: self.ha_port.stop()
self._agent = None
return Broker.kill(self)
+
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, promote=True, wait=True, **kwargs):
+ def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs):
"""Start a cluster of n brokers.
@test: The test being run
@@ -227,36 +269,49 @@ class HaCluster(object):
@wait: wait for primary active and backups ready. Ignored if promote=False
"""
self.test = test
+ self.args = args
self.kwargs = kwargs
+ self._ports = [HaPort(test) for i in xrange(n)]
+ self._set_url()
self._brokers = []
self.id = HaCluster._cluster_count
self.broker_id = 0
HaCluster._cluster_count += 1
- for i in xrange(n): self.start(False)
- self.update_urls()
+ for i in xrange(n): self.start()
if promote:
self[0].promote()
if wait:
self[0].wait_status("active")
for b in self[1:]: b.wait_status("ready")
-
def next_name(self):
name="cluster%s-%s"%(self.id, self.broker_id)
self.broker_id += 1
return name
- def start(self, update_urls=True, args=[]):
- """Start a new broker in the cluster"""
- b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ def _ha_broker(self, ha_port, name):
+ b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
+ args=self.args, **self.kwargs)
b.ready()
+ return b
+
+ def start(self):
+ """Start a new broker in the cluster"""
+ i = len(self)
+ assert i <= len(self._ports)
+ if i == len(self._ports):
+ self._ports.append(HaPort(self.test))
+ self._set_url()
+ self._update_urls()
+ b = self._ha_broker(self._ports[i], self.next_name())
self._brokers.append(b)
- if update_urls: self.update_urls()
return b
- def update_urls(self):
- self.url = ",".join([b.host_port() for b in self])
- if len(self) > 1: # No failover addresses on a 1 cluster.
+ def _set_url(self):
+ self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports)
+
+ def _update_urls(self):
+ if len(self) > 1: # No failover addresses on a 1 cluster.
for b in self:
b.set_brokers_url(self.url)
b.set_public_url(self.url)
@@ -265,29 +320,28 @@ class HaCluster(object):
"""Connect with reconnect_urls"""
return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
- def kill(self, i, promote_next=True):
+ def kill(self, i, promote_next=True, final=True):
"""Kill broker i, promote broker i+1"""
- self[i].kill()
+ self[i].kill(final=final)
if promote_next: self[(i+1) % len(self)].promote()
def restart(self, i):
"""Start a broker with the same port, name and data directory. It will get
a separate log file: foo.n.log"""
+ if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self))
b = self._brokers[i]
- self._brokers[i] = HaBroker(
- self.test, name=b.name, port=b.port(), brokers_url=self.url,
- **self.kwargs)
+ self._brokers[i] = self._ha_broker(self._ports[i], b.name)
self._brokers[i].ready()
def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
if (len(self) == 1):
- self.kill(i, promote_next=False)
+ self.kill(i, promote_next=False, final=False)
self.restart(i)
self[i].ready()
if promote_next: self[i].promote()
else:
- self.kill(i, promote_next)
+ self.kill(i, promote_next, final=False)
self.restart(i)
# Behave like a list of brokers.
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index db4c7f62e7..c6cc736386 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -230,25 +230,17 @@ class ReplicationTests(HaBrokerTest):
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
- primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
- url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always}")
- backup.wait_backup("q")
-
- sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
- receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
+ cluster = HaCluster(self, 2)
+ cluster[0].connect().session().sender("q;{create:always}")
+ cluster[1].wait_backup("q")
+ sender = NumberedSender(cluster[0], url=cluster.url, queue="q", failover_updates = False)
+ receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q", failover_updates = False)
receiver.start()
sender.start()
- backup.wait_backup("q")
assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
-
- primary.kill()
- assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
- backup.promote()
- n = receiver.received # Make sure we are still running
- assert retry(lambda: receiver.received > n + 10)
+ cluster.kill(0)
+ n = receiver.received
+ assert retry(lambda: receiver.received > n + 10) # Verify we are still going
sender.stop()
receiver.stop()
@@ -372,16 +364,14 @@ class ReplicationTests(HaBrokerTest):
def test_priority(self):
"""Verify priority queues replicate correctly"""
- primary = HaBroker(self, name="primary")
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
- session = primary.connect().session()
+ cluster = HaCluster(self, 2)
+ session = cluster[0].connect().session()
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
- backup.wait_backup("priority-queue")
- r = backup.connect_admin().session().receiver("priority-queue")
+ cluster[1].wait_backup("priority-queue")
+ r = cluster[1].connect_admin().session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
self.assertEqual(sorted(priorities, reverse=True), received)
@@ -480,9 +470,8 @@ class ReplicationTests(HaBrokerTest):
def test_replicate_binding(self):
"""Verify that binding replication can be disabled"""
- primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ cluster = HaCluster(self, 2)
+ primary, backup = cluster[0], cluster[1]
ps = primary.connect().session()
ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
@@ -867,7 +856,7 @@ acl deny all all
self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
# Simulate the race by re-creating the objects before promoting the new primary
- cluster.kill(0, False)
+ cluster.kill(0, promote_next=False)
xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
node = "node:{%s}"%(xdecl)
sn = cluster[1].connect_admin().session()
@@ -946,9 +935,10 @@ class LongTests(HaBrokerTest):
# Start sender and receiver threads
n = 10
- senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+ senders = [NumberedSender(brokers[0], url=brokers.url,
+ max_depth=1024, failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
- receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+ receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
for r in receivers: r.start()
@@ -987,10 +977,9 @@ class LongTests(HaBrokerTest):
brokers.bounce(victim) # Next one is promoted
primary = next
else:
- brokers.kill(victim, False)
+ brokers.kill(victim, promote_next=False, final=False)
dead = victim
- # At this point the primary is running with 1 or 2 backups
# Make sure we are not stalled
map(wait_passed, receivers, checkpoint)
# Run another checkpoint to ensure things work in this configuration
@@ -1090,11 +1079,11 @@ class RecoveryTests(HaBrokerTest):
# Create a queue before the failure.
s1 = cluster.connect(0).session().sender("q1;{create:always}")
for b in cluster: b.wait_backup("q1")
- for i in xrange(100): s1.send(str(i))
+ for i in xrange(10): s1.send(str(i))
# Kill primary and 2 backups
cluster[3].wait_status("ready")
- for i in [0,1,2]: cluster.kill(i, False)
+ for i in [0,1,2]: cluster.kill(i, promote_next=False, final=False)
cluster[3].promote() # New primary, backups will be 1 and 2
cluster[3].wait_status("recovering")
@@ -1108,31 +1097,33 @@ class RecoveryTests(HaBrokerTest):
s2 = cluster.connect(3).session().sender("q2;{create:always}")
# Verify that messages sent are not completed
- for i in xrange(100,200):
- s1.send(str(i), sync=False);
- s2.send(str(i), sync=False)
+ for i in xrange(10,20):
+ s1.send(str(i), sync=False, timeout=0.1);
+ s2.send(str(i), sync=False, timeout=0.1)
+
assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s1.unsettled(), 10)
assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 10)
# Verify we can receive even if sending is on hold:
- cluster[3].assert_browse("q1", [str(i) for i in range(200)])
+ cluster[3].assert_browse("q1", [str(i) for i in range(10)])
# Restart backups, verify queues are released only when both backups are up
cluster.restart(1)
assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s1.unsettled(), 10)
assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 10)
cluster.restart(2)
+ cluster.restart(0)
# Verify everything is up to date and active
def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0;
assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
- cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
- cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(10)+range(10,20)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(10,20)])
cluster[3].wait_status("active"),
s1.session.connection.close()
s2.session.connection.close()
@@ -1164,8 +1155,7 @@ class RecoveryTests(HaBrokerTest):
"""If we join a cluster where the primary is dead, the new primary is
not yet promoted and there are ready backups then we should refuse
promotion so that one of the ready backups can be chosen."""
- # FIXME aconway 2012-10-05: smaller timeout
- cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1])
+ cluster = HaCluster(self, 2)
cluster[0].wait_status("active")
cluster[1].wait_status("ready")
cluster.bounce(0, promote_next=False)
@@ -1206,9 +1196,9 @@ class ConfigurationTests(HaBrokerTest):
b = start("none", "none")
check(b, "", "")
-
class StoreTests(BrokerTest):
"""Test for HA with persistence."""
+
def check_skip(self):
if not BrokerTest.store_lib:
print "WARNING: skipping HA+store tests, no store lib found."
@@ -1255,7 +1245,7 @@ class StoreTests(BrokerTest):
doing catch-up from the primary."""
if self.check_skip(): return
cluster = HaCluster(self, 2)
- sn = cluster[0].connect().session()
+ sn = cluster[0].connect(heartbeat=1).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(Message(m, durable=True))
s2 = sn.sender("q2;{create:always,node:{durable:true}}")
@@ -1264,10 +1254,9 @@ class StoreTests(BrokerTest):
# Wait for backup to catch up.
cluster[1].assert_browse_backup("q1", ["foo","bar"])
cluster[1].assert_browse_backup("q2", ["hello"])
-
# Make changes that the backup doesn't see
- cluster.kill(1, promote_next=False)
- r1 = cluster[0].connect().session().receiver("q1")
+ cluster.kill(1, promote_next=False, final=False)
+ r1 = cluster[0].connect(heartbeat=1).session().receiver("q1")
for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
r1.session.acknowledge()
for m in ["x","y","z"]: s1.send(Message(m, durable=True))
@@ -1285,7 +1274,8 @@ class StoreTests(BrokerTest):
# Verify state
cluster[0].assert_browse("q1", ["x","y","z"])
cluster[1].assert_browse_backup("q1", ["x","y","z"])
- sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over!
+
+ sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over!
sn.sender("ex/k1").send("boo")
cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1298,8 +1288,6 @@ if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")
if qpid_ha and os.path.exists(qpid_ha):
- os.execvp("qpid-python-test",
- ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
+ os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
else:
print "Skipping ha_tests, %s not available"%(qpid_ha)
-