diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 8 | ||||
-rwxr-xr-x | cpp/src/tests/federation.py | 73 |
7 files changed, 90 insertions, 79 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index fc52ab3711..5b8104c77c 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -76,13 +76,13 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (bk.queues.add_unless(b, MatchQueue(queue))) { b->startManagement(); - propagate = bk.fedBinding.addOrigin(fedOrigin); + propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } } else { // queue already present - still need to track fedOrigin - bk.fedBinding.addOrigin(fedOrigin); + bk.fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } else if (fedOp == fedOpUnbind) { @@ -90,11 +90,12 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con BoundKey& bk = bindings[routingKey]; QPID_LOG(debug, "Bind - fedOpUnbind key [" << routingKey << "] queue " << queue->getName() - << " (origin=" << fedOrigin << ")"); + << " (origin=" << fedOrigin << ")" << " (count=" << bk.fedBinding.count() << ")"); - propagate = bk.fedBinding.delOrigin(fedOrigin); - if (bk.fedBinding.count() == 0) + propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin); + if (bk.fedBinding.countFedBindings(queue->getName()) == 0) unbind(queue, routingKey, 0); + } else if (fedOp == fedOpReorigin) { /** gather up all the keys that need rebinding in a local vector * while holding the lock. Then propagate once the lock is @@ -142,6 +143,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } } + // If I delete my local binding, propagate this unbind to any upstream brokers if (propagate) propagateFedOp(routingKey, string(), fedOpUnbind, string()); return true; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 26d7f41015..3c8b5ca2cd 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -95,46 +95,61 @@ protected: bool operator()(Exchange::Binding::shared_ptr b); }; + /** A FedBinding keeps track of information that Federation needs + to know when to propagate changes. + + Dynamic federation needs to know which exchanges have at least + one local binding. The bindings on these exchanges need to be + propagated. + + Federated binds and unbinds need to know which federation + origins are associated with the bindings for each queue. When + origins are added or deleted, the corresponding bindings need + to be propagated. + + fedBindings[queueName] contains the origins associated with + the given queue. + */ + class FedBinding { uint32_t localBindings; - std::set<std::string> originSet; + + typedef std::set<std::string> originSet; + std::map<std::string, originSet> fedBindings; + public: FedBinding() : localBindings(0) {} bool hasLocal() const { return localBindings != 0; } - /** - * Returns 'true' if and only if this is the first local - * binding. - * - * The first local binding may need to be propagated. - */ - bool addOrigin(const std::string& origin) { + /** Returns true if propagation is needed. */ + bool addOrigin(const std::string& queueName, const std::string& origin) { if (origin.empty()) { localBindings++; return localBindings == 1; } - originSet.insert(origin); + fedBindings[queueName].insert(origin); return true; } - bool delOrigin(const std::string& origin) { - originSet.erase(origin); + + /** Returns true if propagation is needed. */ + bool delOrigin(const std::string& queueName, const std::string& origin){ + fedBindings[queueName].erase(origin); return true; } - /** - * Returns 'true' if and only if the last local binding is - * deleted. - * - * When the last local binding is deleted, it may need to - * be propagated. - */ + /** Returns true if propagation is needed. */ bool delOrigin() { if (localBindings > 0) localBindings--; return localBindings == 0; } + uint32_t count() { - return localBindings + originSet.size(); + return localBindings + fedBindings.size(); + } + + uint32_t countFedBindings(const std::string& queueName) { + return fedBindings[queueName].size(); } }; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index a33eba1d09..ac2c914a97 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -53,18 +53,18 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); if (bindings.add_unless(binding, MatchQueue(queue))) { binding->startManagement(); - propagate = fedBinding.addOrigin(fedOrigin); + propagate = fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } } else { // queue already present - still need to track fedOrigin - fedBinding.addOrigin(fedOrigin); + fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } else if (fedOp == fedOpUnbind) { - propagate = fedBinding.delOrigin(fedOrigin); - if (fedBinding.count() == 0) + propagate = fedBinding.delOrigin(queue->getName(), fedOrigin); + if (fedBinding.countFedBindings(queue->getName()) == 0) unbind(queue, "", 0); } else if (fedOp == fedOpReorigin) { if (fedBinding.hasLocal()) { diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 584cd4c481..82ac5911ee 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -116,12 +116,12 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co BoundKey bk(binding); if (bindings.add_unless(bk, MatchArgs(queue, args))) { binding->startManagement(); - propagate = bk.fedBinding.addOrigin(fedOrigin); + propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } } else { - bk.fedBinding.addOrigin(fedOrigin); + bk.fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } // lock dropped @@ -129,7 +129,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co } else if (fedOp == fedOpUnbind) { Mutex::ScopedLock l(lock); - FedUnbindModifier modifier(fedOrigin); + FedUnbindModifier modifier(queue->getName(), fedOrigin); bindings.modify_if(MatchKey(queue, bindingKey), modifier); propagate = modifier.shouldPropagate; if (modifier.shouldUnbind) { @@ -325,7 +325,7 @@ bool HeadersExchange::MatchKey::operator()(BoundKey & bk) } //---------- -HeadersExchange::FedUnbindModifier::FedUnbindModifier(string & origin) : fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {} +HeadersExchange::FedUnbindModifier::FedUnbindModifier(const string& queueName, const string& origin) : queueName(queueName), fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {} HeadersExchange::FedUnbindModifier::FedUnbindModifier() : shouldUnbind(false), shouldPropagate(false) {} bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk) @@ -333,9 +333,9 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk) if ("" == fedOrigin) { shouldPropagate = bk.fedBinding.delOrigin(); } else { - shouldPropagate = bk.fedBinding.delOrigin(fedOrigin); + shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin); } - if (bk.fedBinding.count() == 0) + if (bk.fedBinding.countFedBindings(queueName) == 0) { shouldUnbind = true; } diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 33c119cbbb..3b939d6851 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -60,11 +60,12 @@ class HeadersExchange : public virtual Exchange { struct FedUnbindModifier { + std::string queueName; std::string fedOrigin; bool shouldUnbind; bool shouldPropagate; FedUnbindModifier(); - FedUnbindModifier(std::string & origin); + FedUnbindModifier(const std::string& queueName, const std::string& origin); bool operator()(BoundKey & bk); }; diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 6bc42e28bf..1b0fe71bcf 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -236,7 +236,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons for (q = qv.begin(); q != qv.end(); q++) { if ((*q)->queue == queue) { // already bound, but may be from a different fedOrigin - bk->fedBinding.addOrigin(fedOrigin); + bk->fedBinding.addOrigin(queue->getName(), fedOrigin); return false; } } @@ -245,7 +245,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons binding->startManagement(); bk->bindingVector.push_back(binding); nBindings++; - propagate = bk->fedBinding.addOrigin(fedOrigin); + propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); } @@ -258,8 +258,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons RWlock::ScopedWlock l(lock); BindingKey* bk = bindingTree.getBindingKey(routingPattern); if (bk) { - propagate = bk->fedBinding.delOrigin(fedOrigin); - reallyUnbind = bk->fedBinding.count() == 0; + propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); + reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0; } } if (reallyUnbind) diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 92a28c01ad..973a1d366c 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -1200,15 +1200,15 @@ class FederationTests(TestBase010): # @todo - restore code when QPID-2499 fixed!! sleep(6) # wait for the binding count on B1 to drop from 2 to 1 - # retries = 0 - # exchanges[1].update() - # while exchanges[1].bindingCount != 1: - # retries += 1 - # self.failIfEqual(retries, 10, - # "unbinding failed to propagate to broker B1: %d" - # % exchanges[1].bindingCount) - # sleep(1) - # exchanges[1].update() + retries = 0 + exchanges[1].update() + while exchanges[1].bindingCount != 1: + retries += 1 + self.failIfEqual(retries, 10, + "unbinding failed to propagate to broker B1: %d" + % exchanges[1].bindingCount) + sleep(1) + exchanges[1].update() # send 10 msgs from B0 for i in range(11, 21): @@ -1216,13 +1216,12 @@ class FederationTests(TestBase010): self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i)) # verify messages are forwarded to B3 only - # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499? retries = 0 for ex in exchanges: ex.update() while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or - exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or - exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): retries += 1 self.failIfEqual(retries, 10, @@ -1455,18 +1454,16 @@ class FederationTests(TestBase010): self._brokers[2].client_session.message_cancel(destination="f1") self._brokers[2].client_session.queue_delete(queue="fedX1") - # @todo - restore code when QPID-2499 fixed!! - sleep(6) # wait for the binding count on B1 to drop from 2 to 1 - # retries = 0 - # exchanges[1].update() - # while exchanges[1].bindingCount != 1: - # retries += 1 - # self.failIfEqual(retries, 10, - # "unbinding failed to propagate to broker B1: %d" - # % exchanges[1].bindingCount) - # sleep(1) - # exchanges[1].update() + retries = 0 + exchanges[1].update() + while exchanges[1].bindingCount != 1: + retries += 1 + self.failIfEqual(retries, 10, + "unbinding failed to propagate to broker B1: %d" + % exchanges[1].bindingCount) + sleep(1) + exchanges[1].update() # send 10 msgs from B0 for i in range(11, 21): @@ -1474,13 +1471,12 @@ class FederationTests(TestBase010): self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message_trp %d" % i)) # verify messages are forwarded to B3 only - # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499? retries = 0 for ex in exchanges: ex.update() while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or - exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or - exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): retries += 1 self.failIfEqual(retries, 10, @@ -1714,18 +1710,16 @@ class FederationTests(TestBase010): self._brokers[2].client_session.message_cancel(destination="f1") self._brokers[2].client_session.queue_delete(queue="fedX1") - # @todo - find a proper way to check the propagation here! - sleep(6) # wait for the binding count on B1 to drop from 2 to 1 - # retries = 0 - # exchanges[1].update() - # while exchanges[1].bindingCount != 1: - # retries += 1 - # self.failIfEqual(retries, 10, - # "unbinding failed to propagate to broker B1: %d" - # % exchanges[1].bindingCount) - # sleep(1) - # exchanges[1].update() + retries = 0 + exchanges[1].update() + while exchanges[1].bindingCount != 1: + retries += 1 + self.failIfEqual(retries, 10, + "unbinding failed to propagate to broker B1: %d" + % exchanges[1].bindingCount) + sleep(1) + exchanges[1].update() # send 10 msgs from B0 for i in range(11, 21): @@ -1733,13 +1727,12 @@ class FederationTests(TestBase010): self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message_frp %d" % i)) # verify messages are forwarded to B3 only - # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499? retries = 0 for ex in exchanges: ex.update() while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or - exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or - exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): retries += 1 self.failIfEqual(retries, 10, |