summaryrefslogtreecommitdiff
path: root/cpp/src/tests/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-xcpp/src/tests/federation.py334
1 files changed, 230 insertions, 104 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index d9bafd9d88..92162fd98d 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -20,7 +20,6 @@
import sys
from qpid.testlib import TestBase010, testrunner
-from qpid.management import managementChannel, managementClient
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep
@@ -54,68 +53,45 @@ def remote_host():
def remote_port():
return int(scan_args("--remote-port"))
-class Helper:
- def __init__(self, parent):
- self.parent = parent
- self.session = parent.conn.session("Helper")
- self.mc = managementClient(self.session.spec)
- self.mch = self.mc.addChannel(self.session)
- self.mc.syncWaitForStable(self.mch)
-
- def shutdown (self):
- self.mc.removeChannel (self.mch)
+class FederationTests(TestBase010):
- def get_objects(self, type):
- return self.mc.syncGetObjects(self.mch, type)
+ def test_bridge_create_and_close(self):
+ self.startQmf();
+ qmf = self.qmf
- def get_object(self, type, position = 1, expected = None):
- objects = self.get_objects(type)
- if not expected: expected = position
- self.assertEqual(len(objects), expected)
- return objects[(position - 1)]
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
-
- def call_method(self, object, method, args=None):
- res = self.mc.syncCallMethod(self.mch, object.id, object.classKey, method, args)
- self.assertEqual(res.status, 0)
- self.assertEqual(res.statusText, "OK")
- return res
-
- def assertEqual(self, a, b):
- self.parent.assertEqual(a, b)
-
-class FederationTests(TestBase010):
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False)
+ self.assertEqual(result.status, 0)
- def test_bridge_create_and_close(self):
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
-
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.direct", "key":"my-key"})
- bridge = mgmt.get_object("bridge")
-
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
+ bridge = qmf.getObjects(_class="bridge")[0]
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.shutdown ()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_pull_from_exchange(self):
session = self.session
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key"})
- bridge = mgmt.get_object("bridge")
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False)
+ self.assertEqual(result.status, 0)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from local broker
session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
@@ -140,27 +116,29 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.shutdown()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_push_to_exchange(self):
session = self.session
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout",
- "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0,
- "srcIsLocal":1})
- bridge = mgmt.get_object("bridge")
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False)
+ self.assertEqual(result.status, 0)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from remote broker
r_conn = self.connect(host=remote_host(), port=remote_port())
@@ -184,13 +162,14 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.shutdown()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_pull_from_queue(self):
session = self.session
@@ -209,16 +188,18 @@ class FederationTests(TestBase010):
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False)
+ self.assertEqual(result.status, 0)
- mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout",
- "key":"", "tag":"", "excludes":"", "srcIsQueue":1})
- sleep(6)
- bridge = mgmt.get_object("bridge")
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(3)
#add some more messages (i.e. after bridge was created)
for i in range(6, 11):
@@ -236,14 +217,14 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
-
- mgmt.shutdown ()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_tracing_automatic(self):
remoteUrl = "%s:%d" % (remote_host(), remote_port())
@@ -307,22 +288,24 @@ class FederationTests(TestBase010):
def test_tracing(self):
session = self.session
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
- "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
- sleep(6)
- bridge = mgmt.get_object("bridge")
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
+ "exclude-me,also-exclude-me", False, False, False)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from local broker
session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
session.exchange_bind(queue="fed1", exchange="amq.fanout")
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")
+ sleep(6)
#send messages to remote broker and confirm it is routed to local broker
r_conn = self.connect(host=remote_host(), port=remote_port())
@@ -347,13 +330,155 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+ def test_dynamic_fanout(self):
+ session = self.session
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_dynamic_fanout")
+
+ session.exchange_declare(exchange="fed.fanout", type="fanout")
+ r_session.exchange_declare(exchange="fed.fanout", type="fanout")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties()
+ r_session.message_transfer(destination="fed.fanout", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+ def test_dynamic_direct(self):
+ session = self.session
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_dynamic_direct")
+
+ session.exchange_declare(exchange="fed.direct", type="direct")
+ r_session.exchange_declare(exchange="fed.direct", type="direct")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.direct", binding_key="fd-key")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="fd-key")
+ r_session.message_transfer(destination="fed.direct", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+ def test_dynamic_topic(self):
+ session = self.session
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_dynamic_topic")
+
+ session.exchange_declare(exchange="fed.topic", type="topic")
+ r_session.exchange_declare(exchange="fed.topic", type="topic")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="ft-key.#")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="ft-key.one.two")
+ r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
- mgmt.shutdown ()
def getProperty(self, msg, name):
for h in msg.headers:
@@ -364,7 +489,8 @@ class FederationTests(TestBase010):
headers = self.getProperty(msg, "application_headers")
if headers:
return headers[name]
- return None
+ return None
+
if __name__ == '__main__':
args = sys.argv[1:]