summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/TopicExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp692
1 files changed, 692 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
new file mode 100644
index 0000000000..644a3d628e
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -0,0 +1,692 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/FedOps.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
+
+
+namespace qpid {
+namespace broker {
+
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace std;
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be 0 copy, match from original buffer.
+// - match/lookup: use descision tree or other more efficient structure.
+
+namespace
+{
+const std::string STAR("*");
+const std::string HASH("#");
+}
+
+// iterator for federation ReOrigin bind operation
+class TopicExchange::ReOriginIter : public TopicExchange::BindingNode::TreeIterator {
+public:
+ ReOriginIter() {};
+ ~ReOriginIter() {};
+ bool visit(BindingNode& node) {
+ if (node.bindings.fedBinding.hasLocal()) {
+ keys2prop.push_back(node.routePattern);
+ }
+ return true;
+ }
+ std::vector<std::string> keys2prop;
+};
+
+
+// match iterator used by route(): builds BindingList of all unique queues
+// that match the routing key.
+class TopicExchange::BindingsFinderIter : public TopicExchange::BindingNode::TreeIterator {
+public:
+ BindingsFinderIter(BindingList &bl) : b(bl) {};
+ ~BindingsFinderIter() {};
+
+ BindingList& b;
+ std::set<std::string> qSet;
+
+ bool visit(BindingNode& node) {
+
+ Binding::vector& qv(node.bindings.bindingVector);
+ for (Binding::vector::iterator j = qv.begin(); j != qv.end(); j++) {
+ // do not duplicate queues on the binding list
+ if (qSet.insert(j->get()->queue->getName()).second) {
+ b->push_back(*j);
+ }
+ }
+ return true;
+ }
+};
+
+
+
+// Iterator to visit all bindings until a given queue is found
+class TopicExchange::QueueFinderIter : public TopicExchange::BindingNode::TreeIterator {
+public:
+ QueueFinderIter(Queue::shared_ptr queue) : queue(queue), found(false) {};
+ ~QueueFinderIter() {};
+ bool visit(BindingNode& node) {
+
+ Binding::vector& qv(node.bindings.bindingVector);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++) {
+ if ((*q)->queue == queue) {
+ found = true;
+ return false; // search done
+ }
+ }
+ return true; // continue search
+ }
+
+ Queue::shared_ptr queue;
+ bool found;
+};
+
+
+// Iterate over a string of '.'-separated tokens.
+struct TopicExchange::TokenIterator {
+ typedef pair<const char*,const char*> Token;
+
+ TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {}
+
+ TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {}
+
+ bool finished() const { return !token.first; }
+
+ void next() {
+ if (token.second == end)
+ token.first = token.second = 0;
+ else {
+ token.first=token.second+1;
+ token.second=(find(token.first, end, '.'));
+ }
+ }
+
+ void pop(string &top) {
+ ptrdiff_t l = len();
+ if (l) {
+ top.assign(token.first, l);
+ } else top.clear();
+ next();
+ }
+
+ bool match1(char c) const {
+ return token.second==token.first+1 && *token.first == c;
+ }
+
+ bool match(const Token& token2) const {
+ ptrdiff_t l=len();
+ return l == token2.second-token2.first &&
+ strncmp(token.first, token2.first, l) == 0;
+ }
+
+ bool match(const string& str) const {
+ ptrdiff_t l=len();
+ return l == ptrdiff_t(str.size()) &&
+ str.compare(0, l, token.first, l) == 0;
+ }
+
+ ptrdiff_t len() const { return token.second - token.first; }
+
+
+ const char* end;
+ Token token;
+};
+
+
+class TopicExchange::Normalizer : public TopicExchange::TokenIterator {
+ public:
+ Normalizer(string& p)
+ : TokenIterator(&p[0], &p[0]+p.size()), pattern(p)
+ { normalize(); }
+
+ private:
+ // Apply 2 transformations: #.* -> *.# and #.# -> #
+ void normalize() {
+ while (!finished()) {
+ if (match1('#')) {
+ const char* hash1=token.first;
+ next();
+ if (!finished()) {
+ if (match1('#')) { // Erase #.# -> #
+ pattern.erase(hash1-pattern.data(), 2);
+ token.first -= 2;
+ token.second -= 2;
+ end -= 2;
+ }
+ else if (match1('*')) { // Swap #.* -> *.#
+ swap(*const_cast<char*>(hash1),
+ *const_cast<char*>(token.first));
+ }
+ }
+ }
+ else
+ next();
+ }
+ }
+
+ string& pattern;
+};
+
+
+
+// Convert sequences of * and # to a sequence of * followed by a single #
+string TopicExchange::normalize(const string& pattern) {
+ string normal(pattern);
+ Normalizer n(normal);
+ return normal;
+}
+
+
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b)
+ : Exchange(_name, _parent, b),
+ nBindings(0)
+{
+ if (mgmtExchange != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+TopicExchange::TopicExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent, Broker* b) :
+ Exchange(_name, _durable, _args, _parent, b),
+ nBindings(0)
+{
+ if (mgmtExchange != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+ ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+ string routingPattern = normalize(routingKey);
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
+ RWlock::ScopedWlock l(lock);
+ BindingKey *bk = bindingTree.addBindingKey(routingPattern);
+ if (bk) {
+ Binding::vector& qv(bk->bindingVector);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++) {
+ if ((*q)->queue == queue) {
+ // already bound, but may be from a different fedOrigin
+ bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
+ return false;
+ }
+ }
+
+ Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
+ binding->startManagement();
+ bk->bindingVector.push_back(binding);
+ nBindings++;
+ propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ }
+ QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName()
+ << " on exchange " << getName() << " (origin=" << fedOrigin << ")");
+ }
+ } else if (fedOp == fedOpUnbind) {
+ RWlock::ScopedWlock l(lock);
+ BindingKey* bk = getQueueBinding(queue, routingPattern);
+ if (bk) {
+ QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName()
+ << " on queue=" << queue->getName() << " origin=" << fedOrigin);
+ propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+ // if this was the last binding for the queue, delete the binding
+ if (bk->fedBinding.countFedBindings(queue->getName()) == 0) {
+ deleteBinding(queue, routingPattern, bk);
+ }
+ }
+ } else if (fedOp == fedOpReorigin) {
+ /** gather up all the keys that need rebinding in a local vector
+ * while holding the lock. Then propagate once the lock is
+ * released
+ */
+ ReOriginIter reOriginIter;
+ {
+ RWlock::ScopedRlock l(lock);
+ bindingTree.iterateAll( reOriginIter );
+ } /* lock dropped */
+
+ for (std::vector<std::string>::const_iterator key = reOriginIter.keys2prop.begin();
+ key != reOriginIter.keys2prop.end(); key++) {
+ propagateFedOp( *key, string(), fedOpBind, string());
+ }
+ }
+
+ cc.clearCache(); // clear the cache before we IVE route.
+ routeIVE();
+ if (propagate)
+ propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
+ return true;
+}
+
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args)
+{
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName()
+ << " on exchange " << getName() << " origin=" << fedOrigin << ")" );
+
+ ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+ RWlock::ScopedWlock l(lock);
+ string routingKey = normalize(constRoutingKey);
+ BindingKey* bk = getQueueBinding(queue, routingKey);
+ if (!bk) return false;
+ bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+ deleteBinding(queue, routingKey, bk);
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
+ return true;
+}
+
+
+bool TopicExchange::deleteBinding(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ BindingKey *bk)
+{
+ // Note well: write lock held by caller
+ Binding::vector& qv(bk->bindingVector);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
+ if(q == qv.end()) return false;
+ qv.erase(q);
+ assert(nBindings > 0);
+ nBindings--;
+
+ if(qv.empty()) {
+ bindingTree.removeBindingKey(routingKey);
+ }
+ if (mgmtExchange != 0) {
+ mgmtExchange->dec_bindingCount();
+ }
+ QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName()
+ << " on exchange " << getName());
+ return true;
+}
+
+/** returns a pointer to the BindingKey if the given queue is bound to this
+ * exchange using the routing pattern. 0 if queue binding does not exist.
+ */
+TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern)
+{
+ // Note well: lock held by caller....
+ BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern
+ if (!bk) return 0;
+ Binding::vector& qv(bk->bindingVector);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
+ return (q != qv.end()) ? bk : 0;
+}
+
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
+ // Note: PERFORMANCE CRITICAL!!!
+ BindingList b;
+ std::map<std::string, BindingList>::iterator it;
+ { // only lock the cache for read
+ RWlock::ScopedRlock cl(cacheLock);
+ it = bindingCache.find(routingKey);
+ if (it != bindingCache.end()) {
+ b = it->second;
+ }
+ }
+ PreRoute pr(msg, this);
+ if (!b.get()) // no cache hit
+ {
+ RWlock::ScopedRlock l(lock);
+ b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ BindingsFinderIter bindingsFinder(b);
+ bindingTree.iterateMatch(routingKey, bindingsFinder);
+ RWlock::ScopedWlock cwl(cacheLock);
+ bindingCache[routingKey] = b; // update cache
+ }
+ doRoute(msg, b);
+}
+
+bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+ RWlock::ScopedRlock l(lock);
+ if (routingKey && queue) {
+ string key(normalize(*routingKey));
+ return getQueueBinding(queue, key) != 0;
+ } else if (!routingKey && !queue) {
+ return nBindings > 0;
+ } else if (routingKey) {
+ if (bindingTree.getBindingKey(*routingKey)) {
+ return true;
+ }
+ } else {
+ QueueFinderIter queueFinder(queue);
+ bindingTree.iterateAll( queueFinder );
+ return queueFinder.found;
+ }
+ return false;
+}
+
+TopicExchange::~TopicExchange() {}
+
+const std::string TopicExchange::typeName("topic");
+
+//
+// class BindingNode
+//
+
+TopicExchange::BindingNode::~BindingNode()
+{
+ childTokens.clear();
+}
+
+
+// Add a binding pattern to the tree. Return a pointer to the binding key
+// of the node that matches the binding pattern.
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::addBindingKey(const std::string& normalizedRoute)
+{
+ TokenIterator bKey(normalizedRoute);
+ return addBindingKey(bKey, normalizedRoute);
+}
+
+
+// Return a pointer to the binding key of the leaf node that matches the binding pattern.
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::getBindingKey(const std::string& normalizedRoute)
+{
+ TokenIterator bKey(normalizedRoute);
+ return getBindingKey(bKey);
+}
+
+
+// Delete the binding associated with the given route.
+void TopicExchange::BindingNode::removeBindingKey(const std::string& normalizedRoute)
+{
+ TokenIterator bKey2(normalizedRoute);
+ removeBindingKey(bKey2, normalizedRoute);
+}
+
+// visit each node in the tree. Note: all nodes are visited,
+// even non-leaf nodes (i.e. nodes without any bindings)
+bool TopicExchange::BindingNode::iterateAll(TopicExchange::BindingNode::TreeIterator& iter)
+{
+ if (!iter.visit(*this)) return false;
+
+ if (starChild && !starChild->iterateAll(iter)) return false;
+
+ if (hashChild && !hashChild->iterateAll(iter)) return false;
+
+ for (ChildMap::iterator ptr = childTokens.begin();
+ ptr != childTokens.end(); ptr++) {
+
+ if (!ptr->second->iterateAll(iter)) return false;
+ }
+
+ return true;
+}
+
+// applies iter against only matching nodes until iter returns false
+// Note Well: the iter may match against the same node more than once
+// if # wildcards are present!
+bool TopicExchange::BindingNode::iterateMatch(const std::string& routingKey, TreeIterator& iter)
+{
+ TopicExchange::TokenIterator rKey(routingKey);
+ return iterateMatch( rKey, iter );
+}
+
+
+// recurse over binding using token iterator.
+// Note well: bkey is modified!
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::addBindingKey(TokenIterator &bKey,
+ const string& fullPattern)
+{
+ if (bKey.finished()) {
+ // this node's binding
+ if (routePattern.empty()) {
+ routePattern = fullPattern;
+ } else assert(routePattern == fullPattern);
+
+ return &bindings;
+
+ } else {
+ // pop the topmost token & recurse...
+
+ if (bKey.match(STAR)) {
+ if (!starChild) {
+ starChild.reset(new StarNode());
+ }
+ bKey.next();
+ return starChild->addBindingKey(bKey, fullPattern);
+
+ } else if (bKey.match(HASH)) {
+ if (!hashChild) {
+ hashChild.reset(new HashNode());
+ }
+ bKey.next();
+ return hashChild->addBindingKey(bKey, fullPattern);
+
+ } else {
+ ChildMap::iterator ptr;
+ std::string next_token;
+ bKey.pop(next_token);
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->addBindingKey(bKey, fullPattern);
+ } else {
+ BindingNode::shared_ptr child(new BindingNode(next_token));
+ childTokens[next_token] = child;
+ return child->addBindingKey(bKey, fullPattern);
+ }
+ }
+ }
+}
+
+
+// Remove a binding pattern from the tree. Return true if the current
+// node becomes a leaf without any bindings (therefore can be deleted).
+// Note Well: modifies parameter bKey's value!
+bool
+TopicExchange::BindingNode::removeBindingKey(TokenIterator &bKey,
+ const string& fullPattern)
+{
+ bool remove;
+
+ if (!bKey.finished()) {
+
+ if (bKey.match(STAR)) {
+ bKey.next();
+ if (starChild) {
+ remove = starChild->removeBindingKey(bKey, fullPattern);
+ if (remove) {
+ starChild.reset();
+ }
+ }
+ } else if (bKey.match(HASH)) {
+ bKey.next();
+ if (hashChild) {
+ remove = hashChild->removeBindingKey(bKey, fullPattern);
+ if (remove) {
+ hashChild.reset();
+ }
+ }
+ } else {
+ ChildMap::iterator ptr;
+ std::string next_token;
+ bKey.pop(next_token);
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ remove = ptr->second->removeBindingKey(bKey, fullPattern);
+ if (remove) {
+ childTokens.erase(ptr);
+ }
+ }
+ }
+ }
+
+ // no bindings and no children == parent can delete this node.
+ return getChildCount() == 0 && bindings.bindingVector.empty();
+}
+
+
+// find the binding key that matches the given binding pattern.
+// Note Well: modifies key parameter!
+TopicExchange::BindingKey*
+TopicExchange::BindingNode::getBindingKey(TokenIterator &key)
+{
+ if (key.finished()) {
+ return &bindings;
+ }
+
+ string next_token;
+
+ key.pop(next_token);
+
+ if (next_token == STAR) {
+ if (starChild)
+ return starChild->getBindingKey(key);
+ } else if (next_token == HASH) {
+ if (hashChild)
+ return hashChild->getBindingKey(key);
+ } else {
+ ChildMap::iterator ptr;
+ ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->getBindingKey(key);
+ }
+ }
+
+ return 0;
+}
+
+
+
+// iterate over all nodes that match the given key. Note well: the set of nodes
+// that are visited includes matching non-leaf nodes.
+// Note well: parameter key is modified!
+bool TopicExchange::BindingNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
+{
+ // invariant: key has matched all previous tokens up to this node.
+ if (key.finished()) {
+ // exact match this node: visit if bound
+ if (!bindings.bindingVector.empty())
+ if (!iter.visit(*this)) return false;
+ }
+
+ // check remaining key against children, even if empty.
+ return iterateMatchChildren(key, iter);
+}
+
+
+TopicExchange::StarNode::StarNode()
+ : BindingNode(STAR) {}
+
+
+// See iterateMatch() above.
+// Special case: this node must verify a token is available (match exactly one).
+bool TopicExchange::StarNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
+{
+ // must match one token:
+ if (key.finished())
+ return true; // match failed, but continue iteration on siblings
+
+ // pop the topmost token
+ key.next();
+
+ if (key.finished()) {
+ // exact match this node: visit if bound
+ if (!bindings.bindingVector.empty())
+ if (!iter.visit(*this)) return false;
+ }
+
+ return iterateMatchChildren(key, iter);
+}
+
+
+TopicExchange::HashNode::HashNode()
+ : BindingNode(HASH) {}
+
+
+// See iterateMatch() above.
+// Special case: can match zero or more tokens at the head of the key.
+bool TopicExchange::HashNode::iterateMatch(TokenIterator& key, TreeIterator& iter)
+{
+ // consume each token and look for a match on the
+ // remaining key.
+ while (!key.finished()) {
+ if (!iterateMatchChildren(key, iter)) return false;
+ key.next();
+ }
+
+ if (!bindings.bindingVector.empty())
+ return iter.visit(*this);
+
+ return true;
+}
+
+
+// helper: iterate over current node's matching children
+bool
+TopicExchange::BindingNode::iterateMatchChildren(const TopicExchange::TokenIterator& key,
+ TopicExchange::BindingNode::TreeIterator& iter)
+{
+ // always try glob - it can match empty keys
+ if (hashChild) {
+ TokenIterator tmp(key);
+ if (!hashChild->iterateMatch(tmp, iter))
+ return false;
+ }
+
+ if (!key.finished()) {
+
+ if (starChild) {
+ TokenIterator tmp(key);
+ if (!starChild->iterateMatch(tmp, iter))
+ return false;
+ }
+
+ if (!childTokens.empty()) {
+ TokenIterator newKey(key);
+ std::string next_token;
+ newKey.pop(next_token);
+
+ ChildMap::iterator ptr = childTokens.find(next_token);
+ if (ptr != childTokens.end()) {
+ return ptr->second->iterateMatch(newKey, iter);
+ }
+ }
+ }
+
+ return true;
+}
+
+}} // namespace qpid::broker