diff options
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/AddressHelper.cpp')
-rw-r--r-- | cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 258b4b84bd..f3a42b201b 100644 --- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -21,6 +21,7 @@ #include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/AddressImpl.h" +#include "qpid/amqp/descriptors.h" #include "qpid/log/Statement.h" #include <vector> #include <set> @@ -64,6 +65,10 @@ const std::string DURABLE("durable"); const std::string NAME("name"); const std::string RELIABILITY("reliability"); const std::string SELECTOR("selector"); +const std::string FILTER("filter"); +const std::string DESCRIPTOR("descriptor"); +const std::string VALUE("value"); +const std::string SUBJECT_FILTER("subject-filter"); //distribution modes: const std::string MOVE("move"); @@ -100,7 +105,15 @@ pn_bytes_t convert(const std::string& s) result.size = s.size(); return result; } +bool hasWildcards(const std::string& key) +{ + return key.find('*') != std::string::npos || key.find('#') != std::string::npos; +} +uint64_t getFilterDescriptor(const std::string& key) +{ + return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; +} bool contains(const Variant::List& list, const std::string& item) { for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { @@ -183,6 +196,58 @@ void flatten(Variant::Map& base, const std::string& nested) base.erase(i); } } +void write(pn_data_t* data, const Variant& value); + +void write(pn_data_t* data, const Variant::Map& map) +{ + pn_data_put_map(data); + pn_data_enter(data); + for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + pn_data_put_string(data, convert(i->first)); + write(data, i->second); + } + pn_data_exit(data); +} +void write(pn_data_t* data, const Variant::List& list) +{ + pn_data_put_list(data); + pn_data_enter(data); + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + write(data, *i); + } + pn_data_exit(data); +} +void write(pn_data_t* data, const Variant& value) +{ + switch (value.getType()) { + case qpid::types::VAR_VOID: + pn_data_put_null(data); + break; + case qpid::types::VAR_BOOL: + pn_data_put_bool(data, value.asBool()); + break; + case qpid::types::VAR_UINT64: + pn_data_put_ulong(data, value.asUint64()); + break; + case qpid::types::VAR_INT64: + pn_data_put_long(data, value.asInt64()); + break; + case qpid::types::VAR_DOUBLE: + pn_data_put_double(data, value.asDouble()); + break; + case qpid::types::VAR_STRING: + pn_data_put_string(data, convert(value.asString())); + break; + case qpid::types::VAR_MAP: + write(data, value.asMap()); + break; + case qpid::types::VAR_LIST: + write(data, value.asList()); + break; + default: + break; + } +} } AddressHelper::AddressHelper(const Address& address) : @@ -241,6 +306,67 @@ AddressHelper::AddressHelper(const Address& address) : if (properties.size() && !(isTemporary || createPolicy.size())) { QPID_LOG(warning, "Properties will be ignored! " << address); } + + qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR); + if (selector != link.end()) { + addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second); + } + if (!address.getSubject().empty()) { + addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject()); + } + qpid::types::Variant::Map::const_iterator filter = link.find(FILTER); + if (filter != link.end()) { + if (filter->second.getType() == qpid::types::VAR_MAP) { + addFilter(filter->second.asMap()); + } else if (filter->second.getType() == qpid::types::VAR_LIST) { + addFilters(filter->second.asList()); + } else { + throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value."); + } + } +} + +void AddressHelper::addFilters(const qpid::types::Variant::List& f) +{ + for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) { + addFilter(i->asMap()); + } +} + +void AddressHelper::addFilter(const qpid::types::Variant::Map& f) +{ + qpid::types::Variant::Map::const_iterator name = f.find(NAME); + qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR); + qpid::types::Variant::Map::const_iterator value = f.find(VALUE); + //all fields are required at present (may relax this at a later stage): + if (name == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify name"); + } + if (descriptor == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify descriptor"); + } + if (value == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify value"); + } + try { + addFilter(name->second.asString(), descriptor->second.asUint64(), value->second); + } catch (const qpid::types::InvalidConversion&) { + addFilter(name->second.asString(), descriptor->second.asString(), value->second); + } + +} + +AddressHelper::Filter::Filter() : descriptorCode(0){} +AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v) {} +AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v) {} + +void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); +} +void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); } void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) @@ -331,6 +457,26 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) if (mode == FOR_RECEIVER && browse) { //when PROTON-139 is resolved, set the required delivery-mode } + //set filter(s): + if (mode == FOR_RECEIVER && !filters.empty()) { + pn_data_t* filter = pn_terminus_filter(terminus); + pn_data_put_map(filter); + pn_data_enter(filter); + for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) { + pn_data_put_symbol(filter, convert(i->name)); + pn_data_put_described(filter); + pn_data_enter(filter); + if (i->descriptorSymbol.size()) { + pn_data_put_symbol(filter, convert(i->descriptorSymbol)); + } else { + pn_data_put_ulong(filter, i->descriptorCode); + } + write(filter, i->value); + pn_data_exit(filter); + } + pn_data_exit(filter); + } + } void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) @@ -401,6 +547,7 @@ Verifier::Verifier() link[X_DECLARE] = true; link[X_BINDINGS] = true; link[SELECTOR] = true; + link[FILTER] = true; defined[LINK] = link; } void Verifier::verify(const Address& address) const |