/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include "TopicExchange.h" #include "ExchangeBinding.h" #include using namespace qpid::broker; using namespace qpid::framing; // TODO aconway 2006-09-20: More efficient matching algorithm. // Areas for improvement: // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. Tokens& Tokens::operator=(const std::string& s) { clear(); if (s.empty()) return *this; std::string::const_iterator i = s.begin(); while (true) { // Invariant: i is at the beginning of the next untokenized word. std::string::const_iterator j = find(i, s.end(), '.'); push_back(std::string(i, j)); if (j == s.end()) return *this; i = j + 1; } return *this; } size_t Tokens::Hash::operator()(const Tokens& p) const { size_t hash = 0; for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) { hash += std::tr1::hash()(*i); } } TopicPattern& TopicPattern::operator=(const Tokens& tokens) { Tokens::operator=(tokens); normalize(); return *this; } namespace { const std::string hashmark("#"); const std::string star("*"); } void TopicPattern::normalize() { std::string word; Tokens::iterator i = begin(); while (i != end()) { if (*i == hashmark) { ++i; while (i != end()) { // Invariant: *(i-1)==#, [begin()..i-1] is normalized. if (*i == star) { // Move * before #. std::swap(*i, *(i-1)); ++i; } else if (*i == hashmark) { erase(i); // Remove extra # } else { break; } } } else { i ++; } } } namespace { // TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. // Need more efficient Tokens impl that can operate on a string in place. // bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) { // Invariant: [pattern_begin..p) matches [target_begin..t) Tokens::const_iterator p = pattern_begin; Tokens::const_iterator t = target_begin; while (p != pattern_end && t != target_end) { if (*p == star || *p == *t) { ++p, ++t; } else if (*p == hashmark) { ++p; if (do_match(p, pattern_end, t, target_end)) return true; while (t != target_end) { ++t; if (do_match(p, pattern_end, t, target_end)) return true; } return false; } else { return false; } } while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # return t == target_end && p == pattern_end; } } bool TopicPattern::match(const Tokens& target) const { return do_match(begin(), end(), target.begin(), target.end()); } TopicExchange::TopicExchange(const string& name) : Exchange(name) { } void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ lock.acquire(); TopicPattern routingPattern(routingKey); bindings[routingPattern].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); lock.release(); } void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ lock.acquire(); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); Queue::vector& qv(bi->second); if (bi == bindings.end()) return; Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); if(q == qv.end()) return; qv.erase(q); if(qv.empty()) bindings.erase(bi); lock.release(); } void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ lock.acquire(); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(routingKey)) { Queue::vector& qv(i->second); for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ (*j)->deliver(msg); } } } lock.release(); } TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic");