diff options
Diffstat (limited to 'cpp/broker/src/TopicExchange.cpp')
-rw-r--r-- | cpp/broker/src/TopicExchange.cpp | 134 |
1 files changed, 117 insertions, 17 deletions
diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp index e0248958f9..287502bc88 100644 --- a/cpp/broker/src/TopicExchange.cpp +++ b/cpp/broker/src/TopicExchange.cpp @@ -17,46 +17,146 @@ */ #include "TopicExchange.h" #include "ExchangeBinding.h" +#include <algorithm> using namespace qpid::broker; using namespace qpid::framing; -TopicExchange::TopicExchange(const string& _name) : name(_name) { +// TODO aconway 2006-09-20: More efficient matching algorithm. +// Areas for improvement: +// - excessive string copying: should be 0 copy, match from original buffer. +// - match/lookup: use descision tree or other more efficient structure. + +Tokens& Tokens::operator=(const std::string& s) { + clear(); + if (s.empty()) return *this; + std::string::const_iterator i = s.begin(); + while (true) { + // Invariant: i is at the beginning of the next untokenized word. + std::string::const_iterator j = find(i, s.end(), '.'); + push_back(std::string(i, j)); + if (j == s.end()) return *this; + i = j + 1; + } + return *this; +} + +size_t Tokens::Hash::operator()(const Tokens& p) const { + size_t hash = 0; + for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) { + hash += std::tr1::hash<std::string>()(*i); + } +} + +TopicPattern& TopicPattern::operator=(const Tokens& tokens) { + Tokens::operator=(tokens); + normalize(); + return *this; +} + +namespace { +const std::string hashmark("#"); +const std::string star("*"); +} + +void TopicPattern::normalize() { + std::string word; + Tokens::iterator i = begin(); + while (i != end()) { + if (*i == hashmark) { + ++i; + while (i != end()) { + // Invariant: *(i-1)==#, [begin()..i-1] is normalized. + if (*i == star) { // Move * before #. + std::swap(*i, *(i-1)); + ++i; + } else if (*i == hashmark) { + erase(i); // Remove extra # + } else { + break; + } + } + } else { + i ++; + } + } +} + + +namespace { +// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. +// Need more efficient Tokens impl that can operate on a string in place. +// +bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) +{ + // Invariant: [pattern_begin..p) matches [target_begin..t) + Tokens::const_iterator p = pattern_begin; + Tokens::const_iterator t = target_begin; + while (p != pattern_end && t != target_end) + { + if (*p == star || *p == *t) { + ++p, ++t; + } else if (*p == hashmark) { + ++p; + if (do_match(p, pattern_end, t, target_end)) return true; + while (t != target_end) { + ++t; + if (do_match(p, pattern_end, t, target_end)) return true; + } + return false; + } else { + return false; + } + } + while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # + return t == target_end && p == pattern_end; +} +} + +bool TopicPattern::match(const Tokens& target) const +{ + return do_match(begin(), end(), target.begin(), target.end()); } +TopicExchange::TopicExchange(const string& name) : Exchange(name) { } + void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ lock.acquire(); - bindings[routingKey].push_back(queue); + TopicPattern routingPattern(routingKey); + bindings[routingPattern].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); lock.release(); } void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i < queues.end()){ - queues.erase(i); - if(queues.empty()){ - bindings.erase(routingKey); - } - } + BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); + Queue::vector& qv(bi->second); + if (bi == bindings.end()) return; + Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); + if(q == qv.end()) return; + qv.erase(q); + if(qv.empty()) bindings.erase(bi); lock.release(); } + void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){ - (*i)->deliver(msg); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (i->first.match(routingKey)) { + Queue::vector& qv(i->second); + for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ + (*j)->deliver(msg); + } + } } lock.release(); } -TopicExchange::~TopicExchange(){ - -} +TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); + + |