/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "TopicExchange.h" #include using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; namespace _qmf = qmf::org::apache::qpid::broker; // TODO aconway 2006-09-20: More efficient matching algorithm. // Areas for improvement: // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. namespace { const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); const std::string qpidFedOrigin("qpid.fed.origin"); const std::string fedOpBind("B"); const std::string fedOpUnbind("U"); const std::string fedOpReorigin("R"); const std::string fedOpHello("H"); } Tokens& Tokens::operator=(const std::string& s) { clear(); if (s.empty()) return *this; std::string::const_iterator i = s.begin(); while (true) { // Invariant: i is at the beginning of the next untokenized word. std::string::const_iterator j = std::find(i, s.end(), '.'); push_back(std::string(i, j)); if (j == s.end()) return *this; i = j + 1; } return *this; } TopicPattern& TopicPattern::operator=(const Tokens& tokens) { Tokens::operator=(tokens); normalize(); return *this; } void Tokens::key(string& keytext) const { for (std::vector::const_iterator iter = begin(); iter != end(); iter++) { if (iter != begin()) keytext += "."; keytext += *iter; } } namespace { const std::string hashmark("#"); const std::string star("*"); } void TopicPattern::normalize() { std::string word; Tokens::iterator i = begin(); while (i != end()) { if (*i == hashmark) { ++i; while (i != end()) { // Invariant: *(i-1)==#, [begin()..i-1] is normalized. if (*i == star) { // Move * before #. std::swap(*i, *(i-1)); ++i; } else if (*i == hashmark) { erase(i); // Remove extra # } else { break; } } } else { i ++; } } } namespace { // TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string. // Need StringRef class that operates on a string in place witout copy. // Should be applied everywhere strings are extracted from frames. // bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) { // Invariant: [pattern_begin..p) matches [target_begin..t) Tokens::const_iterator p = pattern_begin; Tokens::const_iterator t = target_begin; while (p != pattern_end && t != target_end) { if (*p == star || *p == *t) { ++p, ++t; } else if (*p == hashmark) { ++p; if (do_match(p, pattern_end, t, target_end)) return true; while (t != target_end) { ++t; if (do_match(p, pattern_end, t, target_end)) return true; } return false; } else { return false; } } while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # return t == target_end && p == pattern_end; } } bool TopicPattern::match(const Tokens& target) const { return do_match(begin(), end(), target.begin(), target.end()); } TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent) : Exchange(_name, _durable, _args, _parent) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) { string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); string fedTags(args ? args->getAsString(qpidFedTags) : ""); string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); bool propagate = false; bool reallyUnbind; TopicPattern routingPattern(routingKey); if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { RWlock::ScopedWlock l(lock); if (isBound(queue, routingPattern)) { return false; } else { Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin)); BoundKey& bk = bindings[routingPattern]; bk.bindingVector.push_back(binding); propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); } } } else if (fedOp == fedOpUnbind) { { RWlock::ScopedWlock l(lock); BoundKey& bk = bindings[routingPattern]; propagate = bk.fedBinding.delOrigin(fedOrigin); reallyUnbind = bk.fedBinding.count() == 0; } if (reallyUnbind) unbind(queue, routingKey, 0); } else if (fedOp == fedOpReorigin) { for (std::map::iterator iter = bindings.begin(); iter != bindings.end(); iter++) { const BoundKey& bk = iter->second; if (bk.fedBinding.hasLocal()) { string propKey; iter->first.key(propKey); propagateFedOp(propKey, string(), fedOpBind, string()); } } } routeIVE(); if (propagate) propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); return true; } bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedWlock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); if (bi == bindings.end()) return false; BoundKey& bk = bi->second; Binding::vector& qv(bk.bindingVector); bool propagate = false; Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) break; if(q == qv.end()) return false; qv.erase(q); propagate = bk.fedBinding.delOrigin(); if(qv.empty()) bindings.erase(bi); if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); } if (propagate) propagateFedOp(routingKey, string(), fedOpUnbind, string()); return true; } bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) { BindingMap::iterator bi = bindings.find(pattern); if (bi == bindings.end()) return false; Binding::vector& qv(bi->second.bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) break; return q != qv.end(); } void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ Binding::vector mb; PreRoute pr(msg, this); uint32_t count(0); { RWlock::ScopedRlock l(lock); Tokens tokens(routingKey); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(tokens)) { Binding::vector& qv(i->second.bindingVector); for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ mb.push_back(*j); } } } } for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) { msg.deliverTo((*j)->queue); if ((*j)->mgmtBinding != 0) (*j)->mgmtBinding->inc_msgMatched (); } if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); mgmtExchange->inc_byteReceives (msg.contentSize ()); if (count == 0) { mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); } else { mgmtExchange->inc_msgRoutes (count); mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); } } } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) { if (routingKey && queue) { TopicPattern key(*routingKey); return isBound(queue, key); } else if (!routingKey && !queue) { return bindings.size() > 0; } else if (routingKey) { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(*routingKey)) { return true; } } } else { for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { Binding::vector& qv(i->second.bindingVector); Binding::vector::iterator q; for (q = qv.begin(); q != qv.end(); q++) if ((*q)->queue == queue) return true; } } return false; } TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic");