summaryrefslogtreecommitdiff
path: root/cpp/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/src')
-rw-r--r--cpp/broker/src/DirectExchange.cpp4
-rw-r--r--cpp/broker/src/ExchangeRegistry.cpp14
-rw-r--r--cpp/broker/src/FanOutExchange.cpp2
-rw-r--r--cpp/broker/src/SessionHandlerFactoryImpl.cpp17
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp6
-rw-r--r--cpp/broker/src/TopicExchange.cpp134
6 files changed, 149 insertions, 28 deletions
diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp
index 70f7ee838f..ca29225bee 100644
--- a/cpp/broker/src/DirectExchange.cpp
+++ b/cpp/broker/src/DirectExchange.cpp
@@ -22,7 +22,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-DirectExchange::DirectExchange(const string& _name) : name(_name) {
+DirectExchange::DirectExchange(const string& name) : Exchange(name) {
}
@@ -59,7 +59,7 @@ void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, F
(*i)->deliver(msg);
}
if(!count){
- std::cout << "WARNING: DirectExchange " << name << " could not route message with key " << routingKey << std::endl;
+ std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
}
lock.release();
}
diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp
index 0ee581af2f..05396382a7 100644
--- a/cpp/broker/src/ExchangeRegistry.cpp
+++ b/cpp/broker/src/ExchangeRegistry.cpp
@@ -24,6 +24,10 @@ using namespace qpid::concurrent;
ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
ExchangeRegistry::~ExchangeRegistry(){
+ for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i)
+ {
+ delete i->second;
+ }
delete lock;
}
@@ -41,3 +45,13 @@ void ExchangeRegistry::destroy(const string& name){
Exchange* ExchangeRegistry::get(const string& name){
return exchanges[name];
}
+
+namespace
+{
+const std::string empty;
+}
+
+Exchange* ExchangeRegistry::getDefault()
+{
+ return get(empty);
+}
diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp
index 7f261d5eda..4eb75cb920 100644
--- a/cpp/broker/src/FanOutExchange.cpp
+++ b/cpp/broker/src/FanOutExchange.cpp
@@ -23,7 +23,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
-FanOutExchange::FanOutExchange(const string& _name) : name(_name) {}
+FanOutExchange::FanOutExchange(const std::string& name) : Exchange(name) {}
void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
Locker locker(lock);
diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
index 661cb4ef81..280e89c475 100644
--- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp
+++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
@@ -22,10 +22,19 @@
using namespace qpid::broker;
using namespace qpid::io;
+namespace
+{
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+}
+
SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
- exchanges.declare(new DirectExchange("amq.direct"));
- exchanges.declare(new TopicExchange("amq.topic"));
- exchanges.declare(new FanOutExchange("amq.fanout"));
+ exchanges.declare(new DirectExchange(empty)); // Default exchange.
+ exchanges.declare(new DirectExchange(amq_direct));
+ exchanges.declare(new TopicExchange(amq_topic));
+ exchanges.declare(new FanOutExchange(amq_fanout));
cleaner.start();
}
@@ -35,6 +44,4 @@ SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
cleaner.stop();
- exchanges.destroy("amq.direct");
- exchanges.destroy("amq.topic");
}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index a75b8fcf0f..872e6f124a 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -256,7 +256,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
if (queue_created.second) { // This is a new queue
parent->channels[channel]->setDefaultQueue(queue);
//add default binding:
- parent->exchanges->get("amq.direct")->bind(queue, name, 0);
+ parent->exchanges->getDefault()->bind(queue, name, 0);
if(exclusive){
parent->exclusiveQueues.push_back(queue);
} else if(autoDelete){
@@ -280,7 +280,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t tic
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
Exchange* exchange = parent->exchanges->get(exchangeName);
if(exchange){
- if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName();
+ if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
exchange->bind(queue, routingKey, &arguments);
if(!nowait) parent->client.getQueue().bindOk(channel);
}else{
@@ -361,7 +361,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
string& exchange, string& routingKey,
bool mandatory, bool immediate){
- Message* msg = new Message(parent, exchange.length() ? exchange : "amq.direct", routingKey, mandatory, immediate);
+ Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
parent->channels[channel]->handlePublish(msg);
}
diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp
index e0248958f9..287502bc88 100644
--- a/cpp/broker/src/TopicExchange.cpp
+++ b/cpp/broker/src/TopicExchange.cpp
@@ -17,46 +17,146 @@
*/
#include "TopicExchange.h"
#include "ExchangeBinding.h"
+#include <algorithm>
using namespace qpid::broker;
using namespace qpid::framing;
-TopicExchange::TopicExchange(const string& _name) : name(_name) {
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be 0 copy, match from original buffer.
+// - match/lookup: use descision tree or other more efficient structure.
+
+Tokens& Tokens::operator=(const std::string& s) {
+ clear();
+ if (s.empty()) return *this;
+ std::string::const_iterator i = s.begin();
+ while (true) {
+ // Invariant: i is at the beginning of the next untokenized word.
+ std::string::const_iterator j = find(i, s.end(), '.');
+ push_back(std::string(i, j));
+ if (j == s.end()) return *this;
+ i = j + 1;
+ }
+ return *this;
+}
+
+size_t Tokens::Hash::operator()(const Tokens& p) const {
+ size_t hash = 0;
+ for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) {
+ hash += std::tr1::hash<std::string>()(*i);
+ }
+}
+
+TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
+ Tokens::operator=(tokens);
+ normalize();
+ return *this;
+}
+
+namespace {
+const std::string hashmark("#");
+const std::string star("*");
+}
+
+void TopicPattern::normalize() {
+ std::string word;
+ Tokens::iterator i = begin();
+ while (i != end()) {
+ if (*i == hashmark) {
+ ++i;
+ while (i != end()) {
+ // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
+ if (*i == star) { // Move * before #.
+ std::swap(*i, *(i-1));
+ ++i;
+ } else if (*i == hashmark) {
+ erase(i); // Remove extra #
+ } else {
+ break;
+ }
+ }
+ } else {
+ i ++;
+ }
+ }
+}
+
+
+namespace {
+// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// Need more efficient Tokens impl that can operate on a string in place.
+//
+bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
+{
+ // Invariant: [pattern_begin..p) matches [target_begin..t)
+ Tokens::const_iterator p = pattern_begin;
+ Tokens::const_iterator t = target_begin;
+ while (p != pattern_end && t != target_end)
+ {
+ if (*p == star || *p == *t) {
+ ++p, ++t;
+ } else if (*p == hashmark) {
+ ++p;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ while (t != target_end) {
+ ++t;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ }
+ return false;
+ } else {
+ return false;
+ }
+ }
+ while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
+ return t == target_end && p == pattern_end;
+}
+}
+
+bool TopicPattern::match(const Tokens& target) const
+{
+ return do_match(begin(), end(), target.begin(), target.end());
}
+TopicExchange::TopicExchange(const string& name) : Exchange(name) { }
+
void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
lock.acquire();
- bindings[routingKey].push_back(queue);
+ TopicPattern routingPattern(routingKey);
+ bindings[routingPattern].push_back(queue);
queue->bound(new ExchangeBinding(this, queue, routingKey, args));
lock.release();
}
void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
-
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i < queues.end()){
- queues.erase(i);
- if(queues.empty()){
- bindings.erase(routingKey);
- }
- }
+ BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
+ Queue::vector& qv(bi->second);
+ if (bi == bindings.end()) return;
+ Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+ if(q == qv.end()) return;
+ qv.erase(q);
+ if(qv.empty()) bindings.erase(bi);
lock.release();
}
+
void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){
- (*i)->deliver(msg);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first.match(routingKey)) {
+ Queue::vector& qv(i->second);
+ for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ (*j)->deliver(msg);
+ }
+ }
}
lock.release();
}
-TopicExchange::~TopicExchange(){
-
-}
+TopicExchange::~TopicExchange() {}
const std::string TopicExchange::typeName("topic");
+
+