diff options
Diffstat (limited to 'cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 78 |
1 files changed, 68 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 691b42a1ae..853c131571 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -31,6 +31,18 @@ namespace _qmf = qmf::org::apache::qpid::broker; // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + Tokens& Tokens::operator=(const std::string& s) { clear(); if (s.empty()) return *this; @@ -51,6 +63,15 @@ TopicPattern& TopicPattern::operator=(const Tokens& tokens) { return *this; } +void Tokens::key(string& keytext) const +{ + for (std::vector<string>::const_iterator iter = begin(); iter != end(); iter++) { + if (iter != begin()) + keytext += "."; + keytext += *iter; + } +} + namespace { const std::string hashmark("#"); const std::string star("*"); @@ -81,7 +102,7 @@ void TopicPattern::normalize() { namespace { -// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. +// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string. // Need StringRef class that operates on a string in place witout copy. // Should be applied everywhere strings are extracted from frames. // @@ -130,30 +151,63 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - { +bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) +{ + string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); + string fedTags(args ? args->getAsString(qpidFedTags) : ""); + string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); + bool propagate = false; + bool reallyUnbind; + TopicPattern routingPattern(routingKey); + + if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { RWlock::ScopedWlock l(lock); - TopicPattern routingPattern(routingKey); if (isBound(queue, routingPattern)) { return false; } else { - Binding::shared_ptr binding (new Binding (routingKey, queue, this)); - bindings[routingPattern].push_back(binding); + Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin)); + BoundKey& bk = bindings[routingPattern]; + bk.bindingVector.push_back(binding); + propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); } } + } else if (fedOp == fedOpUnbind) { + { + RWlock::ScopedWlock l(lock); + BoundKey& bk = bindings[routingPattern]; + propagate = bk.fedBinding.delOrigin(fedOrigin); + reallyUnbind = bk.fedBinding.count() == 0; + } + if (reallyUnbind) + unbind(queue, routingKey, 0); + } else if (fedOp == fedOpReorigin) { + for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin(); + iter != bindings.end(); iter++) { + const BoundKey& bk = iter->second; + if (bk.fedBinding.hasLocal()) { + string propKey; + iter->first.key(propKey); + propagateFedOp(propKey, string(), fedOpBind, string()); + } + } } + routeIVE(); + if (propagate) + propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); return true; } bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - Binding::vector& qv(bi->second); if (bi == bindings.end()) return false; + BoundKey& bk = bi->second; + Binding::vector& qv(bk.bindingVector); + bool propagate = false; Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) @@ -161,11 +215,15 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co break; if(q == qv.end()) return false; qv.erase(q); + propagate = bk.fedBinding.delOrigin(); if(qv.empty()) bindings.erase(bi); if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); } + + if (propagate) + propagateFedOp(routingKey, string(), fedOpUnbind, string()); return true; } @@ -173,7 +231,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) { BindingMap::iterator bi = bindings.find(pattern); if (bi == bindings.end()) return false; - Binding::vector& qv(bi->second); + Binding::vector& qv(bi->second.bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) @@ -189,7 +247,7 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(tokens)) { - Binding::vector& qv(i->second); + Binding::vector& qv(i->second.bindingVector); for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ msg.deliverTo((*j)->queue); if ((*j)->mgmtBinding != 0) @@ -230,7 +288,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing } } else { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - Binding::vector& qv(i->second); + Binding::vector& qv(i->second.bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) |