summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-04 13:36:42 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-04 13:36:42 +0000
commit673733dc03adb3ee4497d7a841342b5c380a65b2 (patch)
treeb70f77c094b1e981e096908a827726cdaf45d319
parent27fb41307ef6593f09a6aea10e58ce0c736bebf0 (diff)
downloadqpid-python-qpid-3767.tar.gz
QPID-3767: add multilink federation testsqpid-3767
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3767@1333954 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py177
-rwxr-xr-xqpid/cpp/src/tests/federation.py163
2 files changed, 300 insertions, 40 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 8952f5de7b..09eebc5ec9 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -768,6 +768,35 @@ acl deny all all
fetch(cluster[2])
+ def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30):
+ """ Prove that traffic can pass between two federated brokers.
+ """
+ tot_time = 0
+ active = False
+ send_session = src_broker.connect().session()
+ sender = send_session.sender(src)
+ receive_session = dst_broker.connect().session()
+ receiver = receive_session.receiver(dst)
+ while not active and tot_time < timeout:
+ sender.send(Message("Hello from Source!"))
+ try:
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ # Get this far without Empty exception, and the link is good!
+ active = True
+ while True:
+ # Keep receiving msgs, as several may have accumulated
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ except Empty:
+ if not active:
+ tot_time += 1
+ receiver.close()
+ receive_session.close()
+ sender.close()
+ send_session.close()
+ return active
+
def test_federation_failover(self):
"""
Verify that federation operates across failures occuring in a cluster.
@@ -778,38 +807,6 @@ acl deny all all
cluster to newly-added members
"""
- TIMEOUT = 30
- def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
- """ Prove that traffic can pass from source fed broker to
- destination fed broker
- """
- tot_time = 0
- active = False
- send_session = src_broker.connect().session()
- sender = send_session.sender(src)
- receive_session = dst_broker.connect().session()
- receiver = receive_session.receiver(dst)
- while not active and tot_time < timeout:
- sender.send(Message("Hello from Source!"))
- try:
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- # Get this far without Empty exception, and the link is good!
- active = True
- while True:
- # Keep receiving msgs, as several may have accumulated
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- except Empty:
- if not active:
- tot_time += 1
- receiver.close()
- receive_session.close()
- sender.close()
- send_session.close()
- self.assertTrue(active, "Bridge failed to become active")
-
-
# 2 node cluster source, 2 node cluster destination
src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
src_cluster.ready();
@@ -848,43 +845,145 @@ acl deny all all
self.assertEqual(result.status, 0, result)
# check that traffic passes
- verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
# add src[2] broker to source cluster
src_cluster.start(expect=EXPECT_EXIT_FAIL);
src_cluster.ready();
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill src[0]. dst[0] should fail over to src[1]
src_cluster[0].kill()
for b in src_cluster[1:]: b.ready()
- verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
# Kill src[1], dst[0] should fail over to src[2]
src_cluster[1].kill()
for b in src_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill dest[0], force failover to dest[1]
dst_cluster[0].kill()
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Add dest[2]
# dest[1] syncs dest[2] to current remote state
dst_cluster.start(expect=EXPECT_EXIT_FAIL);
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Kill dest[1], force failover to dest[2]
dst_cluster[1].kill()
for b in dst_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
for i in range(2, len(src_cluster)): src_cluster[i].kill()
for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
+ def test_federation_multilink_failover(self):
+ """
+ Verify that multi-link federation operates across failures occuring in
+ a cluster.
+ """
+
+ # 1 node cluster source, 1 node cluster destination
+ src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ src_cluster.ready();
+ dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ dst_cluster.ready();
+
+ # federate a direct binding across two separate links
+
+ # first, create a direct exchange bound to two queues using different
+ # bindings
+ cmd = self.popen(["qpid-config",
+ "--broker", src_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ1"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ1", "one"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ2"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ2", "two"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ # Create two separate links between the dst and source brokers, bind
+ # each to different keys
+ dst_cluster[0].startQmf()
+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+
+ for _l in [("link1", "bridge1", "one"),
+ ("link2", "bridge2", "two")]:
+ result = dst_broker.create("link", _l[0],
+ {"host":src_cluster[0].host(),
+ "port":src_cluster[0].port()},
+ False)
+ self.assertEqual(result.status, 0, result);
+ result = dst_broker.create("bridge", _l[1],
+ {"link":_l[0],
+ "src":"FedX",
+ "dest":"FedX",
+ "key":_l[2]}, False)
+ self.assertEqual(result.status, 0);
+
+ # check that traffic passes
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ # add new member, verify traffic
+ src_cluster.start(expect=EXPECT_EXIT_FAIL);
+ src_cluster.ready();
+
+ dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+ dst_cluster.ready();
+
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ src_cluster[0].kill()
+ for b in src_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2")
+
+ dst_cluster[0].kill()
+ for b in dst_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2")
+
+ for i in range(1, len(src_cluster)): src_cluster[i].kill()
+ for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
+
# Some utility code for transaction tests
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 5f483ad258..5bcf67d152 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -23,6 +23,7 @@ from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from qpid.util import URL
+import qpid.messaging
from time import sleep, time
@@ -94,6 +95,11 @@ class FederationTests(TestBase010):
break
self._brokers.append(_b)
+ # add a new-style messaging connection to each broker
+ for _b in self._brokers:
+ _b.connection = qpid.messaging.Connection(_b.url)
+ _b.connection.open()
+
def _teardown_brokers(self):
""" Un-does _setup_brokers()
"""
@@ -103,7 +109,7 @@ class FederationTests(TestBase010):
if not _b.client_session.error():
_b.client_session.close(timeout=10)
_b.client_conn.close(timeout=10)
-
+ _b.connection.close()
def test_bridge_create_and_close(self):
self.startQmf();
@@ -2165,3 +2171,158 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_multilink_direct(self):
+ """ Verify that two distinct links can be created between federated
+ brokers.
+ """
+ self.startQmf()
+ qmf = self.qmf
+ self._setup_brokers()
+ src_broker = self._brokers[0]
+ dst_broker = self._brokers[1]
+
+ # create a direct exchange on each broker
+ for _b in [src_broker, dst_broker]:
+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+ "direct", "exchange_declare failed!")
+
+ # create destination queues
+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+ dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True)
+ dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+
+ # create two connections, one for high priority traffic
+ for _q in ["HiPri", "Traffic"]:
+ result = dst_broker.qmf_object.create("link", _q,
+ {"host":src_broker.host,
+ "port":src_broker.port},
+ False)
+ self.assertEqual(result.status, 0);
+
+ links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+ for _l in links:
+ if _l.name == "HiPri":
+ hi_link = _l
+ elif _l.name == "Traffic":
+ data_link = _l
+ else:
+ self.fail("Unexpected Link found: " + _l.name)
+
+ # now create a route for messages sent with key "high" to use the
+ # hi_link
+ result = dst_broker.qmf_object.create("bridge", "HiPriBridge",
+ {"link":hi_link.name,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "key":"high"}, False)
+ self.assertEqual(result.status, 0);
+
+
+ # create routes for the "medium" and "low" links to use the normal
+ # data_link
+ for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]:
+ result = dst_broker.qmf_object.create("bridge", _b[0],
+ {"link":data_link.name,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "key":_b[1]}, False)
+ self.assertEqual(result.status, 0);
+
+ # now wait for the links to become operational
+ for _l in [hi_link, data_link]:
+ expire_time = time() + 30
+ while _l.state != "Operational" and time() < expire_time:
+ _l.update()
+ self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+ # verify each link uses a different connection
+ self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef,
+ "Different links using the same connection")
+
+ hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=hi_link.connectionRef)[0]
+ data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=data_link.connectionRef)[0]
+
+
+ # send hi data, verify only goes over hi link
+
+ r_ssn = dst_broker.connection.session()
+ hi_receiver = r_ssn.receiver("HiQ");
+ med_receiver = r_ssn.receiver("MedQ");
+ low_receiver = r_ssn.receiver("LoQ");
+
+ for _c in [hi_conn, data_conn]:
+ _c.update()
+ self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+ s_ssn = src_broker.connection.session()
+ hi_sender = s_ssn.sender("fedX.direct/high")
+ med_sender = s_ssn.sender("fedX.direct/medium")
+ low_sender = s_ssn.sender("fedX.direct/low")
+
+ try:
+ hi_sender.send(qpid.messaging.Message(content="hi priority"))
+ msg = hi_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "hi priority");
+ except:
+ self.fail("Hi Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages")
+
+ # send low and medium, verify it does not go over hi link
+
+ try:
+ med_sender.send(qpid.messaging.Message(content="medium priority"))
+ msg = med_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "medium priority");
+ except:
+ self.fail("Medium Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message")
+
+ try:
+ low_sender.send(qpid.messaging.Message(content="low priority"))
+ msg = low_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "low priority");
+ except:
+ self.fail("Low Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message")
+
+ # cleanup
+
+ for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+ dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+ dst_broker.client_session.queue_delete(queue=_q[0])
+
+ for _b in [src_broker, dst_broker]:
+ _b.client_session.exchange_delete(exchange="fedX.direct")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+
+