summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp12
-rw-r--r--cpp/src/qpid/broker/Exchange.h53
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp12
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h3
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp8
-rwxr-xr-xcpp/src/tests/federation.py73
7 files changed, 90 insertions, 79 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index fc52ab3711..5b8104c77c 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -76,13 +76,13 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
if (bk.queues.add_unless(b, MatchQueue(queue))) {
b->startManagement();
- propagate = bk.fedBinding.addOrigin(fedOrigin);
+ propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
// queue already present - still need to track fedOrigin
- bk.fedBinding.addOrigin(fedOrigin);
+ bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
@@ -90,11 +90,12 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
BoundKey& bk = bindings[routingKey];
QPID_LOG(debug, "Bind - fedOpUnbind key [" << routingKey << "] queue " << queue->getName()
- << " (origin=" << fedOrigin << ")");
+ << " (origin=" << fedOrigin << ")" << " (count=" << bk.fedBinding.count() << ")");
- propagate = bk.fedBinding.delOrigin(fedOrigin);
- if (bk.fedBinding.count() == 0)
+ propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
+ if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
unbind(queue, routingKey, 0);
+
} else if (fedOp == fedOpReorigin) {
/** gather up all the keys that need rebinding in a local vector
* while holding the lock. Then propagate once the lock is
@@ -142,6 +143,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
}
+ // If I delete my local binding, propagate this unbind to any upstream brokers
if (propagate)
propagateFedOp(routingKey, string(), fedOpUnbind, string());
return true;
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 26d7f41015..3c8b5ca2cd 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -95,46 +95,61 @@ protected:
bool operator()(Exchange::Binding::shared_ptr b);
};
+ /** A FedBinding keeps track of information that Federation needs
+ to know when to propagate changes.
+
+ Dynamic federation needs to know which exchanges have at least
+ one local binding. The bindings on these exchanges need to be
+ propagated.
+
+ Federated binds and unbinds need to know which federation
+ origins are associated with the bindings for each queue. When
+ origins are added or deleted, the corresponding bindings need
+ to be propagated.
+
+ fedBindings[queueName] contains the origins associated with
+ the given queue.
+ */
+
class FedBinding {
uint32_t localBindings;
- std::set<std::string> originSet;
+
+ typedef std::set<std::string> originSet;
+ std::map<std::string, originSet> fedBindings;
+
public:
FedBinding() : localBindings(0) {}
bool hasLocal() const { return localBindings != 0; }
- /**
- * Returns 'true' if and only if this is the first local
- * binding.
- *
- * The first local binding may need to be propagated.
- */
- bool addOrigin(const std::string& origin) {
+ /** Returns true if propagation is needed. */
+ bool addOrigin(const std::string& queueName, const std::string& origin) {
if (origin.empty()) {
localBindings++;
return localBindings == 1;
}
- originSet.insert(origin);
+ fedBindings[queueName].insert(origin);
return true;
}
- bool delOrigin(const std::string& origin) {
- originSet.erase(origin);
+
+ /** Returns true if propagation is needed. */
+ bool delOrigin(const std::string& queueName, const std::string& origin){
+ fedBindings[queueName].erase(origin);
return true;
}
- /**
- * Returns 'true' if and only if the last local binding is
- * deleted.
- *
- * When the last local binding is deleted, it may need to
- * be propagated.
- */
+ /** Returns true if propagation is needed. */
bool delOrigin() {
if (localBindings > 0)
localBindings--;
return localBindings == 0;
}
+
uint32_t count() {
- return localBindings + originSet.size();
+ return localBindings + fedBindings.size();
+ }
+
+ uint32_t countFedBindings(const std::string& queueName) {
+ return fedBindings[queueName].size();
}
};
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index a33eba1d09..ac2c914a97 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -53,18 +53,18 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
binding->startManagement();
- propagate = fedBinding.addOrigin(fedOrigin);
+ propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
// queue already present - still need to track fedOrigin
- fedBinding.addOrigin(fedOrigin);
+ fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
- propagate = fedBinding.delOrigin(fedOrigin);
- if (fedBinding.count() == 0)
+ propagate = fedBinding.delOrigin(queue->getName(), fedOrigin);
+ if (fedBinding.countFedBindings(queue->getName()) == 0)
unbind(queue, "", 0);
} else if (fedOp == fedOpReorigin) {
if (fedBinding.hasLocal()) {
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 584cd4c481..82ac5911ee 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -116,12 +116,12 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
BoundKey bk(binding);
if (bindings.add_unless(bk, MatchArgs(queue, args))) {
binding->startManagement();
- propagate = bk.fedBinding.addOrigin(fedOrigin);
+ propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
} else {
- bk.fedBinding.addOrigin(fedOrigin);
+ bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
} // lock dropped
@@ -129,7 +129,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
} else if (fedOp == fedOpUnbind) {
Mutex::ScopedLock l(lock);
- FedUnbindModifier modifier(fedOrigin);
+ FedUnbindModifier modifier(queue->getName(), fedOrigin);
bindings.modify_if(MatchKey(queue, bindingKey), modifier);
propagate = modifier.shouldPropagate;
if (modifier.shouldUnbind) {
@@ -325,7 +325,7 @@ bool HeadersExchange::MatchKey::operator()(BoundKey & bk)
}
//----------
-HeadersExchange::FedUnbindModifier::FedUnbindModifier(string & origin) : fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {}
+HeadersExchange::FedUnbindModifier::FedUnbindModifier(const string& queueName, const string& origin) : queueName(queueName), fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {}
HeadersExchange::FedUnbindModifier::FedUnbindModifier() : shouldUnbind(false), shouldPropagate(false) {}
bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
@@ -333,9 +333,9 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
if ("" == fedOrigin) {
shouldPropagate = bk.fedBinding.delOrigin();
} else {
- shouldPropagate = bk.fedBinding.delOrigin(fedOrigin);
+ shouldPropagate = bk.fedBinding.delOrigin(queueName, fedOrigin);
}
- if (bk.fedBinding.count() == 0)
+ if (bk.fedBinding.countFedBindings(queueName) == 0)
{
shouldUnbind = true;
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 33c119cbbb..3b939d6851 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -60,11 +60,12 @@ class HeadersExchange : public virtual Exchange {
struct FedUnbindModifier
{
+ std::string queueName;
std::string fedOrigin;
bool shouldUnbind;
bool shouldPropagate;
FedUnbindModifier();
- FedUnbindModifier(std::string & origin);
+ FedUnbindModifier(const std::string& queueName, const std::string& origin);
bool operator()(BoundKey & bk);
};
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 6bc42e28bf..1b0fe71bcf 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -236,7 +236,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
for (q = qv.begin(); q != qv.end(); q++) {
if ((*q)->queue == queue) {
// already bound, but may be from a different fedOrigin
- bk->fedBinding.addOrigin(fedOrigin);
+ bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
return false;
}
}
@@ -245,7 +245,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
binding->startManagement();
bk->bindingVector.push_back(binding);
nBindings++;
- propagate = bk->fedBinding.addOrigin(fedOrigin);
+ propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
@@ -258,8 +258,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
RWlock::ScopedWlock l(lock);
BindingKey* bk = bindingTree.getBindingKey(routingPattern);
if (bk) {
- propagate = bk->fedBinding.delOrigin(fedOrigin);
- reallyUnbind = bk->fedBinding.count() == 0;
+ propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+ reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0;
}
}
if (reallyUnbind)
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 92a28c01ad..973a1d366c 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -1200,15 +1200,15 @@ class FederationTests(TestBase010):
# @todo - restore code when QPID-2499 fixed!!
sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1216,13 +1216,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,
@@ -1455,18 +1454,16 @@ class FederationTests(TestBase010):
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - restore code when QPID-2499 fixed!!
- sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1474,13 +1471,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message_trp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,
@@ -1714,18 +1710,16 @@ class FederationTests(TestBase010):
self._brokers[2].client_session.message_cancel(destination="f1")
self._brokers[2].client_session.queue_delete(queue="fedX1")
- # @todo - find a proper way to check the propagation here!
- sleep(6)
# wait for the binding count on B1 to drop from 2 to 1
- # retries = 0
- # exchanges[1].update()
- # while exchanges[1].bindingCount != 1:
- # retries += 1
- # self.failIfEqual(retries, 10,
- # "unbinding failed to propagate to broker B1: %d"
- # % exchanges[1].bindingCount)
- # sleep(1)
- # exchanges[1].update()
+ retries = 0
+ exchanges[1].update()
+ while exchanges[1].bindingCount != 1:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "unbinding failed to propagate to broker B1: %d"
+ % exchanges[1].bindingCount)
+ sleep(1)
+ exchanges[1].update()
# send 10 msgs from B0
for i in range(11, 21):
@@ -1733,13 +1727,12 @@ class FederationTests(TestBase010):
self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message_frp %d" % i))
# verify messages are forwarded to B3 only
- # note: why exchanges[1].msgRoutes == 40???, not 20??? QPID-2499?
retries = 0
for ex in exchanges:
ex.update()
while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or
- exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 40 or
- exchanges[2].msgReceives != 20 or exchanges[2].msgDrops != 10 or exchanges[2].msgRoutes != 10 or
+ exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or
+ exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or
exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20):
retries += 1
self.failIfEqual(retries, 10,