diff options
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 781 |
1 files changed, 781 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp new file mode 100644 index 0000000000..8033cc5dee --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -0,0 +1,781 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PnData.h" +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" +#include <vector> +#include <set> +#include <sstream> +#include <boost/assign.hpp> +#include <boost/format.hpp> +extern "C" { +#include <proton/engine.h> +} + + +namespace qpid { +namespace messaging { +namespace amqp { + +using qpid::types::Variant; + +namespace { +//policy types +const std::string CREATE("create"); +const std::string ASSERT("assert"); +const std::string DELETE("delete"); + +//policy values +const std::string ALWAYS("always"); +const std::string NEVER("never"); +const std::string RECEIVER("receiver"); +const std::string SENDER("sender"); + +const std::string NODE("node"); +const std::string LINK("link"); +const std::string CAPABILITIES("capabilities"); +const std::string PROPERTIES("properties"); +const std::string MODE("mode"); +const std::string BROWSE("browse"); +const std::string CONSUME("consume"); +const std::string TIMEOUT("timeout"); + +const std::string TYPE("type"); +const std::string TOPIC("topic"); +const std::string QUEUE("queue"); +const std::string DURABLE("durable"); +const std::string NAME("name"); +const std::string RELIABILITY("reliability"); +const std::string SELECTOR("selector"); +const std::string FILTER("filter"); +const std::string DESCRIPTOR("descriptor"); +const std::string VALUE("value"); +const std::string SUBJECT_FILTER("subject-filter"); +const std::string SOURCE("sender-source"); +const std::string TARGET("receiver-target"); + +//reliability options: +const std::string UNRELIABLE("unreliable"); +const std::string AT_MOST_ONCE("at-most-once"); +const std::string AT_LEAST_ONCE("at-least-once"); +const std::string EXACTLY_ONCE("exactly-once"); + +//distribution modes: +const std::string MOVE("move"); +const std::string COPY("copy"); + +const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); +const std::string AUTO_DELETE("auto-delete"); +const std::string LIFETIME_POLICY("lifetime-policy"); +const std::string DELETE_ON_CLOSE("delete-on-close"); +const std::string DELETE_IF_UNUSED("delete-if-unused"); +const std::string DELETE_IF_EMPTY("delete-if-empty"); +const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty"); +const std::string CREATE_ON_DEMAND("create-on-demand"); + +const std::string X_DECLARE("x-declare"); +const std::string X_BINDINGS("x-bindings"); +const std::string X_SUBSCRIBE("x-subscribe"); +const std::string ARGUMENTS("arguments"); +const std::string EXCHANGE_TYPE("exchange-type"); + +const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); +const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); + +class Verifier +{ + public: + Verifier(); + void verify(const Address& address) const; + private: + Variant::Map defined; + void verify(const Variant::Map& allowed, const Variant::Map& actual) const; +}; +const Verifier verifier; + +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} + +std::string convert(pn_bytes_t in) +{ + return std::string(in.start, in.size); +} + +bool hasWildcards(const std::string& key) +{ + return key.find('*') != std::string::npos || key.find('#') != std::string::npos; +} + +uint64_t getFilterDescriptor(const std::string& key) +{ + return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; +} + +bool test(const Variant::Map& options, const std::string& name) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + return j->second; + } +} + +template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return defaultValue; + } else { + return j->second; + } +} + +bool bind(const Variant::Map& options, const std::string& name, std::string& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asString(); + return true; + } +} + +bool bind(const Variant::Map& options, const std::string& name, Variant::Map& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asMap(); + return true; + } +} + +bool bind(const Variant::Map& options, const std::string& name, Variant::List& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asList(); + return true; + } +} + +bool bind(const Address& address, const std::string& name, std::string& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool bind(const Address& address, const std::string& name, Variant::Map& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool in(const std::string& value, const std::vector<std::string>& choices) +{ + for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { + if (value == *i) return true; + } + return false; +} +void add(Variant::Map& target, const Variant::Map& source) +{ + for (Variant::Map::const_iterator i = source.begin(); i != source.end(); ++i) { + target[i->first] = i->second; + } +} +void flatten(Variant::Map& base, const std::string& nested) +{ + Variant::Map::iterator i = base.find(nested); + if (i != base.end()) { + add(base, i->second.asMap()); + base.erase(i); + } +} +bool replace(Variant::Map& map, const std::string& original, const std::string& desired) +{ + Variant::Map::iterator i = map.find(original); + if (i != map.end()) { + map[desired] = i->second; + map.erase(original); + return true; + } else { + return false; + } +} + +const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes +const uint32_t DEFAULT_TIMEOUT(0); +} + +AddressHelper::AddressHelper(const Address& address) : + isTemporary(AddressImpl::isTemporary(address)), + name(address.getName()), + type(address.getType()), + durableNode(false), + durableLink(false), + timeout(0), + browse(false) +{ + verifier.verify(address); + bind(address, CREATE, createPolicy); + bind(address, DELETE, deletePolicy); + bind(address, ASSERT, assertPolicy); + + bind(address, NODE, node); + bind(address, LINK, link); + bind(node, PROPERTIES, properties); + bind(node, CAPABILITIES, capabilities); + bind(link, RELIABILITY, reliability); + durableNode = test(node, DURABLE); + durableLink = test(link, DURABLE); + timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); + std::string mode; + if (bind(address, MODE, mode)) { + if (mode == BROWSE) { + browse = true; + } else if (mode != CONSUME) { + throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'."); + } + } + + if (!deletePolicy.empty()) { + throw qpid::messaging::AddressError("Delete policies not supported over AMQP 1.0."); + } + if (node.find(X_BINDINGS) != node.end()) { + throw qpid::messaging::AddressError("Node scoped x-bindings element not supported over AMQP 1.0."); + } + if (link.find(X_BINDINGS) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-bindings element not supported over AMQP 1.0."); + } + if (link.find(X_SUBSCRIBE) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-subscribe element not supported over AMQP 1.0."); + } + if (link.find(X_DECLARE) != link.end()) { + throw qpid::messaging::AddressError("Link scoped x-declare element not supported over AMQP 1.0."); + } + //massage x-declare into properties + Variant::Map::iterator i = node.find(X_DECLARE); + if (i != node.end()) { + Variant::Map x_declare = i->second.asMap(); + replace(x_declare, TYPE, EXCHANGE_TYPE); + flatten(x_declare, ARGUMENTS); + add(properties, x_declare); + node.erase(i); + } + //for temp queues, if neither lifetime-policy nor autodelete are specified, assume delete-on-close + if (isTemporary && properties.find(LIFETIME_POLICY) == properties.end() && properties.find(AUTO_DELETE) == properties.end()) { + properties[LIFETIME_POLICY] = DELETE_ON_CLOSE; + } + + if (properties.size() && !(isTemporary || !createPolicy.empty() || !assertPolicy.empty())) { + QPID_LOG(warning, "Properties will be ignored! " << address); + } + + qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR); + if (selector != link.end()) { + addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second); + } + if (!address.getSubject().empty()) { + addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject()); + } + qpid::types::Variant::Map::const_iterator filter = link.find(FILTER); + if (filter != link.end()) { + if (filter->second.getType() == qpid::types::VAR_MAP) { + addFilter(filter->second.asMap()); + } else if (filter->second.getType() == qpid::types::VAR_LIST) { + addFilters(filter->second.asList()); + } else { + throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value."); + } + } +} + +void AddressHelper::addFilters(const qpid::types::Variant::List& f) +{ + for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) { + addFilter(i->asMap()); + } +} + +void AddressHelper::addFilter(const qpid::types::Variant::Map& f) +{ + qpid::types::Variant::Map::const_iterator name = f.find(NAME); + qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR); + qpid::types::Variant::Map::const_iterator value = f.find(VALUE); + //all fields are required at present (may relax this at a later stage): + if (name == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify name"); + } + if (descriptor == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify descriptor"); + } + if (value == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify value"); + } + try { + addFilter(name->second.asString(), descriptor->second.asUint64(), value->second); + } catch (const qpid::types::InvalidConversion&) { + addFilter(name->second.asString(), descriptor->second.asString(), value->second); + } + +} + +AddressHelper::Filter::Filter() : descriptorCode(0), confirmed(false) {} +AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v), confirmed(false) {} +AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v), confirmed(false) {} + +void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); +} +void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); +} + +namespace { +bool checkLifetimePolicy(const std::string& requested, const std::string& actual) +{ + if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL && requested == DELETE_ON_CLOSE) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL && requested == DELETE_IF_UNUSED) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL && requested == DELETE_IF_EMPTY) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL && requested == DELETE_IF_UNUSED_AND_EMPTY) return true; + else return actual == requested; +} +bool checkLifetimePolicy(const std::string& requested, uint64_t actual) +{ + if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL); + else + return false; +} +bool checkLifetimePolicy(const std::string& requested, pn_data_t* actual) +{ + bool result(false); + if (pn_data_is_described(actual)) { + pn_data_enter(actual); + pn_data_next(actual); + if (pn_data_type(actual) == PN_ULONG) { + result = checkLifetimePolicy(requested, pn_data_get_ulong(actual)); + } else if (pn_data_type(actual) == PN_SYMBOL) { + result = checkLifetimePolicy(requested, convert(pn_data_get_symbol(actual))); + } + pn_data_exit(actual); + } + return result; +} +} +void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) +{ + if (assertEnabled(mode)) { + QPID_LOG(debug, "checking capabilities: " << capabilities); + //ensure all desired capabilities have been offered + std::set<std::string> desired; + for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { + if (*i != CREATE_ON_DEMAND) desired.insert(i->asString()); + } + pn_data_t* data = pn_terminus_capabilities(terminus); + if (pn_data_next(data)) { + pn_type_t type = pn_data_type(data); + if (type == PN_ARRAY) { + pn_data_enter(data); + while (pn_data_next(data)) { + desired.erase(convert(pn_data_get_symbol(data))); + } + pn_data_exit(data); + } else if (type == PN_SYMBOL) { + desired.erase(convert(pn_data_get_symbol(data))); + } else { + QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type)); + } + } + + if (desired.size()) { + std::stringstream missing; + missing << "Desired capabilities not met: "; + bool first(true); + for (std::set<std::string>::const_iterator i = desired.begin(); i != desired.end(); ++i) { + if (first) first = false; + else missing << ", "; + missing << *i; + } + throw qpid::messaging::AssertionFailed(missing.str()); + } + + //ensure all desired filters are in use + data = pn_terminus_filter(terminus); + if (pn_data_next(data)) { + size_t count = pn_data_get_map(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + //skip key: + if (!pn_data_next(data)) break; + //expecting described value: + if (pn_data_is_described(data)) { + pn_data_enter(data); + pn_data_next(data); + if (pn_data_type(data) == PN_ULONG) { + confirmFilter(pn_data_get_ulong(data)); + } else if (pn_data_type(data) == PN_SYMBOL) { + confirmFilter(convert(pn_data_get_symbol(data))); + } + pn_data_exit(data); + } + } + pn_data_exit(data); + } + std::stringstream missing; + missing << "Desired filters not in use: "; + bool first(true); + for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { + if (!i->confirmed) { + if (first) first = false; + else missing << ", "; + missing << i->name << "("; + if (i->descriptorSymbol.empty()) missing << "0x" << std::hex << i->descriptorCode; + else missing << i->descriptorSymbol; + missing << ")"; + } + } + if (!first) throw qpid::messaging::AssertionFailed(missing.str()); + + //assert on properties (Note: this violates the AMQP 1.0 + //specification - as does the create option - by sending + //node-properties even if the dynamic option is not + //set. However this can be avoided by not specifying any node + //properties when asserting) + if (!type.empty() || durableNode || !properties.empty()) { + bool isAutoDeleted = false; + qpid::types::Variant::Map requested = properties; + if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE; + if (durableNode) requested[DURABLE] = true; + + data = pn_terminus_properties(terminus); + if (pn_data_next(data)) { + size_t count = pn_data_get_map(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + std::string key = convert(pn_data_get_symbol(data)); + pn_data_next(data); + qpid::types::Variant::Map::const_iterator j = requested.find(key); + qpid::types::Variant v; + if (key == LIFETIME_POLICY) { + isAutoDeleted = true; + if (j != requested.end() && checkLifetimePolicy(j->second.asString(), data)) { + requested.erase(j->first); + } + } else if (key == AUTO_DELETE) { + PnData(data).get(v); + isAutoDeleted = v.asBool(); + } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) { + requested.erase(j->first); + } + } + pn_data_exit(data); + qpid::types::Variant::Map::iterator i = requested.find(AUTO_DELETE); + if (i != requested.end() && i->second.asBool() == isAutoDeleted) { + requested.erase(i); + } + if (!requested.empty()) { + std::stringstream missing; + missing << "Requested node properties not met: " << requested; + throw qpid::messaging::AssertionFailed(missing.str()); + } + } + } + } +} + +void AddressHelper::confirmFilter(const std::string& descriptor) +{ + for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { + if (descriptor == i->descriptorSymbol) i->confirmed = true; + } +} + +void AddressHelper::confirmFilter(uint64_t descriptor) +{ + for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { + if (descriptor == i->descriptorCode) i->confirmed = true; + } +} + +bool AddressHelper::createEnabled(CheckMode mode) const +{ + return enabled(createPolicy, mode); +} + +bool AddressHelper::assertEnabled(CheckMode mode) const +{ + return enabled(assertPolicy, mode); +} + +bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const +{ + bool result = false; + switch (mode) { + case FOR_RECEIVER: + result = in(policy, RECEIVER_MODES); + break; + case FOR_SENDER: + result = in(policy, SENDER_MODES); + break; + } + return result; +} + +bool AddressHelper::isUnreliable() const +{ + return reliability == AT_MOST_ONCE || reliability == UNRELIABLE || + (reliability.empty() && browse); // A browser defaults to unreliable. +} + +const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const +{ + return node; +} +const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const +{ + return link; +} + +bool AddressHelper::getLinkSource(std::string& out) const +{ + return getLinkOption(SOURCE, out); +} + +bool AddressHelper::getLinkTarget(std::string& out) const +{ + return getLinkOption(TARGET, out); +} + +bool AddressHelper::getLinkOption(const std::string& name, std::string& out) const +{ + qpid::types::Variant::Map::const_iterator i = link.find(name); + if (i != link.end()) { + out = i->second.asString(); + return true; + } else { + return false; + } +} + +void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode) +{ + bool createOnDemand(false); + if (isTemporary) { + //application expects a name to be generated + pn_terminus_set_dynamic(terminus, true); + setNodeProperties(terminus); + } else { + pn_terminus_set_address(terminus, name.c_str()); + if (createEnabled(mode)) { + //application expects name of node to be as specified + setNodeProperties(terminus); + createOnDemand = true; + } else if (assertEnabled(mode)) { + setNodeProperties(terminus); + } + } + + setCapabilities(terminus, createOnDemand); + if (durableLink) { + pn_terminus_set_durability(terminus, PN_DELIVERIES); + } + if (mode == FOR_RECEIVER) { + if (timeout) pn_terminus_set_timeout(terminus, timeout); + if (browse) { + pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY); + } + //set filter(s): + if (!filters.empty()) { + pn_data_t* filter = pn_terminus_filter(terminus); + pn_data_put_map(filter); + pn_data_enter(filter); + for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) { + pn_data_put_symbol(filter, convert(i->name)); + pn_data_put_described(filter); + pn_data_enter(filter); + if (i->descriptorSymbol.size()) { + pn_data_put_symbol(filter, convert(i->descriptorSymbol)); + } else { + pn_data_put_ulong(filter, i->descriptorCode); + } + PnData(filter).put(i->value); + pn_data_exit(filter); + } + pn_data_exit(filter); + } + } + if (isUnreliable()) { + pn_link_set_snd_settle_mode(link, PN_SND_SETTLED); + } else if (!reliability.empty()) { + if (reliability == EXACTLY_ONCE ) { + QPID_LOG(warning, "Unsupported reliability mode: " << reliability); + } else if (reliability != AT_LEAST_ONCE ) { + QPID_LOG(warning, "Unrecognised reliability mode: " << reliability); + } + pn_link_set_snd_settle_mode(link, PN_SND_UNSETTLED); + } +} + +void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) +{ + if (create) capabilities.push_back(CREATE_ON_DEMAND); + if (!type.empty()) capabilities.push_back(type); + if (durableNode) capabilities.push_back(DURABLE); + + pn_data_t* data = pn_terminus_capabilities(terminus); + if (capabilities.size() == 1) { + pn_data_put_symbol(data, convert(capabilities.front().asString())); + } else if (capabilities.size() > 1) { + pn_data_put_array(data, false, PN_SYMBOL); + pn_data_enter(data); + for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { + pn_data_put_symbol(data, convert(i->asString())); + } + pn_data_exit(data); + } +} +std::string AddressHelper::getLinkName(const Address& address) +{ + AddressHelper helper(address); + const qpid::types::Variant::Map& linkProps = helper.getLinkProperties(); + qpid::types::Variant::Map::const_iterator i = linkProps.find(NAME); + if (i != linkProps.end()) { + return i->second.asString(); + } else { + std::stringstream name; + name << address.getName() << "_" << qpid::types::Uuid(true); + return name.str(); + } +} +namespace { +std::string toLifetimePolicy(const std::string& value) +{ + if (value == DELETE_ON_CLOSE) return qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL; + else if (value == DELETE_IF_UNUSED) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL; + else if (value == DELETE_IF_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL; + else if (value == DELETE_IF_UNUSED_AND_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL; + else return value;//asume value is itself the symbolic descriptor +} +void putLifetimePolicy(pn_data_t* data, const std::string& value) +{ + pn_data_put_described(data); + pn_data_enter(data); + pn_data_put_symbol(data, convert(value)); + pn_data_put_list(data); + pn_data_exit(data); +} +} +void AddressHelper::setNodeProperties(pn_terminus_t* terminus) +{ + if (properties.size() || type.size() || durableNode) { + pn_data_t* data = pn_terminus_properties(terminus); + pn_data_put_map(data); + pn_data_enter(data); + if (type.size()) { + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(type == TOPIC ? COPY : MOVE)); + } + if (durableNode) { + pn_data_put_symbol(data, convert(DURABLE)); + pn_data_put_bool(data, true); + } + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == LIFETIME_POLICY) { + pn_data_put_symbol(data, convert(i->first)); + putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); + } else { + pn_data_put_symbol(data, convert(i->first)); + PnData(data).put(i->second); + } + } + pn_data_exit(data); + } +} + +Verifier::Verifier() +{ + defined[CREATE] = true; + defined[ASSERT] = true; + defined[DELETE] = true; + defined[MODE] = true; + Variant::Map node; + node[TYPE] = true; + node[DURABLE] = true; + node[PROPERTIES] = true; + node[CAPABILITIES] = true; + node[X_DECLARE] = true; + node[X_BINDINGS] = true; + defined[NODE] = node; + Variant::Map link; + link[NAME] = true; + link[DURABLE] = true; + link[RELIABILITY] = true; + link[TIMEOUT] = true; + link[SOURCE] = true; + link[TARGET] = true; + link[X_SUBSCRIBE] = true; + link[X_DECLARE] = true; + link[X_BINDINGS] = true; + link[SELECTOR] = true; + link[FILTER] = true; + defined[LINK] = link; +} +void Verifier::verify(const Address& address) const +{ + verify(defined, address.getOptions()); +} + +void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const +{ + for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) { + Variant::Map::const_iterator option = allowed.find(i->first); + if (option == allowed.end()) { + throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str()); + } else if (option->second.getType() == qpid::types::VAR_MAP) { + verify(option->second.asMap(), i->second.asMap()); + } + } +} + +}}} // namespace qpid::messaging::amqp |