diff options
author | Alan Conway <aconway@apache.org> | 2006-09-27 19:50:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-09-27 19:50:23 +0000 |
commit | caca23c5dc055d985fecfe188573104bc707ad9d (patch) | |
tree | 154c0bbd4c7bca70080de28116b5654491657906 /cpp/broker/src | |
parent | 9d718c2348708b0b27ce9fb9fcbf05c4b0a997cc (diff) | |
download | qpid-python-caca23c5dc055d985fecfe188573104bc707ad9d.tar.gz |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450556 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src')
-rw-r--r-- | cpp/broker/src/HeadersExchange.cpp | 119 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerFactoryImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 8 |
3 files changed, 129 insertions, 1 deletions
diff --git a/cpp/broker/src/HeadersExchange.cpp b/cpp/broker/src/HeadersExchange.cpp new file mode 100644 index 0000000000..03a029ea4d --- /dev/null +++ b/cpp/broker/src/HeadersExchange.cpp @@ -0,0 +1,119 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "HeadersExchange.h" +#include "ExchangeBinding.h" +#include "Value.h" +#include <algorithm> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +// TODO aconway 2006-09-20: More efficient matching algorithm. +// The current search algorithm really sucks. +// Fieldtables are heavy, maybe use shared_ptr to do handle-body. + +namespace qpid { +namespace broker { + +namespace { +const std::string all("all"); +const std::string any("any"); +const std::string x_match("x-match"); +} + +HeadersExchange::HeadersExchange(const string& name) : Exchange(name) { } + +void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + std::cout << "HeadersExchange::bind" << std::endl; + Locker locker(lock); + std::string what = args->getString("x-match"); + // TODO aconway 2006-09-26: throw an exception for invalid bindings. + if (what != all && what != any) return; // Invalid. + bindings.push_back(Binding(*args, queue)); + queue->bound(new ExchangeBinding(this, queue, routingKey, args)); +} + +void HeadersExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ + Locker locker(lock);; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (i->first == *args) { + bindings.erase(i); + } + } +} + + +void HeadersExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){ + std::cout << "route: " << *args << std::endl; + Locker locker(lock);; + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, *args)) i->second->deliver(msg); + } +} + +HeadersExchange::~HeadersExchange() {} + +const std::string HeadersExchange::typeName("headers"); +namespace +{ + +bool match_values(const Value& bind, const Value& msg) { + return dynamic_cast<const EmptyValue*>(&bind) || bind == msg; +} + +} + + +bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { + typedef FieldTable::ValueMap Map; + std::string what = bind.getString(x_match); + if (what == all) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j == msg.getMap().end()) return false; + if (!match_values(*(i->second), *(j->second))) return false; + } + } + return true; + } else if (what == any) { + for (Map::const_iterator i = bind.getMap().begin(); + i != bind.getMap().end(); + ++i) + { + if (i->first != x_match) + { + Map::const_iterator j = msg.getMap().find(i->first); + if (j != msg.getMap().end()) { + if (match_values(*(i->second), *(j->second))) return true; + } + } + } + return false; + } else { + return false; + } +} + +}} + diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp index 280e89c475..39c627afef 100644 --- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp +++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp @@ -18,6 +18,7 @@ #include "SessionHandlerFactoryImpl.h" #include "SessionHandlerImpl.h" #include "FanOutExchange.h" +#include "HeadersExchange.h" using namespace qpid::broker; using namespace qpid::io; @@ -28,6 +29,7 @@ const std::string empty; const std::string amq_direct("amq.direct"); const std::string amq_topic("amq.topic"); const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); } SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ @@ -35,6 +37,7 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeo exchanges.declare(new DirectExchange(amq_direct)); exchanges.declare(new TopicExchange(amq_topic)); exchanges.declare(new FanOutExchange(amq_fanout)); + exchanges.declare(new HeadersExchange(amq_match)); cleaner.start(); } diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 2ce1c4b298..eb8f37030c 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -18,6 +18,8 @@ #include <iostream> #include "SessionHandlerImpl.h" #include "FanOutExchange.h" +#include "TopicExchange.h" +#include "HeadersExchange.h" #include "assert.h" using namespace std::tr1; @@ -223,7 +225,9 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 if(!passive && ( type != TopicExchange::typeName && type != DirectExchange::typeName && - type != FanOutExchange::typeName) + type != FanOutExchange::typeName && + type != HeadersExchange::typeName + ) ) { throw ChannelException(540, "Exchange type not implemented: " + type); @@ -237,6 +241,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 parent->exchanges->declare(new DirectExchange(exchange)); }else if(type == FanOutExchange::typeName){ parent->exchanges->declare(new DirectExchange(exchange)); + }else if (type == HeadersExchange::typeName) { + parent->exchanges->declare(new HeadersExchange(exchange)); } } parent->exchanges->getLock()->release(); |