summaryrefslogtreecommitdiff
path: root/cpp/broker/inc
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/inc')
-rw-r--r--cpp/broker/inc/DirectExchange.h11
-rw-r--r--cpp/broker/inc/Exchange.h8
-rw-r--r--cpp/broker/inc/ExchangeRegistry.h4
-rw-r--r--cpp/broker/inc/FanOutExchange.h11
-rw-r--r--cpp/broker/inc/TopicExchange.h71
5 files changed, 71 insertions, 34 deletions
diff --git a/cpp/broker/inc/DirectExchange.h b/cpp/broker/inc/DirectExchange.h
index bf8c5f0b37..faf5a0b949 100644
--- a/cpp/broker/inc/DirectExchange.h
+++ b/cpp/broker/inc/DirectExchange.h
@@ -29,22 +29,19 @@
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- const string name;
std::map<string, std::vector<Queue::shared_ptr> > bindings;
qpid::concurrent::MonitorImpl lock;
public:
static const std::string typeName;
- DirectExchange(const string& name);
+ DirectExchange(const std::string& name);
- inline virtual const string& getName(){ return name; }
-
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
virtual ~DirectExchange();
};
diff --git a/cpp/broker/inc/Exchange.h b/cpp/broker/inc/Exchange.h
index 5f5dc5ce71..4066f5ac20 100644
--- a/cpp/broker/inc/Exchange.h
+++ b/cpp/broker/inc/Exchange.h
@@ -25,12 +25,14 @@
namespace qpid {
namespace broker {
class Exchange{
- public:
- virtual const string& getName() = 0;
+ const std::string name;
+ public:
+ explicit Exchange(const std::string& name) : name(name) {}
+ virtual ~Exchange(){}
+ std::string getName() { return name; }
virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual ~Exchange(){}
};
}
}
diff --git a/cpp/broker/inc/ExchangeRegistry.h b/cpp/broker/inc/ExchangeRegistry.h
index 0f0eaae0d0..a4a778482c 100644
--- a/cpp/broker/inc/ExchangeRegistry.h
+++ b/cpp/broker/inc/ExchangeRegistry.h
@@ -25,13 +25,15 @@
namespace qpid {
namespace broker {
class ExchangeRegistry{
- std::map<string, Exchange*> exchanges;
+ typedef std::map<string, Exchange*> ExchangeMap;
+ ExchangeMap exchanges;
qpid::concurrent::Monitor* lock;
public:
ExchangeRegistry();
void declare(Exchange* exchange);
void destroy(const string& name);
Exchange* get(const string& name);
+ Exchange* getDefault();
inline qpid::concurrent::Monitor* getLock(){ return lock; }
~ExchangeRegistry();
};
diff --git a/cpp/broker/inc/FanOutExchange.h b/cpp/broker/inc/FanOutExchange.h
index 9d0d32bbf8..1932e8429c 100644
--- a/cpp/broker/inc/FanOutExchange.h
+++ b/cpp/broker/inc/FanOutExchange.h
@@ -30,22 +30,19 @@ namespace qpid {
namespace broker {
class FanOutExchange : public virtual Exchange {
- const string name;
std::vector<Queue::shared_ptr> bindings;
qpid::concurrent::MonitorImpl lock;
public:
static const std::string typeName;
- FanOutExchange(const string& name);
+ FanOutExchange(const std::string& name);
- inline virtual const string& getName(){ return name; }
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
virtual ~FanOutExchange();
};
diff --git a/cpp/broker/inc/TopicExchange.h b/cpp/broker/inc/TopicExchange.h
index d9ff62ecc6..68a4026ee7 100644
--- a/cpp/broker/inc/TopicExchange.h
+++ b/cpp/broker/inc/TopicExchange.h
@@ -18,7 +18,7 @@
#ifndef _TopicExchange_
#define _TopicExchange_
-#include <map>
+#include <tr1/unordered_map>
#include <vector>
#include "Exchange.h"
#include "FieldTable.h"
@@ -28,28 +28,67 @@
namespace qpid {
namespace broker {
- class TopicExchange : public virtual Exchange{
- const string name;
- std::map<string, std::vector<Queue::shared_ptr> > bindings;//NOTE: pattern matching not yet supported
- qpid::concurrent::MonitorImpl lock;
- public:
- static const std::string typeName;
-
- TopicExchange(const string& name);
+/** A vector of string tokens */
+class Tokens : public std::vector<std::string> {
+ public:
+ Tokens() {};
+ // Default copy, assign, dtor are sufficient.
+
+ /** Tokenize s, provides automatic conversion of string to Tokens */
+ Tokens(const std::string& s) { operator=(s); }
+ /** Tokenize s */
+ Tokens & operator=(const std::string& s);
+
+ struct Hash { size_t operator()(const Tokens&) const; };
+ typedef std::equal_to<Tokens> Equal;
+};
+
+/**
+ * Tokens that have been normalized as a pattern and can be matched
+ * with topic Tokens. Normalized meands all sequences of mixed * and
+ * # are reduced to a series of * followed by at most one #.
+ */
+class TopicPattern : public Tokens
+{
+ public:
+ TopicPattern() {}
+ // Default copy, assign, dtor are sufficient.
+ TopicPattern(const Tokens& tokens) { operator=(tokens); }
+ TopicPattern(const std::string& str) { operator=(str); }
+ TopicPattern& operator=(const Tokens&);
+ TopicPattern& operator=(const std::string& str) { operator=(Tokens(str)); }
+
+ /** Match a topic */
+ bool match(const std::string& topic) { return match(Tokens(topic)); }
+ bool match(const Tokens& topic) const;
+
+ private:
+ void normalize();
+};
+
+class TopicExchange : public virtual Exchange{
+ typedef std::tr1::unordered_map<TopicPattern, Queue::vector, TopicPattern::Hash> BindingMap;
+ BindingMap bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ TopicExchange(const string& name);
- inline virtual const string& getName(){ return name; }
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~TopicExchange();
+};
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
- virtual ~TopicExchange();
- };
}
}
-
#endif