summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-10-24 00:12:47 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-10-24 00:12:47 +0000
commit33994618c072a0922a8db76b82ada44551c4946b (patch)
tree7a1c51684739b2b45d69e4459689a0adff7ec417
parent837350ce104f345861d1241ddf3ecd04c19b0b5c (diff)
downloadqpid-python-33994618c072a0922a8db76b82ada44551c4946b.tar.gz
QPID-2897: modify TopicExchange for better performance with respect to large number of bindings.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1026715 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp556
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h121
-rw-r--r--cpp/src/tests/TopicExchangeTest.cpp359
3 files changed, 867 insertions, 169 deletions
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 3f70f17ea4..6bc42e28bf 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -38,12 +38,82 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// - excessive string copying: should be 0 copy, match from original buffer.
// - match/lookup: use descision tree or other more efficient structure.
-namespace {
+namespace
+{
+const std::string STAR("*");
+const std::string HASH("#");
+}
+
+// iterator for federation ReOrigin bind operation
+class TopicExchange::ReOriginIter : public TopicExchange::BindingNode::TreeIterator {
+public:
+ ReOriginIter() {};
+ ~ReOriginIter() {};
+ bool visit(BindingNode& node) {
+ if (node.bindings.fedBinding.hasLocal()) {
+ keys2prop.push_back(node.routePattern);
+ }
+ return true;
+ }
+ std::vector<std::string> keys2prop;
+};
+
+
+// match iterator used by route(): builds BindingList of all unique queues
+// that match the routing key.
+class TopicExchange::BindingsFinderIter : public TopicExchange::BindingNode::TreeIterator {
+public:
+ BindingsFinderIter(BindingList &bl) : b(bl) {};
+ ~BindingsFinderIter() {};
+
+ BindingList& b;
+ std::set<std::string> qSet;
+
+ bool visit(BindingNode& node) {
+
+ Binding::vector& qv(node.bindings.bindingVector);
+ for (Binding::vector::iterator j = qv.begin(); j != qv.end(); j++) {
+ // do not duplicate queues on the binding list
+ if (qSet.insert(j->get()->queue->getName()).second) {
+ b->push_back(*j);
+ }
+ }
+ return true;
+ }
+};
+
+
+
+// Iterator to visit all bindings until a given queue is found
+class TopicExchange::QueueFinderIter : public TopicExchange::BindingNode::TreeIterator {
+public:
+ QueueFinderIter(Queue::shared_ptr queue) : queue(queue), found(false) {};
+ ~QueueFinderIter() {};
+ bool visit(BindingNode& node) {
+
+ Binding::vector& qv(node.bindings.bindingVector);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++) {
+ if ((*q)->queue == queue) {
+ found = true;
+ return false; // search done
+ }
+ }
+ return true; // continue search
+ }
+
+ Queue::shared_ptr queue;
+ bool found;
+};
+
+
// Iterate over a string of '.'-separated tokens.
-struct TokenIterator {
+struct TopicExchange::TokenIterator {
typedef pair<const char*,const char*> Token;
- TokenIterator(const char* b, const char* e) : token(make_pair(b, find(b,e,'.'))), end(e) {}
+ TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {}
+
+ TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {}
bool finished() const { return !token.first; }
@@ -56,23 +126,39 @@ struct TokenIterator {
}
}
+ void pop(string &top) {
+ ptrdiff_t l = len();
+ if (l) {
+ top.assign(token.first, l);
+ } else top.clear();
+ next();
+ }
+
bool match1(char c) const {
return token.second==token.first+1 && *token.first == c;
}
- bool match(const Token& token2) {
+ bool match(const Token& token2) const {
ptrdiff_t l=len();
return l == token2.second-token2.first &&
strncmp(token.first, token2.first, l) == 0;
}
+ bool match(const string& str) const {
+ ptrdiff_t l=len();
+ return l == ptrdiff_t(str.size()) &&
+ str.compare(0, l, token.first, l) == 0;
+ }
+
ptrdiff_t len() const { return token.second - token.first; }
- Token token;
+
const char* end;
+ Token token;
};
-class Normalizer : public TokenIterator {
+
+class TopicExchange::Normalizer : public TopicExchange::TokenIterator {
public:
Normalizer(string& p)
: TokenIterator(&p[0], &p[0]+p.size()), pattern(p)
@@ -106,54 +192,7 @@ class Normalizer : public TokenIterator {
string& pattern;
};
-class Matcher {
- public:
- Matcher(const string& p, const string& k)
- : matched(), pattern(&p[0], &p[0]+p.size()), key(&k[0], &k[0]+k.size())
- { matched = match(); }
-
- operator bool() const { return matched; }
-
- private:
- Matcher(const char* bp, const char* ep, const char* bk, const char* ek)
- : matched(), pattern(bp,ep), key(bk,ek) { matched = match(); }
-
- bool match() {
- // Invariant: pattern and key match up to but excluding
- // pattern.token and key.token
- while (!pattern.finished() && !key.finished()) {
- if (pattern.match1('*') && !key.finished()) {
- pattern.next();
- key.next();
- }
- else if (pattern.match1('#')) {
- pattern.next();
- if (pattern.finished()) return true; // Trailing # matches anything.
- while (!key.finished()) {
- if (Matcher(pattern.token.first, pattern.end,
- key.token.first, key.end))
- return true;
- key.next();
- }
- return false;
- }
- else if (pattern.len() == key.len() &&
- equal(pattern.token.first,pattern.token.second,key.token.first)) {
- pattern.next();
- key.next();
- }
- else
- return false;
- }
- if (!pattern.finished() && pattern.match1('#'))
- pattern.next(); // Trailing # matches empty.
- return pattern.finished() && key.finished();
- }
- bool matched;
- TokenIterator pattern, key;
-};
-}
// Convert sequences of * and # to a sequence of * followed by a single #
string TopicExchange::normalize(const string& pattern) {
@@ -162,12 +201,10 @@ string TopicExchange::normalize(const string& pattern) {
return normal;
}
-bool TopicExchange::match(const string& pattern, const string& key)
-{
- return Matcher(pattern, key);
-}
-TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b)
+ : Exchange(_name, _parent, b),
+ nBindings(0)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
@@ -175,7 +212,8 @@ TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b
TopicExchange::TopicExchange(const std::string& _name, bool _durable,
const FieldTable& _args, Manageable* _parent, Broker* b) :
- Exchange(_name, _durable, _args, _parent, b)
+ Exchange(_name, _durable, _args, _parent, b),
+ nBindings(0)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
@@ -187,22 +225,27 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
string fedTags(args ? args->getAsString(qpidFedTags) : "");
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
bool propagate = false;
- bool reallyUnbind;
string routingPattern = normalize(routingKey);
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
RWlock::ScopedWlock l(lock);
- if (isBound(queue, routingPattern)) {
- // already bound, but may be from a different fedOrigin
- BoundKey& bk = bindings[routingPattern];
- bk.fedBinding.addOrigin(fedOrigin);
- return false;
- } else {
+ BindingKey *bk = bindingTree.addBindingKey(routingPattern);
+ if (bk) {
+ Binding::vector& qv(bk->bindingVector);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++) {
+ if ((*q)->queue == queue) {
+ // already bound, but may be from a different fedOrigin
+ bk->fedBinding.addOrigin(fedOrigin);
+ return false;
+ }
+ }
+
Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
binding->startManagement();
- BoundKey& bk = bindings[routingPattern];
- bk.bindingVector.push_back(binding);
- propagate = bk.fedBinding.addOrigin(fedOrigin);
+ bk->bindingVector.push_back(binding);
+ nBindings++;
+ propagate = bk->fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
@@ -210,11 +253,14 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
<< " (origin=" << fedOrigin << ")");
}
} else if (fedOp == fedOpUnbind) {
+ bool reallyUnbind = false;
{
RWlock::ScopedWlock l(lock);
- BoundKey& bk = bindings[routingPattern];
- propagate = bk.fedBinding.delOrigin(fedOrigin);
- reallyUnbind = bk.fedBinding.count() == 0;
+ BindingKey* bk = bindingTree.getBindingKey(routingPattern);
+ if (bk) {
+ propagate = bk->fedBinding.delOrigin(fedOrigin);
+ reallyUnbind = bk->fedBinding.count() == 0;
+ }
}
if (reallyUnbind)
unbind(queue, routingPattern, 0);
@@ -223,20 +269,14 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
* while holding the lock. Then propagate once the lock is
* released
*/
- std::vector<std::string> keys2prop;
+ ReOriginIter reOriginIter;
{
- RWlock::ScopedRlock l(lock);
- for (BindingMap::iterator iter = bindings.begin();
- iter != bindings.end(); iter++) {
- const BoundKey& bk = iter->second;
-
- if (bk.fedBinding.hasLocal()) {
- keys2prop.push_back(iter->first);
- }
- }
+ RWlock::ScopedRlock l(lock);
+ bindingTree.iterateAll( reOriginIter );
} /* lock dropped */
- for (std::vector<std::string>::const_iterator key = keys2prop.begin();
- key != keys2prop.end(); key++) {
+
+ for (std::vector<std::string>::const_iterator key = reOriginIter.keys2prop.begin();
+ key != reOriginIter.keys2prop.end(); key++) {
propagateFedOp( *key, string(), fedOpBind, string());
}
}
@@ -250,11 +290,9 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
string routingKey = normalize(constRoutingKey);
-
- BindingMap::iterator bi = bindings.find(routingKey);
- if (bi == bindings.end()) return false;
- BoundKey& bk = bi->second;
- Binding::vector& qv(bk.bindingVector);
+ BindingKey* bk = bindingTree.getBindingKey(routingKey);
+ if (!bk) return false;
+ Binding::vector& qv(bk->bindingVector);
bool propagate = false;
Binding::vector::iterator q;
@@ -263,8 +301,12 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe
break;
if(q == qv.end()) return false;
qv.erase(q);
- propagate = bk.fedBinding.delOrigin();
- if(qv.empty()) bindings.erase(bi);
+ assert(nBindings > 0);
+ nBindings--;
+ propagate = bk->fedBinding.delOrigin();
+ if(qv.empty()) {
+ bindingTree.removeBindingKey(routingKey);
+ }
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
}
@@ -277,9 +319,10 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe
bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
{
- BindingMap::iterator bi = bindings.find(pattern);
- if (bi == bindings.end()) return false;
- Binding::vector& qv(bi->second.bindingVector);
+ // Note well: lock held by caller....
+ BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern
+ if (!bk) return false;
+ Binding::vector& qv(bk->bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
@@ -289,22 +332,13 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
{
+ // Note: PERFORMANCE CRITICAL!!!
BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
- std::set<std::string> qSet;
+ BindingsFinderIter bindingsFinder(b);
{
RWlock::ScopedRlock l(lock);
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, routingKey)) {
- Binding::vector& qv(i->second.bindingVector);
- for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
- // do not duplicate queues on the binding list
- if (qSet.insert(j->get()->queue->getName()).second) {
- b->push_back(*j);
- }
- }
- }
- }
+ bindingTree.iterateMatch(routingKey, bindingsFinder);
}
doRoute(msg, b);
}
@@ -316,27 +350,311 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routing
string key(normalize(*routingKey));
return isBound(queue, key);
} else if (!routingKey && !queue) {
- return bindings.size() > 0;
+ return nBindings > 0;
} else if (routingKey) {
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *routingKey))
- return true;
- }
- } else {
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Binding::vector& qv(i->second.bindingVector);
- Binding::vector::iterator q;
- for (q = qv.begin(); q != qv.end(); q++)
- if ((*q)->queue == queue)
- return true;
+ if (bindingTree.getBindingKey(*routingKey)) {
+ return true;
}
+ } else {
+ QueueFinderIter queueFinder(queue);
+ bindingTree.iterateAll( queueFinder );
+ return queueFinder.found;
}
return false;
- return queue && routingKey;
}
TopicExchange::~TopicExchange() {}
const std::string TopicExchange::typeName("topic");
+//
+// class BindingNode
+//
+
+TopicExchange::BindingNode::~BindingNode()
+{
+ childTokens.clear();
+}
+
+
+// Add a binding pattern to the tree. Return a pointer to the binding key
+// of the node that matches the binding pattern.
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::addBindingKey(const std::string& normalizedRoute)
+{
+ TokenIterator bKey(normalizedRoute);
+ return addBindingKey(bKey, normalizedRoute);
+}
+
+
+// Return a pointer to the binding key of the leaf node that matches the binding pattern.
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::getBindingKey(const std::string& normalizedRoute)
+{
+ TokenIterator bKey(normalizedRoute);
+ return getBindingKey(bKey);
+}
+
+
+// Delete the binding associated with the given route.
+void TopicExchange::BindingNode::removeBindingKey(const std::string& normalizedRoute)
+{
+ TokenIterator bKey2(normalizedRoute);
+ removeBindingKey(bKey2, normalizedRoute);
+}
+
+// visit each node in the tree. Note: all nodes are visited,
+// even non-leaf nodes (i.e. nodes without any bindings)
+bool TopicExchange::BindingNode::iterateAll(TopicExchange::BindingNode::TreeIterator& iter)
+{
+ if (!iter.visit(*this)) return false;
+
+ if (starChild && !starChild->iterateAll(iter)) return false;
+
+ if (hashChild && !hashChild->iterateAll(iter)) return false;
+
+ for (ChildMap::iterator ptr = childTokens.begin();
+ ptr != childTokens.end(); ptr++) {
+
+ if (!ptr->second->iterateAll(iter)) return false;
+ }
+
+ return true;
+}
+
+// applies iter against only matching nodes until iter returns false
+// Note Well: the iter may match against the same node more than once
+// if # wildcards are present!
+bool TopicExchange::BindingNode::iterateMatch(const std::string& routingKey, TreeIterator& iter)
+{
+ TopicExchange::TokenIterator rKey(routingKey);
+ return iterateMatch( rKey, iter );
+}
+
+
+// recurse over binding using token iterator.
+// Note well: bkey is modified!
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::addBindingKey(TokenIterator &bKey,
+ const string& fullPattern)
+{
+ if (bKey.finished()) {
+ // this node's binding
+ if (routePattern.empty()) {
+ routePattern = fullPattern;
+ } else assert(routePattern == fullPattern);
+
+ return &bindings;
+
+ } else {
+ // pop the topmost token & recurse...
+
+ if (bKey.match(STAR)) {
+ if (!starChild) {
+ starChild.reset(new StarNode());
+ }
+ bKey.next();
+ return starChild->addBindingKey(bKey, fullPattern);
+
+ } else if (bKey.match(HASH)) {
+ if (!hashChild) {
+ hashChild.reset(new HashNode());
+ }
+ bKey.next();
+ return hashChild->addBindingKey(bKey, fullPattern);
+
+ } else {
+ ChildMap::iterator ptr;
+ std::string next_token;
+ bKey.pop(next_token);
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->addBindingKey(bKey, fullPattern);
+ } else {
+ BindingNode::shared_ptr child(new BindingNode(next_token));
+ childTokens[next_token] = child;
+ return child->addBindingKey(bKey, fullPattern);
+ }
+ }
+ }
+}
+
+
+// Remove a binding pattern from the tree. Return true if the current
+// node becomes a leaf without any bindings (therefore can be deleted).
+// Note Well: modifies parameter bKey's value!
+bool
+TopicExchange::BindingNode::removeBindingKey(TokenIterator &bKey,
+ const string& fullPattern)
+{
+ bool remove;
+
+ if (!bKey.finished()) {
+
+ if (bKey.match(STAR)) {
+ bKey.next();
+ if (starChild) {
+ remove = starChild->removeBindingKey(bKey, fullPattern);
+ if (remove) {
+ starChild.reset();
+ }
+ }
+ } else if (bKey.match(HASH)) {
+ bKey.next();
+ if (hashChild) {
+ remove = hashChild->removeBindingKey(bKey, fullPattern);
+ if (remove) {
+ hashChild.reset();
+ }
+ }
+ } else {
+ ChildMap::iterator ptr;
+ std::string next_token;
+ bKey.pop(next_token);
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ remove = ptr->second->removeBindingKey(bKey, fullPattern);
+ if (remove) {
+ childTokens.erase(ptr);
+ }
+ }
+ }
+ }
+
+ // no bindings and no children == parent can delete this node.
+ return getChildCount() == 0 && bindings.bindingVector.empty();
+}
+
+
+// find the binding key that matches the given binding pattern.
+// Note Well: modifies key parameter!
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::getBindingKey(TokenIterator &key)
+{
+ if (key.finished()) {
+ return &bindings;
+ }
+
+ string next_token;
+
+ key.pop(next_token);
+
+ if (next_token == STAR) {
+ if (starChild)
+ return starChild->getBindingKey(key);
+ } else if (next_token == HASH) {
+ if (hashChild)
+ return hashChild->getBindingKey(key);
+ } else {
+ ChildMap::iterator ptr;
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->getBindingKey(key);
+ }
+ }
+
+ return 0;
+}
+
+
+
+// iterate over all nodes that match the given key. Note well: the set of nodes
+// that are visited includes matching non-leaf nodes.
+// Note well: parameter key is modified!
+bool TopicExchange::BindingNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
+{
+ // invariant: key has matched all previous tokens up to this node.
+ if (key.finished()) {
+ // exact match this node: visit if bound
+ if (!bindings.bindingVector.empty())
+ if (!iter.visit(*this)) return false;
+ }
+
+ // check remaining key against children, even if empty.
+ return iterateMatchChildren(key, iter);
+}
+
+
+TopicExchange::StarNode::StarNode()
+ : BindingNode(STAR) {}
+
+
+// See iterateMatch() above.
+// Special case: this node must verify a token is available (match exactly one).
+bool TopicExchange::StarNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
+{
+ // must match one token:
+ if (key.finished())
+ return true; // match failed, but continue iteration on siblings
+
+ // pop the topmost token
+ key.next();
+
+ if (key.finished()) {
+ // exact match this node: visit if bound
+ if (!bindings.bindingVector.empty())
+ if (!iter.visit(*this)) return false;
+ }
+
+ return iterateMatchChildren(key, iter);
+}
+
+
+TopicExchange::HashNode::HashNode()
+ : BindingNode(HASH) {}
+
+
+// See iterateMatch() above.
+// Special case: can match zero or more tokens at the head of the key.
+bool TopicExchange::HashNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
+{
+ // consume each token and look for a match on the
+ // remaining key.
+ while (!key.finished()) {
+ if (!iterateMatchChildren(key, iter)) return false;
+ key.next();
+ }
+
+ if (!bindings.bindingVector.empty())
+ return iter.visit(*this);
+
+ return true;
+}
+
+
+// helper: iterate over current node's matching children
+bool
+TopicExchange::BindingNode::iterateMatchChildren(const TopicExchange::TokenIterator& key,
+ TopicExchange::BindingNode::TreeIterator& iter)
+{
+ // always try glob - it can match empty keys
+ if (hashChild) {
+ TokenIterator tmp(key);
+ if (!hashChild->iterateMatch(tmp, iter))
+ return false;
+ }
+
+ if (!key.finished()) {
+
+ if (starChild) {
+ TokenIterator tmp(key);
+ if (!starChild->iterateMatch(tmp, iter))
+ return false;
+ }
+
+ if (!childTokens.empty()) {
+ TokenIterator newKey(key);
+ std::string next_token;
+ newKey.pop(next_token);
+
+ ChildMap::iterator ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->iterateMatch(newKey, iter);
+ }
+ }
+ }
+
+ return true;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 54c3bb32c8..f5573b3463 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,34 +29,132 @@
#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
+
namespace qpid {
namespace broker {
class TopicExchange : public virtual Exchange {
- struct BoundKey {
+
+ struct TokenIterator;
+ class Normalizer;
+
+ struct BindingKey { // binding for this node
Binding::vector bindingVector;
FedBinding fedBinding;
};
- typedef std::map<std::string, BoundKey> BindingMap;
- BindingMap bindings;
- qpid::sys::RWlock lock;
+
+ // Binding database:
+ // The dotted form of a binding key is broken up and stored in a directed tree graph.
+ // Common binding prefix are merged. This allows the route match alogrithm to quickly
+ // isolate those sub-trees that match a given routingKey.
+ // For example, given the routes:
+ // a.b.c.<...>
+ // a.b.d.<...>
+ // a.x.y.<...>
+ // The resulting tree would be:
+ // a-->b-->c-->...
+ // | +-->d-->...
+ // +-->x-->y-->...
+ //
+ class BindingNode {
+ public:
+
+ typedef boost::shared_ptr<BindingNode> shared_ptr;
+
+ // for database transversal (visit a node).
+ class TreeIterator {
+ public:
+ TreeIterator() {};
+ virtual ~TreeIterator() {};
+ virtual bool visit(BindingNode& node) = 0;
+ };
+
+ BindingNode() {};
+ BindingNode(const std::string& token) : token(token) {};
+ virtual ~BindingNode();
+
+ // add normalizedRoute to tree, return associated BindingKey
+ BindingKey* addBindingKey(const std::string& normalizedRoute);
+
+ // return BindingKey associated with normalizedRoute
+ BindingKey* getBindingKey(const std::string& normalizedRoute);
+
+ // remove BindingKey associated with normalizedRoute
+ void removeBindingKey(const std::string& normalizedRoute);
+
+ // applies iter against each node in tree until iter returns false
+ bool iterateAll(TreeIterator& iter);
+
+ // applies iter against only matching nodes until iter returns false
+ bool iterateMatch(const std::string& routingKey, TreeIterator& iter);
+
+ std::string routePattern; // normalized binding that matches this node
+ BindingKey bindings; // for matches against this node
+
+ protected:
+
+ std::string token; // portion of pattern represented by this node
+
+ // children
+ typedef std::map<const std::string, BindingNode::shared_ptr> ChildMap;
+ ChildMap childTokens;
+ BindingNode::shared_ptr starChild; // "*" subtree
+ BindingNode::shared_ptr hashChild; // "#" subtree
+
+ unsigned int getChildCount() { return childTokens.size() +
+ (starChild ? 1 : 0) + (hashChild ? 1 : 0); }
+ BindingKey* addBindingKey(TokenIterator& bKey,
+ const std::string& fullPattern);
+ bool removeBindingKey(TokenIterator& bKey,
+ const std::string& fullPattern);
+ BindingKey* getBindingKey(TokenIterator& bKey);
+ virtual bool iterateMatch(TokenIterator& rKey, TreeIterator& iter);
+ bool iterateMatchChildren(const TokenIterator& key, TreeIterator& iter);
+ };
+
+ // Special case: ("*" token) Node in the tree for a match exactly one wildcard
+ class StarNode : public BindingNode {
+ public:
+ StarNode();
+ ~StarNode() {};
+
+ protected:
+ virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter);
+ };
+
+ // Special case: ("#" token) Node in the tree for a match zero or more
+ class HashNode : public BindingNode {
+ public:
+ HashNode();
+ ~HashNode() {};
+
+ protected:
+ virtual bool iterateMatch(TokenIterator& key, TreeIterator& iter);
+ };
+
+ BindingNode bindingTree;
+ unsigned long nBindings;
+ qpid::sys::RWlock lock; // protects bindingTree and nBindings
bool isBound(Queue::shared_ptr queue, const std::string& pattern);
-
+
+ class ReOriginIter;
+ class BindingsFinderIter;
+ class QueueFinderIter;
+
public:
static const std::string typeName;
- static QPID_BROKER_EXTERN bool match(const std::string& pattern, const std::string& topic);
static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern);
QPID_BROKER_EXTERN TopicExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN TopicExchange(const std::string& _name,
- bool _durable,
+ bool _durable,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
- virtual std::string getType() const { return typeName; }
+ virtual std::string getType() const { return typeName; }
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
@@ -74,6 +172,9 @@ class TopicExchange : public virtual Exchange {
QPID_BROKER_EXTERN virtual ~TopicExchange();
virtual bool supportsDynamicBinding() { return true; }
+
+ class TopicExchangeTester;
+ friend class TopicExchangeTester;
};
diff --git a/cpp/src/tests/TopicExchangeTest.cpp b/cpp/src/tests/TopicExchangeTest.cpp
index c103620dbf..ff8931f9c9 100644
--- a/cpp/src/tests/TopicExchangeTest.cpp
+++ b/cpp/src/tests/TopicExchangeTest.cpp
@@ -23,13 +23,121 @@
using namespace qpid::broker;
using namespace std;
+
namespace qpid {
+namespace broker {
+
+// Class for exercising the pattern match code in the TopicExchange
+class TopicExchange::TopicExchangeTester {
+
+public:
+ typedef std::vector<std::string> BindingVec;
+
+private:
+ // binding node iterator that collects all routes that are bound
+ class TestFinder : public TopicExchange::BindingNode::TreeIterator {
+ public:
+ TestFinder(BindingVec& m) : bv(m) {};
+ ~TestFinder() {};
+ bool visit(BindingNode& node) {
+ if (!node.bindings.bindingVector.empty())
+ bv.push_back(node.routePattern);
+ return true;
+ }
+
+ BindingVec& bv;
+ };
+
+public:
+ TopicExchangeTester() {};
+ ~TopicExchangeTester() {};
+ bool addBindingKey(const std::string& bKey) {
+ string routingPattern = normalize(bKey);
+ BindingKey *bk = bindingTree.addBindingKey(routingPattern);
+ if (bk) {
+ // push a dummy binding to mark this node as "non-leaf"
+ bk->bindingVector.push_back(Binding::shared_ptr());
+ return true;
+ }
+ return false;
+ }
+
+ bool removeBindingKey(const std::string& bKey){
+ string routingPattern = normalize(bKey);
+ BindingKey *bk = bindingTree.getBindingKey(routingPattern);
+ if (bk) {
+ bk->bindingVector.pop_back();
+ if (bk->bindingVector.empty()) {
+ // no more bindings - remove this node
+ bindingTree.removeBindingKey(routingPattern);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ void findMatches(const std::string& rKey, BindingVec& matches) {
+ TestFinder testFinder(matches);
+ bindingTree.iterateMatch( rKey, testFinder );
+ }
+
+ void getAll(BindingVec& bindings) {
+ TestFinder testFinder(bindings);
+ bindingTree.iterateAll( testFinder );
+ }
+
+private:
+ TopicExchange::BindingNode bindingTree;
+};
+} // namespace broker
+
+
namespace tests {
QPID_AUTO_TEST_SUITE(TopicExchangeTestSuite)
#define CHECK_NORMALIZED(expect, pattern) BOOST_CHECK_EQUAL(expect, TopicExchange::normalize(pattern));
+namespace {
+ // return the count of bindings that match 'pattern'
+ int match(TopicExchange::TopicExchangeTester &tt,
+ const std::string& pattern)
+ {
+ TopicExchange::TopicExchangeTester::BindingVec bv;
+ tt.findMatches(pattern, bv);
+ return int(bv.size());
+ }
+
+ // return true if expected contains exactly all bindings that match
+ // against pattern.
+ bool compare(TopicExchange::TopicExchangeTester& tt,
+ const std::string& pattern,
+ const TopicExchange::TopicExchangeTester::BindingVec& expected)
+ {
+ TopicExchange::TopicExchangeTester::BindingVec bv;
+ tt.findMatches(pattern, bv);
+ if (expected.size() != bv.size()) {
+ // std::cout << "match failed 1 f=[" << bv << "]" << std::endl;
+ // std::cout << "match failed 1 e=[" << expected << "]" << std::endl;
+ return false;
+ }
+ TopicExchange::TopicExchangeTester::BindingVec::const_iterator i;
+ for (i = expected.begin(); i != expected.end(); i++) {
+ TopicExchange::TopicExchangeTester::BindingVec::iterator j;
+ for (j = bv.begin(); j != bv.end(); j++) {
+ // std::cout << "matched [" << *j << "]" << std::endl;
+ if (*i == *j) break;
+ }
+ if (j == bv.end()) {
+ // std::cout << "match failed 2 [" << bv << "]" << std::endl;
+ return false;
+ }
+ }
+ return true;
+ }
+}
+
+
QPID_AUTO_TEST_CASE(testNormalize)
{
CHECK_NORMALIZED("", "");
@@ -45,81 +153,252 @@ QPID_AUTO_TEST_CASE(testNormalize)
QPID_AUTO_TEST_CASE(testPlain)
{
+ TopicExchange::TopicExchangeTester tt;
string pattern("ab.cd.e");
- BOOST_CHECK(TopicExchange::match(pattern, "ab.cd.e"));
- BOOST_CHECK(!TopicExchange::match(pattern, "abx.cd.e"));
- BOOST_CHECK(!TopicExchange::match(pattern, "ab.cd"));
- BOOST_CHECK(!TopicExchange::match(pattern, "ab.cd..e."));
- BOOST_CHECK(!TopicExchange::match(pattern, "ab.cd.e."));
- BOOST_CHECK(!TopicExchange::match(pattern, ".ab.cd.e"));
+
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "ab.cd.e"));
+ BOOST_CHECK_EQUAL(0, match(tt, "abx.cd.e"));
+ BOOST_CHECK_EQUAL(0, match(tt, "ab.cd"));
+ BOOST_CHECK_EQUAL(0, match(tt, "ab.cd..e."));
+ BOOST_CHECK_EQUAL(0, match(tt, "ab.cd.e."));
+ BOOST_CHECK_EQUAL(0, match(tt, ".ab.cd.e"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "";
- BOOST_CHECK(TopicExchange::match(pattern, ""));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, ""));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = ".";
- BOOST_CHECK(TopicExchange::match(pattern, "."));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "."));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
}
QPID_AUTO_TEST_CASE(testStar)
{
+ TopicExchange::TopicExchangeTester tt;
string pattern("a.*.b");
- BOOST_CHECK(TopicExchange::match(pattern, "a.xx.b"));
- BOOST_CHECK(!TopicExchange::match(pattern, "a.b"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.xx.b"));
+ BOOST_CHECK_EQUAL(0, match(tt, "a.b"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "*.x";
- BOOST_CHECK(TopicExchange::match(pattern, "y.x"));
- BOOST_CHECK(TopicExchange::match(pattern, ".x"));
- BOOST_CHECK(!TopicExchange::match(pattern, "x"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "y.x"));
+ BOOST_CHECK_EQUAL(1, match(tt, ".x"));
+ BOOST_CHECK_EQUAL(0, match(tt, "x"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "x.x.*";
- BOOST_CHECK(TopicExchange::match(pattern, "x.x.y"));
- BOOST_CHECK(TopicExchange::match(pattern, "x.x."));
- BOOST_CHECK(!TopicExchange::match(pattern, "x.x"));
- BOOST_CHECK(!TopicExchange::match(pattern, "q.x.y"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "x.x.y"));
+ BOOST_CHECK_EQUAL(1, match(tt, "x.x."));
+ BOOST_CHECK_EQUAL(0, match(tt, "x.x"));
+ BOOST_CHECK_EQUAL(0, match(tt, "q.x.y"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
}
QPID_AUTO_TEST_CASE(testHash)
{
+ TopicExchange::TopicExchangeTester tt;
string pattern("a.#.b");
- BOOST_CHECK(TopicExchange::match(pattern, "a.b"));
- BOOST_CHECK(TopicExchange::match(pattern, "a.x.b"));
- BOOST_CHECK(TopicExchange::match(pattern, "a..x.y.zz.b"));
- BOOST_CHECK(!TopicExchange::match(pattern, "a.b."));
- BOOST_CHECK(!TopicExchange::match(pattern, "q.x.b"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.b"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.x.b"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a..x.y.zz.b"));
+ BOOST_CHECK_EQUAL(0, match(tt, "a.b."));
+ BOOST_CHECK_EQUAL(0, match(tt, "q.x.b"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "a.#";
- BOOST_CHECK(TopicExchange::match(pattern, "a"));
- BOOST_CHECK(TopicExchange::match(pattern, "a.b"));
- BOOST_CHECK(TopicExchange::match(pattern, "a.b.c"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "a"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.b"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.b.c"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "#.a";
- BOOST_CHECK(TopicExchange::match(pattern, "a"));
- BOOST_CHECK(TopicExchange::match(pattern, "x.y.a"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "a"));
+ BOOST_CHECK_EQUAL(1, match(tt, "x.y.a"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "a.#.b.#.c";
- BOOST_CHECK(TopicExchange::match(pattern, "a.b.c"));
- BOOST_CHECK(TopicExchange::match(pattern, "a.x.b.y.c"));
- BOOST_CHECK(TopicExchange::match(pattern, "a.x.x.b.y.y.c"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.b.c"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.x.b.y.c"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.x.x.b.y.y.c"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
}
QPID_AUTO_TEST_CASE(testMixed)
{
+ TopicExchange::TopicExchangeTester tt;
string pattern("*.x.#.y");
- BOOST_CHECK(TopicExchange::match(pattern, "a.x.y"));
- BOOST_CHECK(TopicExchange::match(pattern, "a.x.p.qq.y"));
- BOOST_CHECK(!TopicExchange::match(pattern, "a.a.x.y"));
- BOOST_CHECK(!TopicExchange::match(pattern, "aa.x.b.c"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.x.y"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.x.p.qq.y"));
+ BOOST_CHECK_EQUAL(0, match(tt, "a.a.x.y"));
+ BOOST_CHECK_EQUAL(0, match(tt, "aa.x.b.c"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "a.#.b.*";
- BOOST_CHECK(TopicExchange::match(pattern, "a.b.x"));
- BOOST_CHECK(TopicExchange::match(pattern, "a.x.x.x.b.x"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.b.x"));
+ BOOST_CHECK_EQUAL(1, match(tt, "a.x.x.x.b.x"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
pattern = "*.*.*.#";
- BOOST_CHECK(TopicExchange::match(pattern, "x.y.z"));
- BOOST_CHECK(TopicExchange::match(pattern, "x.y.z.a.b.c"));
- BOOST_CHECK(!TopicExchange::match(pattern, "x.y"));
- BOOST_CHECK(!TopicExchange::match(pattern, "x"));
+ BOOST_CHECK(tt.addBindingKey(pattern));
+ BOOST_CHECK_EQUAL(1, match(tt, "x.y.z"));
+ BOOST_CHECK_EQUAL(1, match(tt, "x.y.z.a.b.c"));
+ BOOST_CHECK_EQUAL(0, match(tt, "x.y"));
+ BOOST_CHECK_EQUAL(0, match(tt, "x"));
+ BOOST_CHECK(tt.removeBindingKey(pattern));
+}
+
+
+QPID_AUTO_TEST_CASE(testMultiple)
+{
+ TopicExchange::TopicExchangeTester tt;
+ const std::string bindings[] =
+ { "a", "b",
+ "a.b", "b.c",
+ "a.b.c.d", "b.c.d.e",
+ "a.*", "a.#", "a.*.#",
+ "#.b", "*.b", "*.#.b",
+ "a.*.b", "a.#.b", "a.*.#.b",
+ "*.b.*", "#.b.#",
+ };
+ const size_t nBindings = sizeof(bindings)/sizeof(bindings[0]);
+
+ // setup bindings
+ for (size_t idx = 0; idx < nBindings; idx++) {
+ BOOST_CHECK(tt.addBindingKey(bindings[idx]));
+ }
+
+ {
+ // read all bindings, and verify all are present
+ TopicExchange::TopicExchangeTester::BindingVec b;
+ tt.getAll(b);
+ BOOST_CHECK_EQUAL(b.size(), nBindings);
+ for (size_t idx = 0; idx < nBindings; idx++) {
+ bool found = false;
+ for (TopicExchange::TopicExchangeTester::BindingVec::iterator i = b.begin();
+ i != b.end(); i++) {
+ if (*i == bindings[idx]) {
+ found = true;
+ break;
+ }
+ }
+ BOOST_CHECK(found);
+ }
+ }
+
+ { // test match on pattern "a"
+ const std::string matches[] = { "a", "a.#" };
+ const size_t nMatches = 2;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "a", expected));
+ }
+
+ { // test match on pattern "a.z"
+ const std::string matches[] = { "a.*", "a.#", "a.*.#" };
+ const size_t nMatches = 3;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "a.z", expected));
+ }
+
+ { // test match on pattern "a.b"
+ const std::string matches[] = {
+ "a.b", "a.*", "a.#", "a.*.#",
+ "#.b", "#.b.#", "*.#.b", "*.b",
+ "a.#.b"
+ };
+ const size_t nMatches = 9;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "a.b", expected));
+ }
+
+ { // test match on pattern "a.c.c.b"
+
+ const std::string matches[] = {
+ "#.b", "#.b.#", "*.#.b", "a.#.b",
+ "a.#", "a.*.#.b", "a.*.#"
+ };
+ const size_t nMatches = 7;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "a.c.c.b", expected));
+ }
+
+ { // test match on pattern "a.b.c"
+
+ const std::string matches[] = {
+ "#.b.#", "*.b.*", "a.#", "a.*.#"
+ };
+ const size_t nMatches = 4;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "a.b.c", expected));
+ }
+
+ { // test match on pattern "b"
+
+ const std::string matches[] = {
+ "#.b", "#.b.#", "b"
+ };
+ const size_t nMatches = 3;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "b", expected));
+ }
+
+ { // test match on pattern "x.b"
+
+ const std::string matches[] = {
+ "#.b", "#.b.#", "*.#.b", "*.b"
+ };
+ const size_t nMatches = 4;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "x.b", expected));
+ }
+
+ { // test match on pattern "x.y.z.b"
+
+ const std::string matches[] = {
+ "#.b", "#.b.#", "*.#.b"
+ };
+ const size_t nMatches = 3;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "x.y.z.b", expected));
+ }
+
+ { // test match on pattern "x.y.z.b.a.b.c"
+
+ const std::string matches[] = {
+ "#.b.#", "#.b.#"
+ };
+ const size_t nMatches = 2;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "x.y.z.b.a.b.c", expected));
+ }
+
+ { // test match on pattern "a.b.c.d"
+
+ const std::string matches[] = {
+ "#.b.#", "a.#", "a.*.#", "a.b.c.d",
+ };
+ const size_t nMatches = 4;
+ TopicExchange::TopicExchangeTester::BindingVec expected(matches, matches + nMatches);
+ BOOST_CHECK(compare(tt, "a.b.c.d", expected));
+ }
+
+ // cleanup bindings
+ for (size_t idx = 0; idx < nBindings; idx++) {
+ BOOST_CHECK(tt.removeBindingKey(bindings[idx]));
+ }
}
QPID_AUTO_TEST_SUITE_END()