summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/federation.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-04-16 20:57:44 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-04-16 20:57:44 +0000
commitd342d013775b2df2c40508824b324994475468b3 (patch)
tree9b77e20f3659d5da80b2ccdcc536f0b4b75eeb57 /qpid/cpp/src/tests/federation.py
parent00735020f17f94088da3141a8a861767e35b9684 (diff)
downloadqpid-python-d342d013775b2df2c40508824b324994475468b3.tar.gz
QPID-2487: fix route propagation tests to prevent spurious timeouts.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@935079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
-rwxr-xr-xqpid/cpp/src/tests/federation.py321
1 files changed, 263 insertions, 58 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index c5e379b829..0addcf029a 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -872,11 +872,24 @@ class FederationTests(TestBase010):
self._setup_brokers()
- # create direct exchange on each broker
+ # create direct exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+ exchanges=[]
for _b in self._brokers:
_b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+ # pull the exchange out of qmf...
+ my_exchange = None
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX.direct":
+ my_exchange = ooo
+ break
+ self.assertTrue(my_exchange is not None)
+ exchanges.append(my_exchange)
+ self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
+
# connect B0 --> B1
result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
self._brokers[0].port,
@@ -928,29 +941,51 @@ class FederationTests(TestBase010):
queue_3 = self._brokers[3].client_session.incoming("f1")
# wait until the binding key has propagated to each broker (twice at
- # broker 2)
-
- retries = 0
- count = 0
- for xxx in qmf.getObjects(_class="binding"):
- if xxx.bindingKey == "spudboy":
- count += 1
- while count < 5:
- retries += 1
- if retries >= 10:
- self.fail("binding did not propagate to all brokers!")
- return
- sleep(1)
- count = 0
- for xxx in qmf.getObjects(_class="binding"):
- if xxx.bindingKey == "spudboy":
- count += 1
+ # broker B1)
+
+ binding_counts = [1, 2, 1, 1]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(len(exchanges)):
+ retries = 0
+ exchanges[i].update()
+ while exchanges[i].bindingCount < binding_counts[i]:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "binding failed to propagate to broker %d"
+ % i)
+ sleep(1)
+ exchanges[i].update()
# send 10 msgs from B0
for i in range(1, 11):
dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy")
self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i))
+ # wait for 10 messages to be forwarded from B0->B1,
+ # 10 messages from B1->B2,
+ # and 10 messages from B1->B3
+ retries = 0
+ for ex in exchanges:
+ ex.update()
+ while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or
+ exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10):
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d"
+ % (exchanges[0].msgReceives,
+ exchanges[0].msgRoutes,
+ exchanges[1].msgReceives,
+ exchanges[1].msgRoutes,
+ exchanges[2].msgReceives,
+ exchanges[2].msgRoutes,
+ exchanges[3].msgReceives,
+ exchanges[3].msgRoutes))
+ sleep(1)
+ for ex in exchanges:
+ ex.update()
+
# get exactly 10 msgs on B2 and B3
for i in range(1, 11):
msg = queue_2.get(timeout=5)
@@ -968,19 +1003,54 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue_3: " + extra.body)
except Empty: None
+
# tear down the queue on B2
self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy")
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - find a proper way to check the propagation here!
+ # @todo - restore code when QPID-2499 fixed!!
sleep(6)
+ # wait for the binding count on B1 to drop from 2 to 1
+ # retries = 0
+ # exchanges[1].update()
+ # while exchanges[1].bindingCount != 1:
+ # retries += 1
+ # self.failIfEqual(retries, 10,
+ # "unbinding failed to propagate to broker B1: %d"
+ # % exchanges[1].bindingCount)
+ # sleep(1)
+ # exchanges[1].update()
# send 10 msgs from B0
for i in range(1, 11):
dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy")
self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i))
+ # verify messages are forwarded to B3 only
+ # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
+ retries = 0
+ for ex in exchanges:
+ ex.update()
+ while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
+ exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d"
+ % (exchanges[0].msgReceives,
+ exchanges[0].msgRoutes,
+ exchanges[1].msgReceives,
+ exchanges[1].msgRoutes,
+ exchanges[2].msgReceives,
+ exchanges[2].msgRoutes,
+ exchanges[3].msgReceives,
+ exchanges[3].msgRoutes))
+ sleep(1)
+ for ex in exchanges:
+ ex.update()
+
# get exactly 10 msgs on B3 only
for i in range(1, 11):
msg = queue_3.get(timeout=5)
@@ -1034,10 +1104,22 @@ class FederationTests(TestBase010):
self._setup_brokers()
- # create topic exchange on each broker
+ # create exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+ exchanges=[]
for _b in self._brokers:
_b.client_session.exchange_declare(exchange="fedX.topic", type="topic")
+ # pull the exchange out of qmf...
+ my_exchange = None
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX.topic":
+ my_exchange = ooo
+ break
+ self.assertTrue(my_exchange is not None)
+ exchanges.append(my_exchange)
+ self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
# connect B0 --> B1
result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
@@ -1090,29 +1172,51 @@ class FederationTests(TestBase010):
queue_3 = self._brokers[3].client_session.incoming("f1")
# wait until the binding key has propagated to each broker (twice at
- # broker 2)
-
- retries = 0
- count = 0
- for xxx in qmf.getObjects(_class="binding"):
- if xxx.bindingKey == "spud.*":
- count += 1
- while count < 5:
- retries += 1
- if retries >= 10:
- self.fail("binding did not propagate to all brokers!")
- return
- sleep(1)
- count = 0
- for xxx in qmf.getObjects(_class="binding"):
- if xxx.bindingKey == "spud.*":
- count += 1
+ # broker B1)
+
+ binding_counts = [1, 2, 1, 1]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(len(exchanges)):
+ retries = 0
+ exchanges[i].update()
+ while exchanges[i].bindingCount < binding_counts[i]:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "binding failed to propagate to broker %d"
+ % i)
+ sleep(1)
+ exchanges[i].update()
# send 10 msgs from B0
for i in range(1, 11):
dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy")
self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i))
+ # wait for 10 messages to be forwarded from B0->B1,
+ # 10 messages from B1->B2,
+ # and 10 messages from B1->B3
+ retries = 0
+ for ex in exchanges:
+ ex.update()
+ while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or
+ exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10):
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d"
+ % (exchanges[0].msgReceives,
+ exchanges[0].msgRoutes,
+ exchanges[1].msgReceives,
+ exchanges[1].msgRoutes,
+ exchanges[2].msgReceives,
+ exchanges[2].msgRoutes,
+ exchanges[3].msgReceives,
+ exchanges[3].msgRoutes))
+ sleep(1)
+ for ex in exchanges:
+ ex.update()
+
# get exactly 10 msgs on B2 and B3
for i in range(1, 11):
msg = queue_2.get(timeout=5)
@@ -1135,14 +1239,48 @@ class FederationTests(TestBase010):
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - find a proper way to check the propagation here!
+ # @todo - restore code when QPID-2499 fixed!!
sleep(6)
+ # wait for the binding count on B1 to drop from 2 to 1
+ # retries = 0
+ # exchanges[1].update()
+ # while exchanges[1].bindingCount != 1:
+ # retries += 1
+ # self.failIfEqual(retries, 10,
+ # "unbinding failed to propagate to broker B1: %d"
+ # % exchanges[1].bindingCount)
+ # sleep(1)
+ # exchanges[1].update()
# send 10 msgs from B0
for i in range(1, 11):
dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy")
self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i))
+ # verify messages are forwarded to B3 only
+ # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
+ retries = 0
+ for ex in exchanges:
+ ex.update()
+ while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
+ exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d"
+ % (exchanges[0].msgReceives,
+ exchanges[0].msgRoutes,
+ exchanges[1].msgReceives,
+ exchanges[1].msgRoutes,
+ exchanges[2].msgReceives,
+ exchanges[2].msgRoutes,
+ exchanges[3].msgReceives,
+ exchanges[3].msgRoutes))
+ sleep(1)
+ for ex in exchanges:
+ ex.update()
+
# get exactly 10 msgs on B3 only
for i in range(1, 11):
msg = queue_3.get(timeout=5)
@@ -1197,10 +1335,22 @@ class FederationTests(TestBase010):
self._setup_brokers()
- # create fanout exchange on each broker
+ # create fanout exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+ exchanges=[]
for _b in self._brokers:
_b.client_session.exchange_declare(exchange="fedX.fanout", type="fanout")
+ # pull the exchange out of qmf...
+ my_exchange = None
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX.fanout":
+ my_exchange = ooo
+ break
+ self.assertTrue(my_exchange is not None)
+ exchanges.append(my_exchange)
+ self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
# connect B0 --> B1
result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
@@ -1252,31 +1402,52 @@ class FederationTests(TestBase010):
self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1")
queue_3 = self._brokers[3].client_session.incoming("f1")
- # wait until the binding has propagated to each broker by
- # counting the number of bindings that have origins.
- # Should have 3: 1 for B0, 2 for B1.
-
- retries = 0
- count = 0
- for xxx in qmf.getObjects(_class="binding"):
- if xxx.origin:
- count += 1
- while count < 3:
- retries += 1
- if retries >= 10:
- self.fail("binding did not propagate to all brokers!")
- return
- sleep(1)
- count = 0
- for xxx in qmf.getObjects(_class="binding"):
- if xxx.origin:
- count += 1
+ # wait until the binding key has propagated to each broker (twice at
+ # broker B1)
+
+ binding_counts = [1, 2, 1, 1]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(len(exchanges)):
+ retries = 0
+ exchanges[i].update()
+ while exchanges[i].bindingCount < binding_counts[i]:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "binding failed to propagate to broker %d"
+ % i)
+ sleep(1)
+ exchanges[i].update()
# send 10 msgs from B0
for i in range(1, 11):
dp = self._brokers[0].client_session.delivery_properties()
self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i))
+ # wait for 10 messages to be forwarded from B0->B1,
+ # 10 messages from B1->B2,
+ # and 10 messages from B1->B3
+ retries = 0
+ for ex in exchanges:
+ ex.update()
+ while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or
+ exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10):
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d"
+ % (exchanges[0].msgReceives,
+ exchanges[0].msgRoutes,
+ exchanges[1].msgReceives,
+ exchanges[1].msgRoutes,
+ exchanges[2].msgReceives,
+ exchanges[2].msgRoutes,
+ exchanges[3].msgReceives,
+ exchanges[3].msgRoutes))
+ sleep(1)
+ for ex in exchanges:
+ ex.update()
+
# get exactly 10 msgs on B2 and B3
for i in range(1, 11):
msg = queue_2.get(timeout=5)
@@ -1301,12 +1472,46 @@ class FederationTests(TestBase010):
# @todo - find a proper way to check the propagation here!
sleep(6)
+ # wait for the binding count on B1 to drop from 2 to 1
+ # retries = 0
+ # exchanges[1].update()
+ # while exchanges[1].bindingCount != 1:
+ # retries += 1
+ # self.failIfEqual(retries, 10,
+ # "unbinding failed to propagate to broker B1: %d"
+ # % exchanges[1].bindingCount)
+ # sleep(1)
+ # exchanges[1].update()
# send 10 msgs from B0
for i in range(1, 11):
dp = self._brokers[0].client_session.delivery_properties()
self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i))
+ # verify messages are forwarded to B3 only
+ # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
+ retries = 0
+ for ex in exchanges:
+ ex.update()
+ while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
+ exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d"
+ % (exchanges[0].msgReceives,
+ exchanges[0].msgRoutes,
+ exchanges[1].msgReceives,
+ exchanges[1].msgRoutes,
+ exchanges[2].msgReceives,
+ exchanges[2].msgRoutes,
+ exchanges[3].msgReceives,
+ exchanges[3].msgRoutes))
+ sleep(1)
+ for ex in exchanges:
+ ex.update()
+
# get exactly 10 msgs on B3 only
for i in range(1, 11):
msg = queue_3.get(timeout=5)