diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-04-16 20:57:44 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-04-16 20:57:44 +0000 |
commit | d342d013775b2df2c40508824b324994475468b3 (patch) | |
tree | 9b77e20f3659d5da80b2ccdcc536f0b4b75eeb57 /qpid/cpp/src/tests/federation.py | |
parent | 00735020f17f94088da3141a8a861767e35b9684 (diff) | |
download | qpid-python-d342d013775b2df2c40508824b324994475468b3.tar.gz |
QPID-2487: fix route propagation tests to prevent spurious timeouts.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@935079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 321 |
1 files changed, 263 insertions, 58 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index c5e379b829..0addcf029a 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -872,11 +872,24 @@ class FederationTests(TestBase010): self._setup_brokers() - # create direct exchange on each broker + # create direct exchange on each broker, and retrieve the corresponding + # management object for that exchange + exchanges=[] for _b in self._brokers: _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + # pull the exchange out of qmf... + my_exchange = None + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.direct": + my_exchange = ooo + break + self.assertTrue(my_exchange is not None) + exchanges.append(my_exchange) + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") + # connect B0 --> B1 result = self._brokers[1].qmf_object.connect(self._brokers[0].host, self._brokers[0].port, @@ -928,29 +941,51 @@ class FederationTests(TestBase010): queue_3 = self._brokers[3].client_session.incoming("f1") # wait until the binding key has propagated to each broker (twice at - # broker 2) - - retries = 0 - count = 0 - for xxx in qmf.getObjects(_class="binding"): - if xxx.bindingKey == "spudboy": - count += 1 - while count < 5: - retries += 1 - if retries >= 10: - self.fail("binding did not propagate to all brokers!") - return - sleep(1) - count = 0 - for xxx in qmf.getObjects(_class="binding"): - if xxx.bindingKey == "spudboy": - count += 1 + # broker B1) + + binding_counts = [1, 2, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(len(exchanges)): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(1) + exchanges[i].update() # send 10 msgs from B0 for i in range(1, 11): dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy") self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i)) + # wait for 10 messages to be forwarded from B0->B1, + # 10 messages from B1->B2, + # and 10 messages from B1->B3 + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or + exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + # get exactly 10 msgs on B2 and B3 for i in range(1, 11): msg = queue_2.get(timeout=5) @@ -968,19 +1003,54 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue_3: " + extra.body) except Empty: None + # tear down the queue on B2 self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") self._brokers[2].client_session.message_cancel(destination="f1") self._brokers[2].client_session.queue_delete(queue="fedX1") - # @todo - find a proper way to check the propagation here! + # @todo - restore code when QPID-2499 fixed!! sleep(6) + # wait for the binding count on B1 to drop from 2 to 1 + # retries = 0 + # exchanges[1].update() + # while exchanges[1].bindingCount != 1: + # retries += 1 + # self.failIfEqual(retries, 10, + # "unbinding failed to propagate to broker B1: %d" + # % exchanges[1].bindingCount) + # sleep(1) + # exchanges[1].update() # send 10 msgs from B0 for i in range(1, 11): dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy") self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i)) + # verify messages are forwarded to B3 only + # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499? + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or + exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + # get exactly 10 msgs on B3 only for i in range(1, 11): msg = queue_3.get(timeout=5) @@ -1034,10 +1104,22 @@ class FederationTests(TestBase010): self._setup_brokers() - # create topic exchange on each broker + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + exchanges=[] for _b in self._brokers: _b.client_session.exchange_declare(exchange="fedX.topic", type="topic") + # pull the exchange out of qmf... + my_exchange = None + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.topic": + my_exchange = ooo + break + self.assertTrue(my_exchange is not None) + exchanges.append(my_exchange) + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") # connect B0 --> B1 result = self._brokers[1].qmf_object.connect(self._brokers[0].host, @@ -1090,29 +1172,51 @@ class FederationTests(TestBase010): queue_3 = self._brokers[3].client_session.incoming("f1") # wait until the binding key has propagated to each broker (twice at - # broker 2) - - retries = 0 - count = 0 - for xxx in qmf.getObjects(_class="binding"): - if xxx.bindingKey == "spud.*": - count += 1 - while count < 5: - retries += 1 - if retries >= 10: - self.fail("binding did not propagate to all brokers!") - return - sleep(1) - count = 0 - for xxx in qmf.getObjects(_class="binding"): - if xxx.bindingKey == "spud.*": - count += 1 + # broker B1) + + binding_counts = [1, 2, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(len(exchanges)): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(1) + exchanges[i].update() # send 10 msgs from B0 for i in range(1, 11): dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy") self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i)) + # wait for 10 messages to be forwarded from B0->B1, + # 10 messages from B1->B2, + # and 10 messages from B1->B3 + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or + exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + # get exactly 10 msgs on B2 and B3 for i in range(1, 11): msg = queue_2.get(timeout=5) @@ -1135,14 +1239,48 @@ class FederationTests(TestBase010): self._brokers[2].client_session.message_cancel(destination="f1") self._brokers[2].client_session.queue_delete(queue="fedX1") - # @todo - find a proper way to check the propagation here! + # @todo - restore code when QPID-2499 fixed!! sleep(6) + # wait for the binding count on B1 to drop from 2 to 1 + # retries = 0 + # exchanges[1].update() + # while exchanges[1].bindingCount != 1: + # retries += 1 + # self.failIfEqual(retries, 10, + # "unbinding failed to propagate to broker B1: %d" + # % exchanges[1].bindingCount) + # sleep(1) + # exchanges[1].update() # send 10 msgs from B0 for i in range(1, 11): dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy") self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i)) + # verify messages are forwarded to B3 only + # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499? + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or + exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + # get exactly 10 msgs on B3 only for i in range(1, 11): msg = queue_3.get(timeout=5) @@ -1197,10 +1335,22 @@ class FederationTests(TestBase010): self._setup_brokers() - # create fanout exchange on each broker + # create fanout exchange on each broker, and retrieve the corresponding + # management object for that exchange + exchanges=[] for _b in self._brokers: _b.client_session.exchange_declare(exchange="fedX.fanout", type="fanout") + # pull the exchange out of qmf... + my_exchange = None + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.fanout": + my_exchange = ooo + break + self.assertTrue(my_exchange is not None) + exchanges.append(my_exchange) + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") # connect B0 --> B1 result = self._brokers[1].qmf_object.connect(self._brokers[0].host, @@ -1252,31 +1402,52 @@ class FederationTests(TestBase010): self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1") queue_3 = self._brokers[3].client_session.incoming("f1") - # wait until the binding has propagated to each broker by - # counting the number of bindings that have origins. - # Should have 3: 1 for B0, 2 for B1. - - retries = 0 - count = 0 - for xxx in qmf.getObjects(_class="binding"): - if xxx.origin: - count += 1 - while count < 3: - retries += 1 - if retries >= 10: - self.fail("binding did not propagate to all brokers!") - return - sleep(1) - count = 0 - for xxx in qmf.getObjects(_class="binding"): - if xxx.origin: - count += 1 + # wait until the binding key has propagated to each broker (twice at + # broker B1) + + binding_counts = [1, 2, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(len(exchanges)): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(1) + exchanges[i].update() # send 10 msgs from B0 for i in range(1, 11): dp = self._brokers[0].client_session.delivery_properties() self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i)) + # wait for 10 messages to be forwarded from B0->B1, + # 10 messages from B1->B2, + # and 10 messages from B1->B3 + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or + exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + # get exactly 10 msgs on B2 and B3 for i in range(1, 11): msg = queue_2.get(timeout=5) @@ -1301,12 +1472,46 @@ class FederationTests(TestBase010): # @todo - find a proper way to check the propagation here! sleep(6) + # wait for the binding count on B1 to drop from 2 to 1 + # retries = 0 + # exchanges[1].update() + # while exchanges[1].bindingCount != 1: + # retries += 1 + # self.failIfEqual(retries, 10, + # "unbinding failed to propagate to broker B1: %d" + # % exchanges[1].bindingCount) + # sleep(1) + # exchanges[1].update() # send 10 msgs from B0 for i in range(1, 11): dp = self._brokers[0].client_session.delivery_properties() self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i)) + # verify messages are forwarded to B3 only + # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499? + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or + exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + # get exactly 10 msgs on B3 only for i in range(1, 11): msg = queue_3.get(timeout=5) |