summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp781
1 files changed, 781 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
new file mode 100644
index 0000000000..8033cc5dee
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -0,0 +1,781 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PnData.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/AddressImpl.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/types/encodings.h"
+#include "qpid/log/Statement.h"
+#include <vector>
+#include <set>
+#include <sstream>
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+extern "C" {
+#include <proton/engine.h>
+}
+
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using qpid::types::Variant;
+
+namespace {
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
+
+const std::string NODE("node");
+const std::string LINK("link");
+const std::string CAPABILITIES("capabilities");
+const std::string PROPERTIES("properties");
+const std::string MODE("mode");
+const std::string BROWSE("browse");
+const std::string CONSUME("consume");
+const std::string TIMEOUT("timeout");
+
+const std::string TYPE("type");
+const std::string TOPIC("topic");
+const std::string QUEUE("queue");
+const std::string DURABLE("durable");
+const std::string NAME("name");
+const std::string RELIABILITY("reliability");
+const std::string SELECTOR("selector");
+const std::string FILTER("filter");
+const std::string DESCRIPTOR("descriptor");
+const std::string VALUE("value");
+const std::string SUBJECT_FILTER("subject-filter");
+const std::string SOURCE("sender-source");
+const std::string TARGET("receiver-target");
+
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
+
+//distribution modes:
+const std::string MOVE("move");
+const std::string COPY("copy");
+
+const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+const std::string AUTO_DELETE("auto-delete");
+const std::string LIFETIME_POLICY("lifetime-policy");
+const std::string DELETE_ON_CLOSE("delete-on-close");
+const std::string DELETE_IF_UNUSED("delete-if-unused");
+const std::string DELETE_IF_EMPTY("delete-if-empty");
+const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty");
+const std::string CREATE_ON_DEMAND("create-on-demand");
+
+const std::string X_DECLARE("x-declare");
+const std::string X_BINDINGS("x-bindings");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string ARGUMENTS("arguments");
+const std::string EXCHANGE_TYPE("exchange-type");
+
+const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
+const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
+
+class Verifier
+{
+ public:
+ Verifier();
+ void verify(const Address& address) const;
+ private:
+ Variant::Map defined;
+ void verify(const Variant::Map& allowed, const Variant::Map& actual) const;
+};
+const Verifier verifier;
+
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+
+std::string convert(pn_bytes_t in)
+{
+ return std::string(in.start, in.size);
+}
+
+bool hasWildcards(const std::string& key)
+{
+ return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
+}
+
+uint64_t getFilterDescriptor(const std::string& key)
+{
+ return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
+
+bool test(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ return j->second;
+ }
+}
+
+template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return defaultValue;
+ } else {
+ return j->second;
+ }
+}
+
+bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asString();
+ return true;
+ }
+}
+
+bool bind(const Variant::Map& options, const std::string& name, Variant::Map& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asMap();
+ return true;
+ }
+}
+
+bool bind(const Variant::Map& options, const std::string& name, Variant::List& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asList();
+ return true;
+ }
+}
+
+bool bind(const Address& address, const std::string& name, std::string& variable)
+{
+ return bind(address.getOptions(), name, variable);
+}
+
+bool bind(const Address& address, const std::string& name, Variant::Map& variable)
+{
+ return bind(address.getOptions(), name, variable);
+}
+
+bool in(const std::string& value, const std::vector<std::string>& choices)
+{
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value == *i) return true;
+ }
+ return false;
+}
+void add(Variant::Map& target, const Variant::Map& source)
+{
+ for (Variant::Map::const_iterator i = source.begin(); i != source.end(); ++i) {
+ target[i->first] = i->second;
+ }
+}
+void flatten(Variant::Map& base, const std::string& nested)
+{
+ Variant::Map::iterator i = base.find(nested);
+ if (i != base.end()) {
+ add(base, i->second.asMap());
+ base.erase(i);
+ }
+}
+bool replace(Variant::Map& map, const std::string& original, const std::string& desired)
+{
+ Variant::Map::iterator i = map.find(original);
+ if (i != map.end()) {
+ map[desired] = i->second;
+ map.erase(original);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes
+const uint32_t DEFAULT_TIMEOUT(0);
+}
+
+AddressHelper::AddressHelper(const Address& address) :
+ isTemporary(AddressImpl::isTemporary(address)),
+ name(address.getName()),
+ type(address.getType()),
+ durableNode(false),
+ durableLink(false),
+ timeout(0),
+ browse(false)
+{
+ verifier.verify(address);
+ bind(address, CREATE, createPolicy);
+ bind(address, DELETE, deletePolicy);
+ bind(address, ASSERT, assertPolicy);
+
+ bind(address, NODE, node);
+ bind(address, LINK, link);
+ bind(node, PROPERTIES, properties);
+ bind(node, CAPABILITIES, capabilities);
+ bind(link, RELIABILITY, reliability);
+ durableNode = test(node, DURABLE);
+ durableLink = test(link, DURABLE);
+ timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
+ std::string mode;
+ if (bind(address, MODE, mode)) {
+ if (mode == BROWSE) {
+ browse = true;
+ } else if (mode != CONSUME) {
+ throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'.");
+ }
+ }
+
+ if (!deletePolicy.empty()) {
+ throw qpid::messaging::AddressError("Delete policies not supported over AMQP 1.0.");
+ }
+ if (node.find(X_BINDINGS) != node.end()) {
+ throw qpid::messaging::AddressError("Node scoped x-bindings element not supported over AMQP 1.0.");
+ }
+ if (link.find(X_BINDINGS) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-bindings element not supported over AMQP 1.0.");
+ }
+ if (link.find(X_SUBSCRIBE) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-subscribe element not supported over AMQP 1.0.");
+ }
+ if (link.find(X_DECLARE) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-declare element not supported over AMQP 1.0.");
+ }
+ //massage x-declare into properties
+ Variant::Map::iterator i = node.find(X_DECLARE);
+ if (i != node.end()) {
+ Variant::Map x_declare = i->second.asMap();
+ replace(x_declare, TYPE, EXCHANGE_TYPE);
+ flatten(x_declare, ARGUMENTS);
+ add(properties, x_declare);
+ node.erase(i);
+ }
+ //for temp queues, if neither lifetime-policy nor autodelete are specified, assume delete-on-close
+ if (isTemporary && properties.find(LIFETIME_POLICY) == properties.end() && properties.find(AUTO_DELETE) == properties.end()) {
+ properties[LIFETIME_POLICY] = DELETE_ON_CLOSE;
+ }
+
+ if (properties.size() && !(isTemporary || !createPolicy.empty() || !assertPolicy.empty())) {
+ QPID_LOG(warning, "Properties will be ignored! " << address);
+ }
+
+ qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR);
+ if (selector != link.end()) {
+ addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second);
+ }
+ if (!address.getSubject().empty()) {
+ addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject());
+ }
+ qpid::types::Variant::Map::const_iterator filter = link.find(FILTER);
+ if (filter != link.end()) {
+ if (filter->second.getType() == qpid::types::VAR_MAP) {
+ addFilter(filter->second.asMap());
+ } else if (filter->second.getType() == qpid::types::VAR_LIST) {
+ addFilters(filter->second.asList());
+ } else {
+ throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value.");
+ }
+ }
+}
+
+void AddressHelper::addFilters(const qpid::types::Variant::List& f)
+{
+ for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) {
+ addFilter(i->asMap());
+ }
+}
+
+void AddressHelper::addFilter(const qpid::types::Variant::Map& f)
+{
+ qpid::types::Variant::Map::const_iterator name = f.find(NAME);
+ qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR);
+ qpid::types::Variant::Map::const_iterator value = f.find(VALUE);
+ //all fields are required at present (may relax this at a later stage):
+ if (name == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify name");
+ }
+ if (descriptor == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify descriptor");
+ }
+ if (value == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify value");
+ }
+ try {
+ addFilter(name->second.asString(), descriptor->second.asUint64(), value->second);
+ } catch (const qpid::types::InvalidConversion&) {
+ addFilter(name->second.asString(), descriptor->second.asString(), value->second);
+ }
+
+}
+
+AddressHelper::Filter::Filter() : descriptorCode(0), confirmed(false) {}
+AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v), confirmed(false) {}
+AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v), confirmed(false) {}
+
+void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value)
+{
+ filters.push_back(Filter(name, descriptor, value));
+}
+void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value)
+{
+ filters.push_back(Filter(name, descriptor, value));
+}
+
+namespace {
+bool checkLifetimePolicy(const std::string& requested, const std::string& actual)
+{
+ if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL && requested == DELETE_ON_CLOSE) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL && requested == DELETE_IF_UNUSED) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL && requested == DELETE_IF_EMPTY) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL && requested == DELETE_IF_UNUSED_AND_EMPTY) return true;
+ else return actual == requested;
+}
+bool checkLifetimePolicy(const std::string& requested, uint64_t actual)
+{
+ if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL);
+ else
+ return false;
+}
+bool checkLifetimePolicy(const std::string& requested, pn_data_t* actual)
+{
+ bool result(false);
+ if (pn_data_is_described(actual)) {
+ pn_data_enter(actual);
+ pn_data_next(actual);
+ if (pn_data_type(actual) == PN_ULONG) {
+ result = checkLifetimePolicy(requested, pn_data_get_ulong(actual));
+ } else if (pn_data_type(actual) == PN_SYMBOL) {
+ result = checkLifetimePolicy(requested, convert(pn_data_get_symbol(actual)));
+ }
+ pn_data_exit(actual);
+ }
+ return result;
+}
+}
+void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
+{
+ if (assertEnabled(mode)) {
+ QPID_LOG(debug, "checking capabilities: " << capabilities);
+ //ensure all desired capabilities have been offered
+ std::set<std::string> desired;
+ for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+ if (*i != CREATE_ON_DEMAND) desired.insert(i->asString());
+ }
+ pn_data_t* data = pn_terminus_capabilities(terminus);
+ if (pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ if (type == PN_ARRAY) {
+ pn_data_enter(data);
+ while (pn_data_next(data)) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ } else if (type == PN_SYMBOL) {
+ desired.erase(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
+ }
+ }
+
+ if (desired.size()) {
+ std::stringstream missing;
+ missing << "Desired capabilities not met: ";
+ bool first(true);
+ for (std::set<std::string>::const_iterator i = desired.begin(); i != desired.end(); ++i) {
+ if (first) first = false;
+ else missing << ", ";
+ missing << *i;
+ }
+ throw qpid::messaging::AssertionFailed(missing.str());
+ }
+
+ //ensure all desired filters are in use
+ data = pn_terminus_filter(terminus);
+ if (pn_data_next(data)) {
+ size_t count = pn_data_get_map(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ //skip key:
+ if (!pn_data_next(data)) break;
+ //expecting described value:
+ if (pn_data_is_described(data)) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ if (pn_data_type(data) == PN_ULONG) {
+ confirmFilter(pn_data_get_ulong(data));
+ } else if (pn_data_type(data) == PN_SYMBOL) {
+ confirmFilter(convert(pn_data_get_symbol(data)));
+ }
+ pn_data_exit(data);
+ }
+ }
+ pn_data_exit(data);
+ }
+ std::stringstream missing;
+ missing << "Desired filters not in use: ";
+ bool first(true);
+ for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) {
+ if (!i->confirmed) {
+ if (first) first = false;
+ else missing << ", ";
+ missing << i->name << "(";
+ if (i->descriptorSymbol.empty()) missing << "0x" << std::hex << i->descriptorCode;
+ else missing << i->descriptorSymbol;
+ missing << ")";
+ }
+ }
+ if (!first) throw qpid::messaging::AssertionFailed(missing.str());
+
+ //assert on properties (Note: this violates the AMQP 1.0
+ //specification - as does the create option - by sending
+ //node-properties even if the dynamic option is not
+ //set. However this can be avoided by not specifying any node
+ //properties when asserting)
+ if (!type.empty() || durableNode || !properties.empty()) {
+ bool isAutoDeleted = false;
+ qpid::types::Variant::Map requested = properties;
+ if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE;
+ if (durableNode) requested[DURABLE] = true;
+
+ data = pn_terminus_properties(terminus);
+ if (pn_data_next(data)) {
+ size_t count = pn_data_get_map(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ std::string key = convert(pn_data_get_symbol(data));
+ pn_data_next(data);
+ qpid::types::Variant::Map::const_iterator j = requested.find(key);
+ qpid::types::Variant v;
+ if (key == LIFETIME_POLICY) {
+ isAutoDeleted = true;
+ if (j != requested.end() && checkLifetimePolicy(j->second.asString(), data)) {
+ requested.erase(j->first);
+ }
+ } else if (key == AUTO_DELETE) {
+ PnData(data).get(v);
+ isAutoDeleted = v.asBool();
+ } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) {
+ requested.erase(j->first);
+ }
+ }
+ pn_data_exit(data);
+ qpid::types::Variant::Map::iterator i = requested.find(AUTO_DELETE);
+ if (i != requested.end() && i->second.asBool() == isAutoDeleted) {
+ requested.erase(i);
+ }
+ if (!requested.empty()) {
+ std::stringstream missing;
+ missing << "Requested node properties not met: " << requested;
+ throw qpid::messaging::AssertionFailed(missing.str());
+ }
+ }
+ }
+ }
+}
+
+void AddressHelper::confirmFilter(const std::string& descriptor)
+{
+ for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) {
+ if (descriptor == i->descriptorSymbol) i->confirmed = true;
+ }
+}
+
+void AddressHelper::confirmFilter(uint64_t descriptor)
+{
+ for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) {
+ if (descriptor == i->descriptorCode) i->confirmed = true;
+ }
+}
+
+bool AddressHelper::createEnabled(CheckMode mode) const
+{
+ return enabled(createPolicy, mode);
+}
+
+bool AddressHelper::assertEnabled(CheckMode mode) const
+{
+ return enabled(assertPolicy, mode);
+}
+
+bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
+{
+ bool result = false;
+ switch (mode) {
+ case FOR_RECEIVER:
+ result = in(policy, RECEIVER_MODES);
+ break;
+ case FOR_SENDER:
+ result = in(policy, SENDER_MODES);
+ break;
+ }
+ return result;
+}
+
+bool AddressHelper::isUnreliable() const
+{
+ return reliability == AT_MOST_ONCE || reliability == UNRELIABLE ||
+ (reliability.empty() && browse); // A browser defaults to unreliable.
+}
+
+const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
+{
+ return node;
+}
+const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const
+{
+ return link;
+}
+
+bool AddressHelper::getLinkSource(std::string& out) const
+{
+ return getLinkOption(SOURCE, out);
+}
+
+bool AddressHelper::getLinkTarget(std::string& out) const
+{
+ return getLinkOption(TARGET, out);
+}
+
+bool AddressHelper::getLinkOption(const std::string& name, std::string& out) const
+{
+ qpid::types::Variant::Map::const_iterator i = link.find(name);
+ if (i != link.end()) {
+ out = i->second.asString();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode)
+{
+ bool createOnDemand(false);
+ if (isTemporary) {
+ //application expects a name to be generated
+ pn_terminus_set_dynamic(terminus, true);
+ setNodeProperties(terminus);
+ } else {
+ pn_terminus_set_address(terminus, name.c_str());
+ if (createEnabled(mode)) {
+ //application expects name of node to be as specified
+ setNodeProperties(terminus);
+ createOnDemand = true;
+ } else if (assertEnabled(mode)) {
+ setNodeProperties(terminus);
+ }
+ }
+
+ setCapabilities(terminus, createOnDemand);
+ if (durableLink) {
+ pn_terminus_set_durability(terminus, PN_DELIVERIES);
+ }
+ if (mode == FOR_RECEIVER) {
+ if (timeout) pn_terminus_set_timeout(terminus, timeout);
+ if (browse) {
+ pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY);
+ }
+ //set filter(s):
+ if (!filters.empty()) {
+ pn_data_t* filter = pn_terminus_filter(terminus);
+ pn_data_put_map(filter);
+ pn_data_enter(filter);
+ for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+ pn_data_put_symbol(filter, convert(i->name));
+ pn_data_put_described(filter);
+ pn_data_enter(filter);
+ if (i->descriptorSymbol.size()) {
+ pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+ } else {
+ pn_data_put_ulong(filter, i->descriptorCode);
+ }
+ PnData(filter).put(i->value);
+ pn_data_exit(filter);
+ }
+ pn_data_exit(filter);
+ }
+ }
+ if (isUnreliable()) {
+ pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+ } else if (!reliability.empty()) {
+ if (reliability == EXACTLY_ONCE ) {
+ QPID_LOG(warning, "Unsupported reliability mode: " << reliability);
+ } else if (reliability != AT_LEAST_ONCE ) {
+ QPID_LOG(warning, "Unrecognised reliability mode: " << reliability);
+ }
+ pn_link_set_snd_settle_mode(link, PN_SND_UNSETTLED);
+ }
+}
+
+void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
+{
+ if (create) capabilities.push_back(CREATE_ON_DEMAND);
+ if (!type.empty()) capabilities.push_back(type);
+ if (durableNode) capabilities.push_back(DURABLE);
+
+ pn_data_t* data = pn_terminus_capabilities(terminus);
+ if (capabilities.size() == 1) {
+ pn_data_put_symbol(data, convert(capabilities.front().asString()));
+ } else if (capabilities.size() > 1) {
+ pn_data_put_array(data, false, PN_SYMBOL);
+ pn_data_enter(data);
+ for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->asString()));
+ }
+ pn_data_exit(data);
+ }
+}
+std::string AddressHelper::getLinkName(const Address& address)
+{
+ AddressHelper helper(address);
+ const qpid::types::Variant::Map& linkProps = helper.getLinkProperties();
+ qpid::types::Variant::Map::const_iterator i = linkProps.find(NAME);
+ if (i != linkProps.end()) {
+ return i->second.asString();
+ } else {
+ std::stringstream name;
+ name << address.getName() << "_" << qpid::types::Uuid(true);
+ return name.str();
+ }
+}
+namespace {
+std::string toLifetimePolicy(const std::string& value)
+{
+ if (value == DELETE_ON_CLOSE) return qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL;
+ else if (value == DELETE_IF_UNUSED) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL;
+ else if (value == DELETE_IF_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL;
+ else if (value == DELETE_IF_UNUSED_AND_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL;
+ else return value;//asume value is itself the symbolic descriptor
+}
+void putLifetimePolicy(pn_data_t* data, const std::string& value)
+{
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, convert(value));
+ pn_data_put_list(data);
+ pn_data_exit(data);
+}
+}
+void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
+{
+ if (properties.size() || type.size() || durableNode) {
+ pn_data_t* data = pn_terminus_properties(terminus);
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ if (type.size()) {
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(type == TOPIC ? COPY : MOVE));
+ }
+ if (durableNode) {
+ pn_data_put_symbol(data, convert(DURABLE));
+ pn_data_put_bool(data, true);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if (i->first == LIFETIME_POLICY) {
+ pn_data_put_symbol(data, convert(i->first));
+ putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
+ } else {
+ pn_data_put_symbol(data, convert(i->first));
+ PnData(data).put(i->second);
+ }
+ }
+ pn_data_exit(data);
+ }
+}
+
+Verifier::Verifier()
+{
+ defined[CREATE] = true;
+ defined[ASSERT] = true;
+ defined[DELETE] = true;
+ defined[MODE] = true;
+ Variant::Map node;
+ node[TYPE] = true;
+ node[DURABLE] = true;
+ node[PROPERTIES] = true;
+ node[CAPABILITIES] = true;
+ node[X_DECLARE] = true;
+ node[X_BINDINGS] = true;
+ defined[NODE] = node;
+ Variant::Map link;
+ link[NAME] = true;
+ link[DURABLE] = true;
+ link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
+ link[SOURCE] = true;
+ link[TARGET] = true;
+ link[X_SUBSCRIBE] = true;
+ link[X_DECLARE] = true;
+ link[X_BINDINGS] = true;
+ link[SELECTOR] = true;
+ link[FILTER] = true;
+ defined[LINK] = link;
+}
+void Verifier::verify(const Address& address) const
+{
+ verify(defined, address.getOptions());
+}
+
+void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const
+{
+ for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) {
+ Variant::Map::const_iterator option = allowed.find(i->first);
+ if (option == allowed.end()) {
+ throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str());
+ } else if (option->second.getType() == qpid::types::VAR_MAP) {
+ verify(option->second.asMap(), i->second.asMap());
+ }
+ }
+}
+
+}}} // namespace qpid::messaging::amqp