diff options
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-x | cpp/src/tests/federation.py | 334 |
1 files changed, 230 insertions, 104 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index d9bafd9d88..92162fd98d 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -20,7 +20,6 @@ import sys from qpid.testlib import TestBase010, testrunner -from qpid.management import managementChannel, managementClient from qpid.datatypes import Message from qpid.queue import Empty from time import sleep @@ -54,68 +53,45 @@ def remote_host(): def remote_port(): return int(scan_args("--remote-port")) -class Helper: - def __init__(self, parent): - self.parent = parent - self.session = parent.conn.session("Helper") - self.mc = managementClient(self.session.spec) - self.mch = self.mc.addChannel(self.session) - self.mc.syncWaitForStable(self.mch) - - def shutdown (self): - self.mc.removeChannel (self.mch) +class FederationTests(TestBase010): - def get_objects(self, type): - return self.mc.syncGetObjects(self.mch, type) + def test_bridge_create_and_close(self): + self.startQmf(); + qmf = self.qmf - def get_object(self, type, position = 1, expected = None): - objects = self.get_objects(type) - if not expected: expected = position - self.assertEqual(len(objects), expected) - return objects[(position - 1)] + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - - def call_method(self, object, method, args=None): - res = self.mc.syncCallMethod(self.mch, object.id, object.classKey, method, args) - self.assertEqual(res.status, 0) - self.assertEqual(res.statusText, "OK") - return res - - def assertEqual(self, a, b): - self.parent.assertEqual(a, b) - -class FederationTests(TestBase010): + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False) + self.assertEqual(result.status, 0) - def test_bridge_create_and_close(self): - mgmt = Helper(self) - broker = mgmt.get_object("broker") - - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.direct", "key":"my-key"}) - bridge = mgmt.get_object("bridge") - - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") + bridge = qmf.getObjects(_class="bridge")[0] + result = bridge.close() + self.assertEqual(result.status, 0) - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.shutdown () + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_pull_from_exchange(self): session = self.session - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key"}) - bridge = mgmt.get_object("bridge") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from local broker session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) @@ -140,27 +116,29 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.shutdown() + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_push_to_exchange(self): session = self.session - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", - "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0, - "srcIsLocal":1}) - bridge = mgmt.get_object("bridge") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from remote broker r_conn = self.connect(host=remote_host(), port=remote_port()) @@ -184,13 +162,14 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.shutdown() + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_pull_from_queue(self): session = self.session @@ -209,16 +188,18 @@ class FederationTests(TestBase010): self.subscribe(queue="fed1", destination="f1") queue = session.incoming("f1") - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False) + self.assertEqual(result.status, 0) - mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", - "key":"", "tag":"", "excludes":"", "srcIsQueue":1}) - sleep(6) - bridge = mgmt.get_object("bridge") + bridge = qmf.getObjects(_class="bridge")[0] + sleep(3) #add some more messages (i.e. after bridge was created) for i in range(6, 11): @@ -236,14 +217,14 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) - - mgmt.shutdown () + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) def test_tracing_automatic(self): remoteUrl = "%s:%d" % (remote_host(), remote_port()) @@ -307,22 +288,24 @@ class FederationTests(TestBase010): def test_tracing(self): session = self.session - mgmt = Helper(self) - broker = mgmt.get_object("broker") + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) - mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) - link = mgmt.get_object("link") - - mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key", - "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"}) - sleep(6) - bridge = mgmt.get_object("bridge") + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id", + "exclude-me,also-exclude-me", False, False, False) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from local broker session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) session.exchange_bind(queue="fed1", exchange="amq.fanout") self.subscribe(queue="fed1", destination="f1") queue = session.incoming("f1") + sleep(6) #send messages to remote broker and confirm it is routed to local broker r_conn = self.connect(host=remote_host(), port=remote_port()) @@ -347,13 +330,155 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - mgmt.call_method(bridge, "close") - mgmt.call_method(link, "close") - sleep(6) - self.assertEqual(len(mgmt.get_objects("bridge")), 0) - self.assertEqual(len(mgmt.get_objects("link")), 0) + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + def test_dynamic_fanout(self): + session = self.session + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_dynamic_fanout") + + session.exchange_declare(exchange="fed.fanout", type="fanout") + r_session.exchange_declare(exchange="fed.fanout", type="fanout") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties() + r_session.message_transfer(destination="fed.fanout", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + + def test_dynamic_direct(self): + session = self.session + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_dynamic_direct") + + session.exchange_declare(exchange="fed.direct", type="direct") + r_session.exchange_declare(exchange="fed.direct", type="direct") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.direct", binding_key="fd-key") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="fd-key") + r_session.message_transfer(destination="fed.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) + + + def test_dynamic_topic(self): + session = self.session + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_dynamic_topic") + + session.exchange_declare(exchange="fed.topic", type="topic") + r_session.exchange_declare(exchange="fed.topic", type="topic") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="ft-key.#") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="ft-key.one.two") + r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + sleep(3) + self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) + self.assertEqual(len(qmf.getObjects(_class="link")), 0) - mgmt.shutdown () def getProperty(self, msg, name): for h in msg.headers: @@ -364,7 +489,8 @@ class FederationTests(TestBase010): headers = self.getProperty(msg, "application_headers") if headers: return headers[name] - return None + return None + if __name__ == '__main__': args = sys.argv[1:] |