summaryrefslogtreecommitdiff
path: root/cpp/src/tests/federation.py
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-10 19:24:40 +0000
committerTed Ross <tross@apache.org>2008-10-10 19:24:40 +0000
commit792e46cad9a978c0d297c245772e57c0759cec31 (patch)
treebb227f31c63eac3e9e27c951917c129c4e1b682f /cpp/src/tests/federation.py
parentf1935bda5aa75a139330a4b1e976c99536c6c04f (diff)
downloadqpid-python-792e46cad9a978c0d297c245772e57c0759cec31.tar.gz
QPID-1349 - Push routing for federation (includes hook for dynamic routing)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-xcpp/src/tests/federation.py44
1 files changed, 44 insertions, 0 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index b92df89839..7e7caeeec6 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -148,6 +148,50 @@ class FederationTests(TestBase010):
mgmt.shutdown()
+ def test_push_to_exchange(self):
+ session = self.session
+
+ mgmt = Helper(self)
+ broker = mgmt.get_object("broker")
+
+ mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
+ link = mgmt.get_object("link")
+
+ mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout",
+ "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0,
+ "srcIsLocal":1})
+ bridge = mgmt.get_object("bridge")
+
+ #setup queue to receive messages from remote broker
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_push_to_exchange")
+ r_session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ r_session.exchange_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(session=r_session, queue="fed1", destination="f1")
+ queue = r_session.incoming("f1")
+ sleep(6)
+
+ #send messages to local broker and confirm it is routed to remote broker
+ for i in range(1, 11):
+ dp = session.delivery_properties(routing_key="my-key")
+ session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ mgmt.call_method(bridge, "close")
+ mgmt.call_method(link, "close")
+ sleep(6)
+ self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+ self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+ mgmt.shutdown()
+
def test_pull_from_queue(self):
session = self.session