summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/TopicExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp78
1 files changed, 68 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 691b42a1ae..853c131571 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -31,6 +31,18 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// - excessive string copying: should be 0 copy, match from original buffer.
// - match/lookup: use descision tree or other more efficient structure.
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
Tokens& Tokens::operator=(const std::string& s) {
clear();
if (s.empty()) return *this;
@@ -51,6 +63,15 @@ TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
return *this;
}
+void Tokens::key(string& keytext) const
+{
+ for (std::vector<string>::const_iterator iter = begin(); iter != end(); iter++) {
+ if (iter != begin())
+ keytext += ".";
+ keytext += *iter;
+ }
+}
+
namespace {
const std::string hashmark("#");
const std::string star("*");
@@ -81,7 +102,7 @@ void TopicPattern::normalize() {
namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string.
// Need StringRef class that operates on a string in place witout copy.
// Should be applied everywhere strings are extracted from frames.
//
@@ -130,30 +151,63 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable,
mgmtExchange->set_type (typeName);
}
-bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- {
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+ bool reallyUnbind;
+ TopicPattern routingPattern(routingKey);
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
RWlock::ScopedWlock l(lock);
- TopicPattern routingPattern(routingKey);
if (isBound(queue, routingPattern)) {
return false;
} else {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingPattern].push_back(binding);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin));
+ BoundKey& bk = bindings[routingPattern];
+ bk.bindingVector.push_back(binding);
+ propagate = bk.fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
}
+ } else if (fedOp == fedOpUnbind) {
+ {
+ RWlock::ScopedWlock l(lock);
+ BoundKey& bk = bindings[routingPattern];
+ propagate = bk.fedBinding.delOrigin(fedOrigin);
+ reallyUnbind = bk.fedBinding.count() == 0;
+ }
+ if (reallyUnbind)
+ unbind(queue, routingKey, 0);
+ } else if (fedOp == fedOpReorigin) {
+ for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+ if (bk.fedBinding.hasLocal()) {
+ string propKey;
+ iter->first.key(propKey);
+ propagateFedOp(propKey, string(), fedOpBind, string());
+ }
+ }
}
+
routeIVE();
+ if (propagate)
+ propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
return true;
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Binding::vector& qv(bi->second);
if (bi == bindings.end()) return false;
+ BoundKey& bk = bi->second;
+ Binding::vector& qv(bk.bindingVector);
+ bool propagate = false;
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
@@ -161,11 +215,15 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co
break;
if(q == qv.end()) return false;
qv.erase(q);
+ propagate = bk.fedBinding.delOrigin();
if(qv.empty()) bindings.erase(bi);
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
+
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
return true;
}
@@ -173,7 +231,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
{
BindingMap::iterator bi = bindings.find(pattern);
if (bi == bindings.end()) return false;
- Binding::vector& qv(bi->second);
+ Binding::vector& qv(bi->second.bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
@@ -189,7 +247,7 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(tokens)) {
- Binding::vector& qv(i->second);
+ Binding::vector& qv(i->second.bindingVector);
for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
msg.deliverTo((*j)->queue);
if ((*j)->mgmtBinding != 0)
@@ -230,7 +288,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
}
} else {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Binding::vector& qv(i->second);
+ Binding::vector& qv(i->second.bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)