diff options
author | Ted Ross <tross@apache.org> | 2008-10-10 19:24:40 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-10 19:24:40 +0000 |
commit | 792e46cad9a978c0d297c245772e57c0759cec31 (patch) | |
tree | bb227f31c63eac3e9e27c951917c129c4e1b682f /cpp/src/tests/federation.py | |
parent | f1935bda5aa75a139330a4b1e976c99536c6c04f (diff) | |
download | qpid-python-792e46cad9a978c0d297c245772e57c0759cec31.tar.gz |
QPID-1349 - Push routing for federation (includes hook for dynamic routing)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-x | cpp/src/tests/federation.py | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index b92df89839..7e7caeeec6 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -148,6 +148,50 @@ class FederationTests(TestBase010): mgmt.shutdown() + def test_push_to_exchange(self): + session = self.session + + mgmt = Helper(self) + broker = mgmt.get_object("broker") + + mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()}) + link = mgmt.get_object("link") + + mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0, + "srcIsLocal":1}) + bridge = mgmt.get_object("bridge") + + #setup queue to receive messages from remote broker + r_conn = self.connect(host=remote_host(), port=remote_port()) + r_session = r_conn.session("test_push_to_exchange") + r_session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + r_session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(session=r_session, queue="fed1", destination="f1") + queue = r_session.incoming("f1") + sleep(6) + + #send messages to local broker and confirm it is routed to remote broker + for i in range(1, 11): + dp = session.delivery_properties(routing_key="my-key") + session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + mgmt.call_method(bridge, "close") + mgmt.call_method(link, "close") + sleep(6) + self.assertEqual(len(mgmt.get_objects("bridge")), 0) + self.assertEqual(len(mgmt.get_objects("link")), 0) + + mgmt.shutdown() + def test_pull_from_queue(self): session = self.session |