summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py46
1 files changed, 34 insertions, 12 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index bf044765b5..f3c45ba7a3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -19,7 +19,7 @@
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
-from qpid.messaging import Message, NotFound, ConnectionError
+from qpid.messaging import Message, NotFound, ConnectionError, Connection
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger
@@ -48,15 +48,18 @@ class ShortTests(BrokerTest):
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
- def assert_missing(self,session, address):
+ def set_ha_status(self, address, status):
+ os.system("qpid-ha-status %s %s"%(address, status))
+
+ def assert_missing(self, session, address):
try:
session.receiver(address)
self.fail("Should not have been replicated: %s"%(address))
except NotFound: pass
def connect_admin(self, backup, **kwargs):
- """Connect to a backup broker as the admin user"""
- return backup.connect(username="qpid-ha-admin", password="dummy", mechanism="PLAIN", **kwargs)
+ """Connect to a backup broker as an admin connection"""
+ return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
def test_replication(self):
"""Test basic replication of wiring and messages before and
@@ -113,6 +116,7 @@ class ShortTests(BrokerTest):
primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
p = primary.connect().session()
+
# Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1", primary)
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
@@ -120,10 +124,9 @@ class ShortTests(BrokerTest):
setup(p, "2", primary)
# Verify the data on the backup
- b = self.connect_admin(backup, ).session()
+ b = self.connect_admin(backup).session()
verify(b, "1", p)
verify(b, "2", p)
-
# Test a series of messages, enqueue all then dequeue all.
s = p.sender(queue("foo","all"))
self.wait(b, "foo")
@@ -174,6 +177,7 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
+ """Verify sequence numbers of messages sent by qpid-send"""
primary = self.ha_broker(name="primary", broker_url="primary")
backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
@@ -204,17 +208,35 @@ class ShortTests(BrokerTest):
print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
raise
- def test_exclude(self):
- """Verify that backup rejects connections"""
- primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+ def test_failover(self):
+ """Verify that backups rejects connections and that fail-over works"""
+ primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
- # Admin is allowed
- self.connect_admin(backup)
- # Others are not
+ # Check that backup rejects normal connections
try:
backup.connect()
self.fail("Expected connection to backup to fail")
except ConnectionError: pass
+ # Check that admin connections are allowed to backup.
+ self.connect_admin(backup).close()
+
+ # Test discovery: should connect to primary after reject by backup
+ c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+ s = c.session()
+ s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True)
+ # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here,
+ # send(sync=True) shouldn't return till the backup acknowledges.
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, "q")
+ self.assert_browse_retry(bs, "q", ["foo"])
+ bs.connection.close()
+
+ primary.kill()
+ # Promote the backup
+ self.set_ha_status(backup.host_port(), "primary")
+ # FIXME aconway 2012-01-23: should re-use session s below
+ self.assert_browse_retry(c.session(), "q", ["foo"])
+ c.close()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)