diff options
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/broker/TopicExchange.cpp')
-rw-r--r-- | M4-RCs/qpid/cpp/src/qpid/broker/TopicExchange.cpp | 313 |
1 files changed, 0 insertions, 313 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/M4-RCs/qpid/cpp/src/qpid/broker/TopicExchange.cpp deleted file mode 100644 index d4f9721162..0000000000 --- a/M4-RCs/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ /dev/null @@ -1,313 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "TopicExchange.h" -#include <algorithm> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; -namespace _qmf = qmf::org::apache::qpid::broker; - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// Areas for improvement: -// - excessive string copying: should be 0 copy, match from original buffer. -// - match/lookup: use descision tree or other more efficient structure. - -namespace -{ -const std::string qpidFedOp("qpid.fed.op"); -const std::string qpidFedTags("qpid.fed.tags"); -const std::string qpidFedOrigin("qpid.fed.origin"); - -const std::string fedOpBind("B"); -const std::string fedOpUnbind("U"); -const std::string fedOpReorigin("R"); -const std::string fedOpHello("H"); -} - -Tokens& Tokens::operator=(const std::string& s) { - clear(); - if (s.empty()) return *this; - std::string::const_iterator i = s.begin(); - while (true) { - // Invariant: i is at the beginning of the next untokenized word. - std::string::const_iterator j = std::find(i, s.end(), '.'); - push_back(std::string(i, j)); - if (j == s.end()) return *this; - i = j + 1; - } - return *this; -} - -TopicPattern& TopicPattern::operator=(const Tokens& tokens) { - Tokens::operator=(tokens); - normalize(); - return *this; -} - -void Tokens::key(string& keytext) const -{ - for (std::vector<string>::const_iterator iter = begin(); iter != end(); iter++) { - if (iter != begin()) - keytext += "."; - keytext += *iter; - } -} - -namespace { -const std::string hashmark("#"); -const std::string star("*"); -} - -void TopicPattern::normalize() { - std::string word; - Tokens::iterator i = begin(); - while (i != end()) { - if (*i == hashmark) { - ++i; - while (i != end()) { - // Invariant: *(i-1)==#, [begin()..i-1] is normalized. - if (*i == star) { // Move * before #. - std::swap(*i, *(i-1)); - ++i; - } else if (*i == hashmark) { - erase(i); // Remove extra # - } else { - break; - } - } - } else { - i ++; - } - } -} - - -namespace { -// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string. -// Need StringRef class that operates on a string in place witout copy. -// Should be applied everywhere strings are extracted from frames. -// -bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) -{ - // Invariant: [pattern_begin..p) matches [target_begin..t) - Tokens::const_iterator p = pattern_begin; - Tokens::const_iterator t = target_begin; - while (p != pattern_end && t != target_end) - { - if (*p == star || *p == *t) { - ++p, ++t; - } else if (*p == hashmark) { - ++p; - if (do_match(p, pattern_end, t, target_end)) return true; - while (t != target_end) { - ++t; - if (do_match(p, pattern_end, t, target_end)) return true; - } - return false; - } else { - return false; - } - } - while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # - return t == target_end && p == pattern_end; -} -} - -bool TopicPattern::match(const Tokens& target) const -{ - return do_match(begin(), end(), target.begin(), target.end()); -} - -TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) -{ - if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); -} - -TopicExchange::TopicExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) -{ - if (mgmtExchange != 0) - mgmtExchange->set_type (typeName); -} - -bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) -{ - string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); - string fedTags(args ? args->getAsString(qpidFedTags) : ""); - string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); - bool propagate = false; - bool reallyUnbind; - TopicPattern routingPattern(routingKey); - - if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { - RWlock::ScopedWlock l(lock); - if (isBound(queue, routingPattern)) { - return false; - } else { - Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin)); - BoundKey& bk = bindings[routingPattern]; - bk.bindingVector.push_back(binding); - propagate = bk.fedBinding.addOrigin(fedOrigin); - if (mgmtExchange != 0) { - mgmtExchange->inc_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount(); - } - } - } else if (fedOp == fedOpUnbind) { - { - RWlock::ScopedWlock l(lock); - BoundKey& bk = bindings[routingPattern]; - propagate = bk.fedBinding.delOrigin(fedOrigin); - reallyUnbind = bk.fedBinding.count() == 0; - } - if (reallyUnbind) - unbind(queue, routingKey, 0); - } else if (fedOp == fedOpReorigin) { - for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin(); - iter != bindings.end(); iter++) { - const BoundKey& bk = iter->second; - if (bk.fedBinding.hasLocal()) { - string propKey; - iter->first.key(propKey); - propagateFedOp(propKey, string(), fedOpBind, string()); - } - } - } - - routeIVE(); - if (propagate) - propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); - return true; -} - -bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - RWlock::ScopedWlock l(lock); - BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - if (bi == bindings.end()) return false; - BoundKey& bk = bi->second; - Binding::vector& qv(bk.bindingVector); - bool propagate = false; - - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) - if ((*q)->queue == queue) - break; - if(q == qv.end()) return false; - qv.erase(q); - propagate = bk.fedBinding.delOrigin(); - if(qv.empty()) bindings.erase(bi); - if (mgmtExchange != 0) { - mgmtExchange->dec_bindingCount(); - ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount(); - } - - if (propagate) - propagateFedOp(routingKey, string(), fedOpUnbind, string()); - return true; -} - -bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) -{ - BindingMap::iterator bi = bindings.find(pattern); - if (bi == bindings.end()) return false; - Binding::vector& qv(bi->second.bindingVector); - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) - if ((*q)->queue == queue) - break; - return q != qv.end(); -} - -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - Binding::vector mb; - PreRoute pr(msg, this); - uint32_t count(0); - - { - RWlock::ScopedRlock l(lock); - Tokens tokens(routingKey); - - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (i->first.match(tokens)) { - Binding::vector& qv(i->second.bindingVector); - for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ - mb.push_back(*j); - } - } - } - } - - for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) { - msg.deliverTo((*j)->queue); - if ((*j)->mgmtBinding != 0) - (*j)->mgmtBinding->inc_msgMatched (); - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } -} - -bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) -{ - if (routingKey && queue) { - TopicPattern key(*routingKey); - return isBound(queue, key); - } else if (!routingKey && !queue) { - return bindings.size() > 0; - } else if (routingKey) { - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (i->first.match(*routingKey)) { - return true; - } - } - } else { - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - Binding::vector& qv(i->second.bindingVector); - Binding::vector::iterator q; - for (q = qv.begin(); q != qv.end(); q++) - if ((*q)->queue == queue) - return true; - } - } - return false; -} - -TopicExchange::~TopicExchange() {} - -const std::string TopicExchange::typeName("topic"); - - |