summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/federation.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2013-04-11 21:47:40 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2013-04-11 21:47:40 +0000
commit332410c66c62d5e075e9f9077d29fc4669e11db0 (patch)
treee856ee2846d664f98b21f3cb01b3989ba038241c /qpid/cpp/src/tests/federation.py
parent25eaa7e072a1f4bdfc592ac3c4ca57e265a04d40 (diff)
downloadqpid-python-332410c66c62d5e075e9f9077d29fc4669e11db0.tar.gz
QPID-4728: add 'credit' parameter to Federation Bridge configuration.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1467107 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
-rwxr-xr-xqpid/cpp/src/tests/federation.py135
1 files changed, 107 insertions, 28 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 6477c6effd..0da5b47ac2 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -120,7 +120,8 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
+ result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "",
+ "", False, False, False, 0, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -199,7 +200,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -257,7 +258,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -310,7 +311,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -361,8 +362,8 @@ class FederationTests(TestBase010):
l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
- l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
- r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
+ l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0, 0)
+ r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0, 0)
self.assertEqual(l_res.status, 0)
self.assertEqual(r_res.status, 0)
@@ -416,7 +417,7 @@ class FederationTests(TestBase010):
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
- "exclude-me,also-exclude-me", False, False, False, 0)
+ "exclude-me,also-exclude-me", False, False, False, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -474,7 +475,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -519,7 +520,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -563,7 +564,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -615,9 +616,9 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -687,9 +688,9 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -750,7 +751,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -803,9 +804,9 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -859,7 +860,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -904,7 +905,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -958,10 +959,10 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -1016,7 +1017,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -1064,7 +1065,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -1175,7 +1176,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for the inter-broker links to become operational
@@ -1433,7 +1435,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for the inter-broker links to become operational
@@ -1689,7 +1692,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for the inter-broker links to become operational
@@ -2018,7 +2022,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for all the inter-broker links to become operational
@@ -2106,7 +2111,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
binding_counts = [2, 2]
@@ -2710,3 +2716,76 @@ class FederationTests(TestBase010):
self._teardown_brokers()
self.verify_cleanup()
+
+ def test_credit(self):
+ """ Test a federation link configured to use explict acks and a credit
+ limit
+ """
+ session = self.session
+
+ # setup queue on remote broker and add some messages
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_credit")
+ r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+
+ #setup queue to receive messages from local broker
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0, result)
+
+ link = qmf.getObjects(_class="link")[0]
+
+ # now wait for Link to go operational
+ retries = 0
+ operational = False
+ while not operational:
+ link.update()
+ if link.state == "Operational":
+ operational = True;
+ if not operational:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "inter-broker links failed to become operational.")
+ sleep(1)
+
+ # create the subscription
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key",
+ "", "", True, False, False,
+ 3, # explicit ack, with sync every 3 msgs
+ 7) # msg credit
+ self.assertEqual(result.status, 0, result)
+ bridge = qmf.getObjects(_class="bridge")[0]
+
+ # generate enough traffic to trigger flow control and syncs
+ for i in range(1000):
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+ for i in range(1000):
+ try:
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ except Empty:
+ self.fail("Failed to find expected message containing 'Message %d'" % i)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0, result)
+ result = link.close()
+ self.assertEqual(result.status, 0, result)
+
+ r_session.close()
+ r_conn.close()
+
+ self.verify_cleanup()
+