-rw-r--r--  qpid/cpp/src/qpid/ha/BrokerReplicator.cpp |   2
-rw-r--r--  qpid/cpp/src/qpid/ha/StatusCheck.cpp      |   4
-rw-r--r--  qpid/cpp/src/tests/brokertest.py          |  11
-rwxr-xr-x  qpid/cpp/src/tests/ha_test.py             | 104
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py            |  94
5 files changed, 128 insertions, 87 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index f461a2f0e0..fa8f330f27 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -878,7 +878,7 @@ namespace {
// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
- QPID_LOG(info, logPrefix << "Disconnected from " << primary);
+ QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
connection = 0;
// Clean up auto-delete queues
vector<boost::shared_ptr<Exchange> > collect;
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
index 6f92bec19e..f6371d018a 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
@@ -103,6 +103,10 @@ void StatusCheckThread::run() {
catch(const exception& e) {
QPID_LOG(warning, statusCheck.logPrefix << "Error closing status check connection to " << url);
}
+ try { c.close(); }
+ catch(const exception& e) {
+ QPID_LOG(warning, "Error closing status check connection to " << url);
+ }
delete this;
}
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 7fc3e7f0eb..290a1dde2c 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -134,14 +134,8 @@ class Popen(subprocess.Popen):
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
if stdout == FILE: stdout = open(self.outfile("out"), "w")
if stderr == FILE: stderr = open(self.outfile("err"), "w")
- try:
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr,
- close_fds=True)
- except ValueError: # Windows can't do close_fds
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr)
-
+ subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+ stdin=stdin, stdout=stdout, stderr=stderr)
f = open(self.outfile("cmd"), "w")
try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
finally: f.close()
@@ -180,6 +174,7 @@ class Popen(subprocess.Popen):
finally:
self.wait() # Clean up the process.
+
def communicate(self, input=None):
ret = subprocess.Popen.communicate(self, input)
self.cleanup()
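The hunk above drops the close_fds=True variant of the Popen call, presumably so that a child broker can inherit the listening socket that the new HaPort class (added to ha_test.py below) passes to qpidd via --socket-fd. A minimal sketch of that mechanism, assuming nothing beyond the flags that appear in this diff; everything else is illustrative, not part of the patch:

# Illustrative sketch -- not part of the patch.
import socket, subprocess

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))                      # port 0: let the OS choose a free port
s.listen(5)
port, fd = s.getsockname()[1], s.fileno()

# The child only inherits fd if close_fds stays at the Python 2 default (False),
# which is why the close_fds=True call above had to go.
subprocess.Popen(["qpidd", "--socket-fd=%d" % fd, "--listen-disable=tcp"])

# The broker now accepts connections on 'port'; keeping 's' open lets a test
# kill the broker and start a new one on the same port without hitting
# "address already in use".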
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 4e27675438..75393fbdf1 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -63,18 +63,57 @@ class Credentials(object):
def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+class HaPort:
+ """Many HA tests need to allocate a broker port dynamically and then kill
+ and restart a broker on that same port multiple times. qpidd --port=0 only
+ ensures the port for the initial broker process; subsequent brokers re-using
+ the same port may fail with "address already in use".
+
+ HaPort binds and listens to the port and returns a file descriptor to pass
+ to qpidd --socket-fd. It holds on to the port until the end of the test so
+ the broker can restart multiple times.
+ """
+
+ def __init__(self, test, port=0):
+ """Bind and listen to port. port=0 allocates a port dynamically.
+ self.port is the allocated port, self.fileno is the file descriptor for
+ qpidd --socket-fd."""
+
+ self.test = test
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.bind(("", port))
+ self.socket.listen(5)
+ self.port = self.socket.getsockname()[1]
+ self.fileno = self.socket.fileno()
+ self.stopped = False
+ test.cleanup_stop(self) # Stop during test.tearDown
+
+ def stop(self): # Called in tearDown
+ if not self.stopped:
+ self.stopped = True
+ self.socket.shutdown(socket.SHUT_RDWR)
+ self.socket.close()
+
+ def __str__(self): return "HaPort<port:%s, fileno:%s>"%(self.port, self.fileno)
+
+
class HaBroker(Broker):
"""Start a broker with HA enabled
@param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
"""
- def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
- client_credentials=None, **kwargs):
+ def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True,
+ ha_replicate="all", client_credentials=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
"--log-enable=debug+:ha::",
- # FIXME aconway 2012-02-13: workaround slow link failover.
+ # Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
+ # Heartbeat and negotiate time are needed so that a broker won't
+ # stall on an address that doesn't currently have a broker running.
+ "--link-heartbeat-interval=1",
+ "--max-negotiate-time=1000",
"--ha-cluster=%s"%ha_cluster]
if ha_replicate is not None:
args += [ "--ha-replicate=%s"%ha_replicate ]
@@ -89,7 +128,8 @@ acl allow all all
aclf.close()
if not "--acl-file" in args:
args += [ "--acl-file", acl, "--load-module", os.getenv("ACL_LIB") ]
- Broker.__init__(self, test, args, **kwargs)
+ args += ["--socket-fd=%s"%ha_port.fileno, "--listen-disable=tcp"]
+ Broker.__init__(self, test, args, port=ha_port.port, **kwargs)
self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
assert os.path.exists(self.qpid_ha_path)
self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
@@ -97,6 +137,7 @@ acl allow all all
self.qpid_ha_script=import_script(self.qpid_ha_path)
self._agent = None
self.client_credentials = client_credentials
+ self.ha_port = ha_port
def __str__(self): return Broker.__str__(self)
@@ -108,8 +149,7 @@ acl allow all all
args = args + ["--sasl-mechanism", cred.mechanism]
self.qpid_ha_script.main_except(["", "-b", url]+args)
- def promote(self):
- self.ready(); self.qpid_ha(["promote"])
+ def promote(self): self.ready(); self.qpid_ha(["promote"])
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
@@ -211,14 +251,16 @@ acl allow all all
def ready(self):
return Broker.ready(self, client_properties={"qpid.ha-admin":1})
- def kill(self):
+ def kill(self, final=True):
+ if final: self.ha_port.stop()
self._agent = None
return Broker.kill(self)
+
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, promote=True, wait=True, **kwargs):
+ def __init__(self, test, n, promote=True, wait=True, args=[], **kwargs):
"""Start a cluster of n brokers.
@test: The test being run
@@ -227,36 +269,49 @@ class HaCluster(object):
@wait: wait for primary active and backups ready. Ignored if promote=False
"""
self.test = test
+ self.args = args
self.kwargs = kwargs
+ self._ports = [HaPort(test) for i in xrange(n)]
+ self._set_url()
self._brokers = []
self.id = HaCluster._cluster_count
self.broker_id = 0
HaCluster._cluster_count += 1
- for i in xrange(n): self.start(False)
- self.update_urls()
+ for i in xrange(n): self.start()
if promote:
self[0].promote()
if wait:
self[0].wait_status("active")
for b in self[1:]: b.wait_status("ready")
-
def next_name(self):
name="cluster%s-%s"%(self.id, self.broker_id)
self.broker_id += 1
return name
- def start(self, update_urls=True, args=[]):
- """Start a new broker in the cluster"""
- b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ def _ha_broker(self, ha_port, name):
+ b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
+ args=self.args, **self.kwargs)
b.ready()
+ return b
+
+ def start(self):
+ """Start a new broker in the cluster"""
+ i = len(self)
+ assert i <= len(self._ports)
+ if i == len(self._ports):
+ self._ports.append(HaPort(self.test))
+ self._set_url()
+ self._update_urls()
+ b = self._ha_broker(self._ports[i], self.next_name())
self._brokers.append(b)
- if update_urls: self.update_urls()
return b
- def update_urls(self):
- self.url = ",".join([b.host_port() for b in self])
- if len(self) > 1: # No failover addresses on a 1 cluster.
+ def _set_url(self):
+ self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports)
+
+ def _update_urls(self):
+ if len(self) > 1: # No failover addresses on a 1-broker cluster.
for b in self:
b.set_brokers_url(self.url)
b.set_public_url(self.url)
@@ -265,29 +320,28 @@ class HaCluster(object):
"""Connect with reconnect_urls"""
return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
- def kill(self, i, promote_next=True):
+ def kill(self, i, promote_next=True, final=True):
"""Kill broker i, promote broker i+1"""
- self[i].kill()
+ self[i].kill(final=final)
if promote_next: self[(i+1) % len(self)].promote()
def restart(self, i):
"""Start a broker with the same port, name and data directory. It will get
a separate log file: foo.n.log"""
+ if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self))
b = self._brokers[i]
- self._brokers[i] = HaBroker(
- self.test, name=b.name, port=b.port(), brokers_url=self.url,
- **self.kwargs)
+ self._brokers[i] = self._ha_broker(self._ports[i], b.name)
self._brokers[i].ready()
def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
if (len(self) == 1):
- self.kill(i, promote_next=False)
+ self.kill(i, promote_next=False, final=False)
self.restart(i)
self[i].ready()
if promote_next: self[i].promote()
else:
- self.kill(i, promote_next)
+ self.kill(i, promote_next, final=False)
self.restart(i)
# Behave like a list of brokers.
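Taken together, HaPort, the new kill(final=...) flag and HaCluster.restart() let a test bounce a broker on the same port repeatedly. A hypothetical usage sketch, assuming only the classes and methods added in this diff (the test class and queue name are made up):

# Hypothetical usage sketch -- not part of the patch.
class ExampleBounceTest(HaBrokerTest):
    def test_restart_same_port(self):
        cluster = HaCluster(self, 2)   # each broker listens on a socket held open by an HaPort
        cluster[0].connect().session().sender("q;{create:always}")
        cluster[1].wait_backup("q")
        cluster.kill(0, final=False)   # keep the HaPort socket open; broker 1 is promoted
        cluster.restart(0)             # same fd and port, rejoins as a backup
        cluster[0].wait_status("ready")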
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index db4c7f62e7..c6cc736386 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -230,25 +230,17 @@ class ReplicationTests(HaBrokerTest):
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
- primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
- url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always}")
- backup.wait_backup("q")
-
- sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
- receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
+ cluster = HaCluster(self, 2)
+ cluster[0].connect().session().sender("q;{create:always}")
+ cluster[1].wait_backup("q")
+ sender = NumberedSender(cluster[0], url=cluster.url, queue="q", failover_updates = False)
+ receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q", failover_updates = False)
receiver.start()
sender.start()
- backup.wait_backup("q")
assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
-
- primary.kill()
- assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
- backup.promote()
- n = receiver.received # Make sure we are still running
- assert retry(lambda: receiver.received > n + 10)
+ cluster.kill(0)
+ n = receiver.received
+ assert retry(lambda: receiver.received > n + 10) # Verify we are still going
sender.stop()
receiver.stop()
@@ -372,16 +364,14 @@ class ReplicationTests(HaBrokerTest):
def test_priority(self):
"""Verify priority queues replicate correctly"""
- primary = HaBroker(self, name="primary")
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
- session = primary.connect().session()
+ cluster = HaCluster(self, 2)
+ session = cluster[0].connect().session()
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
- backup.wait_backup("priority-queue")
- r = backup.connect_admin().session().receiver("priority-queue")
+ cluster[1].wait_backup("priority-queue")
+ r = cluster[1].connect_admin().session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
self.assertEqual(sorted(priorities, reverse=True), received)
@@ -480,9 +470,8 @@ class ReplicationTests(HaBrokerTest):
def test_replicate_binding(self):
"""Verify that binding replication can be disabled"""
- primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ cluster = HaCluster(self, 2)
+ primary, backup = cluster[0], cluster[1]
ps = primary.connect().session()
ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
@@ -867,7 +856,7 @@ acl deny all all
self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
# Simulate the race by re-creating the objects before promoting the new primary
- cluster.kill(0, False)
+ cluster.kill(0, promote_next=False)
xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
node = "node:{%s}"%(xdecl)
sn = cluster[1].connect_admin().session()
@@ -946,9 +935,10 @@ class LongTests(HaBrokerTest):
# Start sender and receiver threads
n = 10
- senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+ senders = [NumberedSender(brokers[0], url=brokers.url,
+ max_depth=1024, failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
- receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+ receivers = [NumberedReceiver(brokers[0], url=brokers.url, sender=senders[i],
failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
for r in receivers: r.start()
@@ -987,10 +977,9 @@ class LongTests(HaBrokerTest):
brokers.bounce(victim) # Next one is promoted
primary = next
else:
- brokers.kill(victim, False)
+ brokers.kill(victim, promote_next=False, final=False)
dead = victim
- # At this point the primary is running with 1 or 2 backups
# Make sure we are not stalled
map(wait_passed, receivers, checkpoint)
# Run another checkpoint to ensure things work in this configuration
@@ -1090,11 +1079,11 @@ class RecoveryTests(HaBrokerTest):
# Create a queue before the failure.
s1 = cluster.connect(0).session().sender("q1;{create:always}")
for b in cluster: b.wait_backup("q1")
- for i in xrange(100): s1.send(str(i))
+ for i in xrange(10): s1.send(str(i))
# Kill primary and 2 backups
cluster[3].wait_status("ready")
- for i in [0,1,2]: cluster.kill(i, False)
+ for i in [0,1,2]: cluster.kill(i, promote_next=False, final=False)
cluster[3].promote() # New primary, backups will be 1 and 2
cluster[3].wait_status("recovering")
@@ -1108,31 +1097,33 @@ class RecoveryTests(HaBrokerTest):
s2 = cluster.connect(3).session().sender("q2;{create:always}")
# Verify that messages sent are not completed
- for i in xrange(100,200):
- s1.send(str(i), sync=False);
- s2.send(str(i), sync=False)
+ for i in xrange(10,20):
+ s1.send(str(i), sync=False, timeout=0.1);
+ s2.send(str(i), sync=False, timeout=0.1)
+
assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s1.unsettled(), 10)
assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 10)
# Verify we can receive even if sending is on hold:
- cluster[3].assert_browse("q1", [str(i) for i in range(200)])
+ cluster[3].assert_browse("q1", [str(i) for i in range(10)])
# Restart backups, verify queues are released only when both backups are up
cluster.restart(1)
assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 100)
+ self.assertEqual(s1.unsettled(), 10)
assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(s2.unsettled(), 10)
cluster.restart(2)
+ cluster.restart(0)
# Verify everything is up to date and active
def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0;
assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
- cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
- cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(10)+range(10,20)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(10,20)])
cluster[3].wait_status("active"),
s1.session.connection.close()
s2.session.connection.close()
@@ -1164,8 +1155,7 @@ class RecoveryTests(HaBrokerTest):
"""If we join a cluster where the primary is dead, the new primary is
not yet promoted and there are ready backups then we should refuse
promotion so that one of the ready backups can be chosen."""
- # FIXME aconway 2012-10-05: smaller timeout
- cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1])
+ cluster = HaCluster(self, 2)
cluster[0].wait_status("active")
cluster[1].wait_status("ready")
cluster.bounce(0, promote_next=False)
@@ -1206,9 +1196,9 @@ class ConfigurationTests(HaBrokerTest):
b = start("none", "none")
check(b, "", "")
-
class StoreTests(BrokerTest):
"""Test for HA with persistence."""
+
def check_skip(self):
if not BrokerTest.store_lib:
print "WARNING: skipping HA+store tests, no store lib found."
@@ -1255,7 +1245,7 @@ class StoreTests(BrokerTest):
doing catch-up from the primary."""
if self.check_skip(): return
cluster = HaCluster(self, 2)
- sn = cluster[0].connect().session()
+ sn = cluster[0].connect(heartbeat=1).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(Message(m, durable=True))
s2 = sn.sender("q2;{create:always,node:{durable:true}}")
@@ -1264,10 +1254,9 @@ class StoreTests(BrokerTest):
# Wait for backup to catch up.
cluster[1].assert_browse_backup("q1", ["foo","bar"])
cluster[1].assert_browse_backup("q2", ["hello"])
-
# Make changes that the backup doesn't see
- cluster.kill(1, promote_next=False)
- r1 = cluster[0].connect().session().receiver("q1")
+ cluster.kill(1, promote_next=False, final=False)
+ r1 = cluster[0].connect(heartbeat=1).session().receiver("q1")
for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
r1.session.acknowledge()
for m in ["x","y","z"]: s1.send(Message(m, durable=True))
@@ -1285,7 +1274,8 @@ class StoreTests(BrokerTest):
# Verify state
cluster[0].assert_browse("q1", ["x","y","z"])
cluster[1].assert_browse_backup("q1", ["x","y","z"])
- sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over!
+
+ sn = cluster[0].connect(heartbeat=1).session() # FIXME aconway 2012-09-25: should fail over!
sn.sender("ex/k1").send("boo")
cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
@@ -1298,8 +1288,6 @@ if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")
if qpid_ha and os.path.exists(qpid_ha):
- os.execvp("qpid-python-test",
- ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
+ os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
else:
print "Skipping ha_tests, %s not available"%(qpid_ha)
-