diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/HeadersExchange.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 130 |
1 files changed, 99 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 9975d26c72..02c05852ff 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/HeadersExchange.h" +#include "qpid/broker/MapHandler.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" @@ -55,6 +56,100 @@ namespace { const std::string fedOpUnbind("U"); const std::string fedOpReorigin("R"); const std::string fedOpHello("H"); + +std::string getMatch(const FieldTable* args) +{ + if (!args) { + throw InternalErrorException(QPID_MSG("No arguments given.")); + } + FieldTable::ValuePtr what = args->get(x_match); + if (!what) { + return empty; + } + if (!what->convertsTo<std::string>()) { + throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]")); + } + return what->get<std::string>(); +} +class Matcher : public MapHandler +{ + public: + Matcher(const FieldTable& b) : binding(b), matched(0) {} + void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { processUint(std::string(key.data, key.size), value); } + void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { processUint(std::string(key.data, key.size), value); } + void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { processUint(std::string(key.data, key.size), value); } + void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { processUint(std::string(key.data, key.size), value); } + void handleInt8(const MapHandler::CharSequence& key, int8_t value) { processInt(std::string(key.data, key.size), value); } + void handleInt16(const MapHandler::CharSequence& key, int16_t value) { processInt(std::string(key.data, key.size), value); } + void handleInt32(const MapHandler::CharSequence& key, int32_t value) { processInt(std::string(key.data, key.size), value); } + void handleInt64(const MapHandler::CharSequence& key, int64_t value) { processInt(std::string(key.data, key.size), value); } + void handleFloat(const MapHandler::CharSequence& key, float value) { processFloat(std::string(key.data, key.size), value); } + void handleDouble(const MapHandler::CharSequence& key, double value) { processFloat(std::string(key.data, key.size), value); } + void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/) + { + processString(std::string(key.data, key.size), std::string(value.data, value.size)); + } + void handleVoid(const MapHandler::CharSequence& key) + { + valueCheckRequired(std::string(key.data, key.size)); + } + bool matches() + { + std::string what = getMatch(&binding); + if (what == all) { + //must match all entries in the binding, except the match mode indicator + return matched == binding.size() - 1; + } else if (what == any) { + //match any of the entries in the binding + return matched > 0; + } else { + return false; + } + } + private: + bool valueCheckRequired(const std::string& key) + { + FieldTable::ValuePtr v = binding.get(key); + if (v) { + if (v->getType() == 0xf0/*VOID*/) { + ++matched; + return false; + } else { + return true; + } + } else { + return false; + } + } + + void processString(const std::string& key, const std::string& actual) + { + if (valueCheckRequired(key) && binding.getAsString(key) == actual) { + ++matched; + } + } + void processFloat(const std::string& key, double actual) + { + double bound; + if (valueCheckRequired(key) && binding.getDouble(key, bound) && bound == actual) { + ++matched; + } + } + void processInt(const std::string& key, int64_t actual) + { + if (valueCheckRequired(key) && binding.getAsInt64(key) == actual) { + ++matched; + } + } + void processUint(const std::string& key, uint64_t actual) + { + if (valueCheckRequired(key) && binding.getAsUInt64(key) == actual) { + ++matched; + } + } + const FieldTable& binding; + size_t matched; +}; } HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) : @@ -72,21 +167,6 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, mgmtExchange->set_type (typeName); } -std::string HeadersExchange::getMatch(const FieldTable* args) -{ - if (!args) { - throw InternalErrorException(QPID_MSG("No arguments given.")); - } - FieldTable::ValuePtr what = args->get(x_match); - if (!what) { - return empty; - } - if (!what->convertsTo<std::string>()) { - throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]")); - } - return what->get<std::string>(); -} - bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args) { string fedOp(fedOpBind); @@ -196,28 +276,16 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, void HeadersExchange::route(Deliverable& msg) { - const FieldTable* args = msg.getMessage().getApplicationHeaders(); - if (!args) { - //can't match if there were no headers passed in - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsNoRoute(); - } - return; - } - PreRoute pr(msg, this); BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); Bindings::ConstPtr p = bindings.snapshot(); if (p.get()) { for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { - if (match((*i).binding->args, *args)) { - b->push_back((*i).binding); + Matcher matcher(i->binding->args); + msg.getMessage().processProperties(matcher); + if (matcher.matches()) { + b->push_back(i->binding); } } } |