diff options
Diffstat (limited to 'cpp/broker/src')
-rw-r--r-- | cpp/broker/src/DirectExchange.cpp | 4 | ||||
-rw-r--r-- | cpp/broker/src/ExchangeRegistry.cpp | 14 | ||||
-rw-r--r-- | cpp/broker/src/FanOutExchange.cpp | 2 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerFactoryImpl.cpp | 17 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/broker/src/TopicExchange.cpp | 134 |
6 files changed, 149 insertions, 28 deletions
diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp index 70f7ee838f..ca29225bee 100644 --- a/cpp/broker/src/DirectExchange.cpp +++ b/cpp/broker/src/DirectExchange.cpp @@ -22,7 +22,7 @@ using namespace qpid::broker; using namespace qpid::framing; -DirectExchange::DirectExchange(const string& _name) : name(_name) { +DirectExchange::DirectExchange(const string& name) : Exchange(name) { } @@ -59,7 +59,7 @@ void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, F (*i)->deliver(msg); } if(!count){ - std::cout << "WARNING: DirectExchange " << name << " could not route message with key " << routingKey << std::endl; + std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; } lock.release(); } diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp index 0ee581af2f..05396382a7 100644 --- a/cpp/broker/src/ExchangeRegistry.cpp +++ b/cpp/broker/src/ExchangeRegistry.cpp @@ -24,6 +24,10 @@ using namespace qpid::concurrent; ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){} ExchangeRegistry::~ExchangeRegistry(){ + for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i) + { + delete i->second; + } delete lock; } @@ -41,3 +45,13 @@ void ExchangeRegistry::destroy(const string& name){ Exchange* ExchangeRegistry::get(const string& name){ return exchanges[name]; } + +namespace +{ +const std::string empty; +} + +Exchange* ExchangeRegistry::getDefault() +{ + return get(empty); +} diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp index 7f261d5eda..4eb75cb920 100644 --- a/cpp/broker/src/FanOutExchange.cpp +++ b/cpp/broker/src/FanOutExchange.cpp @@ -23,7 +23,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; -FanOutExchange::FanOutExchange(const string& _name) : name(_name) {} +FanOutExchange::FanOutExchange(const std::string& name) : Exchange(name) {} void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ Locker locker(lock); diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp index 661cb4ef81..280e89c475 100644 --- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp +++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp @@ -22,10 +22,19 @@ using namespace qpid::broker; using namespace qpid::io; +namespace +{ +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +} + SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ - exchanges.declare(new DirectExchange("amq.direct")); - exchanges.declare(new TopicExchange("amq.topic")); - exchanges.declare(new FanOutExchange("amq.fanout")); + exchanges.declare(new DirectExchange(empty)); // Default exchange. + exchanges.declare(new DirectExchange(amq_direct)); + exchanges.declare(new TopicExchange(amq_topic)); + exchanges.declare(new FanOutExchange(amq_fanout)); cleaner.start(); } @@ -35,6 +44,4 @@ SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){ SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){ cleaner.stop(); - exchanges.destroy("amq.direct"); - exchanges.destroy("amq.topic"); } diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index a75b8fcf0f..872e6f124a 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -256,7 +256,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t if (queue_created.second) { // This is a new queue parent->channels[channel]->setDefaultQueue(queue); //add default binding: - parent->exchanges->get("amq.direct")->bind(queue, name, 0); + parent->exchanges->getDefault()->bind(queue, name, 0); if(exclusive){ parent->exclusiveQueues.push_back(queue); } else if(autoDelete){ @@ -280,7 +280,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t tic Queue::shared_ptr queue = parent->getQueue(queueName, channel); Exchange* exchange = parent->exchanges->get(exchangeName); if(exchange){ - if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName(); + if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); exchange->bind(queue, routingKey, &arguments); if(!nowait) parent->client.getQueue().bindOk(channel); }else{ @@ -361,7 +361,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t string& exchange, string& routingKey, bool mandatory, bool immediate){ - Message* msg = new Message(parent, exchange.length() ? exchange : "amq.direct", routingKey, mandatory, immediate); + Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); parent->channels[channel]->handlePublish(msg); } diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp index e0248958f9..287502bc88 100644 --- a/cpp/broker/src/TopicExchange.cpp +++ b/cpp/broker/src/TopicExchange.cpp @@ -17,46 +17,146 @@ */ #include "TopicExchange.h" #include "ExchangeBinding.h" +#include <algorithm> using namespace qpid::broker; using namespace qpid::framing; -TopicExchange::TopicExchange(const string& _name) : name(_name) { +// TODO aconway 2006-09-20: More efficient matching algorithm. +// Areas for improvement: +// - excessive string copying: should be 0 copy, match from original buffer. +// - match/lookup: use descision tree or other more efficient structure. + +Tokens& Tokens::operator=(const std::string& s) { + clear(); + if (s.empty()) return *this; + std::string::const_iterator i = s.begin(); + while (true) { + // Invariant: i is at the beginning of the next untokenized word. + std::string::const_iterator j = find(i, s.end(), '.'); + push_back(std::string(i, j)); + if (j == s.end()) return *this; + i = j + 1; + } + return *this; +} + +size_t Tokens::Hash::operator()(const Tokens& p) const { + size_t hash = 0; + for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) { + hash += std::tr1::hash<std::string>()(*i); + } +} + +TopicPattern& TopicPattern::operator=(const Tokens& tokens) { + Tokens::operator=(tokens); + normalize(); + return *this; +} + +namespace { +const std::string hashmark("#"); +const std::string star("*"); +} + +void TopicPattern::normalize() { + std::string word; + Tokens::iterator i = begin(); + while (i != end()) { + if (*i == hashmark) { + ++i; + while (i != end()) { + // Invariant: *(i-1)==#, [begin()..i-1] is normalized. + if (*i == star) { // Move * before #. + std::swap(*i, *(i-1)); + ++i; + } else if (*i == hashmark) { + erase(i); // Remove extra # + } else { + break; + } + } + } else { + i ++; + } + } +} + + +namespace { +// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. +// Need more efficient Tokens impl that can operate on a string in place. +// +bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) +{ + // Invariant: [pattern_begin..p) matches [target_begin..t) + Tokens::const_iterator p = pattern_begin; + Tokens::const_iterator t = target_begin; + while (p != pattern_end && t != target_end) + { + if (*p == star || *p == *t) { + ++p, ++t; + } else if (*p == hashmark) { + ++p; + if (do_match(p, pattern_end, t, target_end)) return true; + while (t != target_end) { + ++t; + if (do_match(p, pattern_end, t, target_end)) return true; + } + return false; + } else { + return false; + } + } + while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # + return t == target_end && p == pattern_end; +} +} + +bool TopicPattern::match(const Tokens& target) const +{ + return do_match(begin(), end(), target.begin(), target.end()); } +TopicExchange::TopicExchange(const string& name) : Exchange(name) { } + void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ lock.acquire(); - bindings[routingKey].push_back(queue); + TopicPattern routingPattern(routingKey); + bindings[routingPattern].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); lock.release(); } void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i < queues.end()){ - queues.erase(i); - if(queues.empty()){ - bindings.erase(routingKey); - } - } + BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); + Queue::vector& qv(bi->second); + if (bi == bindings.end()) return; + Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); + if(q == qv.end()) return; + qv.erase(q); + if(qv.empty()) bindings.erase(bi); lock.release(); } + void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){ - (*i)->deliver(msg); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (i->first.match(routingKey)) { + Queue::vector& qv(i->second); + for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ + (*j)->deliver(msg); + } + } } lock.release(); } -TopicExchange::~TopicExchange(){ - -} +TopicExchange::~TopicExchange() {} const std::string TopicExchange::typeName("topic"); + + |