summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Robie <jonathan@apache.org>2011-01-13 20:54:03 +0000
committerJonathan Robie <jonathan@apache.org>2011-01-13 20:54:03 +0000
commited044874a6f652a30ce6320a642b624c9b8ce022 (patch)
treed6c760de176f15b8d8226df6adf2f793490ae965
parent8b256bdb1e55a1c81901b20642d385b23c096677 (diff)
downloadqpid-python-ed044874a6f652a30ce6320a642b624c9b8ce022.tar.gz
Fixes QPID-2499: Stale federation routes remain after route deletion.
Federated binds and unbinds need to know which federation origins are associated with the bindings for each queue. When origins are added or deleted, the corresponding bindings need to be propagated. fedBindings[queueName] contains the origins associated with the given queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1058747 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h53
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.h3
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp8
-rwxr-xr-xqpid/cpp/src/tests/federation.py73
7 files changed, 90 insertions, 79 deletions
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index fc52ab3711..5b8104c77c 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -76,13 +76,13 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
if (bk.queues.add_unless(b, MatchQueue(queue))) {
b->startManagement();
- propagate = bk.fedBinding.addOrigin(fedOrigin);
+ propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
// queue already present - still need to track fedOrigin
- bk.fedBinding.addOrigin(fedOrigin);
+ bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
@@ -90,11 +90,12 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
BoundKey& bk = bindings[routingKey];
QPID_LOG(debug, "Bind - fedOpUnbind key [" << routingKey << "] queue " << queue->getName()
- << " (origin=" << fedOrigin << ")");
+ << " (origin=" << fedOrigin << ")" << " (count=" << bk.fedBinding.count() << ")");
- propagate = bk.fedBinding.delOrigin(fedOrigin);
- if (bk.fedBinding.count() == 0)
+ propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
+ if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
unbind(queue, routingKey, 0);
+
} else if (fedOp == fedOpReorigin) {
/** gather up all the keys that need rebinding in a local vector
* while holding the lock. Then propagate once the lock is
@@ -142,6 +143,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
}
+ // If I delete my local binding, propagate this unbind to any upstream brokers
if (propagate)
propagateFedOp(routingKey, string(), fedOpUnbind, string());
return true;
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 26d7f41015..3c8b5ca2cd 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -95,46 +95,61 @@ protected:
bool operator()(Exchange::Binding::shared_ptr b);
};
+ /** A FedBinding keeps track of information that Federation needs
+ to know when to propagate changes.
+
+ Dynamic federation needs to know which exchanges have at least
+ one local binding. The bindings on these exchanges need to be
+ propagated.
+
+ Federated binds and unbinds need to know which federation
+ origins are associated with the bindings for each queue. When
+ origins are added or deleted, the corresponding bindings need
+ to be propagated.
+
+ fedBindings[queueName] contains the origins associated with
+ the given queue.
+ */
+
class FedBinding {
uint32_t localBindings;
- std::set<std::string> originSet;
+
+ typedef std::set<std::string> originSet;
+ std::map<std::string, originSet> fedBindings;
+
public:
FedBinding() : localBindings(0) {}
bool hasLocal() const { return localBindings != 0; }
- /**
- * Returns 'true' if and only if this is the first local
- * binding.
- *
- * The first local binding may need to be propagated.
- */
- bool addOrigin(const std::string& origin) {
+ /** Returns true if propagation is needed. */
+ bool addOrigin(const std::string& queueName, const std::string& origin) {
if (origin.empty()) {
localBindings++;
return localBindings == 1;
}
- originSet.insert(origin);
+ fedBindings[queueName].insert(origin);
return true;
}
- bool delOrigin(const std::string& origin) {
- originSet.erase(origin);
+
+ /** Returns true if propagation is needed. */
+ bool delOrigin(const std::string& queueName, const std::string& origin){
+ fedBindings[queueName].erase(origin);
return true;
}
- /**
- * Returns 'true' if and only if the last local binding is
- * deleted.
- *
- * When the last local binding is deleted, it may need to
- * be propagated.
- */
+ /** Returns true if propagation is needed. */
bool delOrigin() {
if (localBindings > 0)
localBindings--;
return localBindings == 0;
}
+
uint32_t count() {
- return localBindings + originSet.size();
+ return localBindings + fedBindings.size();
+ }
+
+ uint32_t countFedBindings(const std::string& queueName) {
+ return fedBindings[queueName].size();
}
};
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index a33eba1d09..ac2c914a97 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -53,18 +53,18 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
binding->startManagement();
- propagate = fedBinding.addOrigin(fedOrigin);
+ propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
// queue already present - still need to track fedOrigin
- fedBinding.addOrigin(fedOrigin);
+ fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
- propagate = fedBinding.delOrigin(fedOrigin);
- if (fedBinding.count() == 0)
+ propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
+ if (fedBinding.countFedBindings(queue->getName()) == 0)
unbind(queue, "", 0);
} else if (fedOp == fedOpReorigin) {
if (fedBinding.hasLocal()) {
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 584cd4c481..82ac5911ee 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -116,12 +116,12 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
BoundKey bk(binding);
if (bindings.add_unless(bk, MatchArgs(queue, args))) {
binding->startManagement();
- propagate = bk.fedBinding.addOrigin(fedOrigin);
+ propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
- bk.fedBinding.addOrigin(fedOrigin);
+ bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} // lock dropped
@@ -129,7 +129,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
} else if (fedOp == fedOpUnbind) {
Mutex::ScopedLock l(lock);
- FedUnbindModifier modifier(fedOrigin);
+ FedUnbindModifier modifier(queue->getName(), fedOrigin);
bindings.modify_if(MatchKey(queue, bindingKey), modifier);
propagate = modifier.shouldPropagate;
if (modifier.shouldUnbind) {
@@ -325,7 +325,7 @@ bool HeadersExchange::MatchKey::operator()(BoundKey & bk)
}
//----------
-HeadersExchange::FedUnbindModifier::FedUnbindModifier(string & origin) : fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {}
+HeadersExchange::FedUnbindModifier::FedUnbindModifier(const string& queueName, const string& origin) : queueName(queueName), fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {}
HeadersExchange::FedUnbindModifier::FedUnbindModifier() : shouldUnbind(false), shouldPropagate(false) {}
bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
@@ -333,9 +333,9 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
if ("" == fedOrigin) {
shouldPropagate = bk.fedBinding.delOrigin();
} else {
- shouldPropagate = bk.fedBinding.delOrigin(fedOrigin);
+ shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
}
- if (bk.fedBinding.count() == 0)
+ if (bk.fedBinding.countFedBindings(queueName) == 0)
{
shouldUnbind = true;
}
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h
index 33c119cbbb..3b939d6851 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.h
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h
@@ -60,11 +60,12 @@ class HeadersExchange : public virtual Exchange {
struct FedUnbindModifier
{
+ std::string queueName;
std::string fedOrigin;
bool shouldUnbind;
bool shouldPropagate;
FedUnbindModifier();
- FedUnbindModifier(std::string & origin);
+ FedUnbindModifier(const std::string& queueName, const std::string& origin);
bool operator()(BoundKey & bk);
};
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index 6bc42e28bf..1b0fe71bcf 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -236,7 +236,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
for (q = qv.begin(); q != qv.end(); q++) {
if ((*q)->queue == queue) {
// already bound, but may be from a different fedOrigin
- bk->fedBinding.addOrigin(fedOrigin);
+ bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
}
@@ -245,7 +245,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
binding->startManagement();
bk->bindingVector.push_back(binding);
nBindings++;
- propagate = bk->fedBinding.addOrigin(fedOrigin);
+ propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
@@ -258,8 +258,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
RWlock::ScopedWlock l(lock);
BindingKey* bk = bindingTree.getBindingKey(routingPattern);
if (bk) {
- propagate = bk->fedBinding.delOrigin(fedOrigin);
- reallyUnbind = bk->fedBinding.count() == 0;
+ propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+ reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0;
}
}
if (reallyUnbind)
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 92a28c01ad..973a1d366c 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -1200,15 +1200,15 @@ class FederationTests(TestBase010):
# @todo - restore code when QPID-2499 fixed!!
sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1216,13 +1216,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,
@@ -1455,18 +1454,16 @@ class FederationTests(TestBase010):
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - restore code when QPID-2499 fixed!!
- sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1474,13 +1471,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message_trp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,
@@ -1714,18 +1710,16 @@ class FederationTests(TestBase010):
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - find a proper way to check the propagation here!
- sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1733,13 +1727,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message_frp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,