diff options
Diffstat (limited to 'cpp/broker/inc')
-rw-r--r-- | cpp/broker/inc/DirectExchange.h | 11 | ||||
-rw-r--r-- | cpp/broker/inc/Exchange.h | 8 | ||||
-rw-r--r-- | cpp/broker/inc/ExchangeRegistry.h | 4 | ||||
-rw-r--r-- | cpp/broker/inc/FanOutExchange.h | 11 | ||||
-rw-r--r-- | cpp/broker/inc/TopicExchange.h | 71 |
5 files changed, 71 insertions, 34 deletions
diff --git a/cpp/broker/inc/DirectExchange.h b/cpp/broker/inc/DirectExchange.h index bf8c5f0b37..faf5a0b949 100644 --- a/cpp/broker/inc/DirectExchange.h +++ b/cpp/broker/inc/DirectExchange.h @@ -29,22 +29,19 @@ namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ - const string name; std::map<string, std::vector<Queue::shared_ptr> > bindings; qpid::concurrent::MonitorImpl lock; public: static const std::string typeName; - DirectExchange(const string& name); + DirectExchange(const std::string& name); - inline virtual const string& getName(){ return name; } - - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args); virtual ~DirectExchange(); }; diff --git a/cpp/broker/inc/Exchange.h b/cpp/broker/inc/Exchange.h index 5f5dc5ce71..4066f5ac20 100644 --- a/cpp/broker/inc/Exchange.h +++ b/cpp/broker/inc/Exchange.h @@ -25,12 +25,14 @@ namespace qpid { namespace broker { class Exchange{ - public: - virtual const string& getName() = 0; + const std::string name; + public: + explicit Exchange(const std::string& name) : name(name) {} + virtual ~Exchange(){} + std::string getName() { return name; } virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0; - virtual ~Exchange(){} }; } } diff --git a/cpp/broker/inc/ExchangeRegistry.h b/cpp/broker/inc/ExchangeRegistry.h index 0f0eaae0d0..a4a778482c 100644 --- a/cpp/broker/inc/ExchangeRegistry.h +++ b/cpp/broker/inc/ExchangeRegistry.h @@ -25,13 +25,15 @@ namespace qpid { namespace broker { class ExchangeRegistry{ - std::map<string, Exchange*> exchanges; + typedef std::map<string, Exchange*> ExchangeMap; + ExchangeMap exchanges; qpid::concurrent::Monitor* lock; public: ExchangeRegistry(); void declare(Exchange* exchange); void destroy(const string& name); Exchange* get(const string& name); + Exchange* getDefault(); inline qpid::concurrent::Monitor* getLock(){ return lock; } ~ExchangeRegistry(); }; diff --git a/cpp/broker/inc/FanOutExchange.h b/cpp/broker/inc/FanOutExchange.h index 9d0d32bbf8..1932e8429c 100644 --- a/cpp/broker/inc/FanOutExchange.h +++ b/cpp/broker/inc/FanOutExchange.h @@ -30,22 +30,19 @@ namespace qpid { namespace broker { class FanOutExchange : public virtual Exchange { - const string name; std::vector<Queue::shared_ptr> bindings; qpid::concurrent::MonitorImpl lock; public: static const std::string typeName; - FanOutExchange(const string& name); + FanOutExchange(const std::string& name); - inline virtual const string& getName(){ return name; } + virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args); virtual ~FanOutExchange(); }; diff --git a/cpp/broker/inc/TopicExchange.h b/cpp/broker/inc/TopicExchange.h index d9ff62ecc6..68a4026ee7 100644 --- a/cpp/broker/inc/TopicExchange.h +++ b/cpp/broker/inc/TopicExchange.h @@ -18,7 +18,7 @@ #ifndef _TopicExchange_ #define _TopicExchange_ -#include <map> +#include <tr1/unordered_map> #include <vector> #include "Exchange.h" #include "FieldTable.h" @@ -28,28 +28,67 @@ namespace qpid { namespace broker { - class TopicExchange : public virtual Exchange{ - const string name; - std::map<string, std::vector<Queue::shared_ptr> > bindings;//NOTE: pattern matching not yet supported - qpid::concurrent::MonitorImpl lock; - public: - static const std::string typeName; - - TopicExchange(const string& name); +/** A vector of string tokens */ +class Tokens : public std::vector<std::string> { + public: + Tokens() {}; + // Default copy, assign, dtor are sufficient. + + /** Tokenize s, provides automatic conversion of string to Tokens */ + Tokens(const std::string& s) { operator=(s); } + /** Tokenize s */ + Tokens & operator=(const std::string& s); + + struct Hash { size_t operator()(const Tokens&) const; }; + typedef std::equal_to<Tokens> Equal; +}; + +/** + * Tokens that have been normalized as a pattern and can be matched + * with topic Tokens. Normalized meands all sequences of mixed * and + * # are reduced to a series of * followed by at most one #. + */ +class TopicPattern : public Tokens +{ + public: + TopicPattern() {} + // Default copy, assign, dtor are sufficient. + TopicPattern(const Tokens& tokens) { operator=(tokens); } + TopicPattern(const std::string& str) { operator=(str); } + TopicPattern& operator=(const Tokens&); + TopicPattern& operator=(const std::string& str) { operator=(Tokens(str)); } + + /** Match a topic */ + bool match(const std::string& topic) { return match(Tokens(topic)); } + bool match(const Tokens& topic) const; + + private: + void normalize(); +}; + +class TopicExchange : public virtual Exchange{ + typedef std::tr1::unordered_map<TopicPattern, Queue::vector, TopicPattern::Hash> BindingMap; + BindingMap bindings; + qpid::concurrent::MonitorImpl lock; + + public: + static const std::string typeName; + + TopicExchange(const string& name); - inline virtual const string& getName(){ return name; } + virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + + virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + + virtual ~TopicExchange(); +}; - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); - virtual ~TopicExchange(); - }; } } - #endif |