summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp966
1 files changed, 966 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
new file mode 100644
index 0000000000..f1295a3b66
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -0,0 +1,966 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/amqp0_10/AddressResolution.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/client/amqp0_10/MessageSource.h"
+#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/amqp0_10/OutgoingMessage.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/types/Variant.h"
+#include "qpid/messaging/exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/ExchangeBoundResult.h"
+#include "qpid/framing/ExchangeQueryResult.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/QueueQueryResult.h"
+#include "qpid/framing/ReplyTo.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::Exception;
+using qpid::messaging::Address;
+using qpid::messaging::AddressError;
+using qpid::messaging::MalformedAddress;
+using qpid::messaging::ResolutionError;
+using qpid::messaging::NotFound;
+using qpid::messaging::AssertionFailed;
+using qpid::framing::ExchangeBoundResult;
+using qpid::framing::ExchangeQueryResult;
+using qpid::framing::FieldTable;
+using qpid::framing::QueueQueryResult;
+using qpid::framing::ReplyTo;
+using qpid::framing::Uuid;
+using namespace qpid::types;
+using namespace qpid::framing::message;
+using namespace qpid::amqp_0_10;
+using namespace boost::assign;
+
+class Verifier
+{
+ public:
+ Verifier();
+ void verify(const Address& address) const;
+ private:
+ Variant::Map defined;
+ void verify(const Variant::Map& allowed, const Variant::Map& actual) const;
+};
+
+namespace{
+const Variant EMPTY_VARIANT;
+const FieldTable EMPTY_FIELD_TABLE;
+const Variant::List EMPTY_LIST;
+const std::string EMPTY_STRING;
+
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+
+//option names
+const std::string NODE("node");
+const std::string LINK("link");
+const std::string MODE("mode");
+const std::string RELIABILITY("reliability");
+const std::string NAME("name");
+const std::string DURABLE("durable");
+const std::string X_DECLARE("x-declare");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string X_BINDINGS("x-bindings");
+const std::string EXCHANGE("exchange");
+const std::string QUEUE("queue");
+const std::string KEY("key");
+const std::string ARGUMENTS("arguments");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string TYPE("type");
+const std::string EXCLUSIVE("exclusive");
+const std::string AUTO_DELETE("auto-delete");
+
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
+
+//address types
+const std::string QUEUE_ADDRESS("queue");
+const std::string TOPIC_ADDRESS("topic");
+
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
+
+//receiver modes:
+const std::string BROWSE("browse");
+const std::string CONSUME("consume");
+
+//0-10 exchange types:
+const std::string TOPIC_EXCHANGE("topic");
+const std::string FANOUT_EXCHANGE("fanout");
+const std::string DIRECT_EXCHANGE("direct");
+const std::string HEADERS_EXCHANGE("headers");
+const std::string XML_EXCHANGE("xml");
+const std::string WILDCARD_ANY("#");
+
+const Verifier verifier;
+}
+
+struct Binding
+{
+ Binding(const Variant::Map&);
+ Binding(const std::string& exchange, const std::string& queue, const std::string& key);
+
+ std::string exchange;
+ std::string queue;
+ std::string key;
+ FieldTable arguments;
+};
+
+struct Bindings : std::vector<Binding>
+{
+ void add(const Variant::List& bindings);
+ void setDefaultExchange(const std::string&);
+ void setDefaultQueue(const std::string&);
+ void bind(qpid::client::AsyncSession& session);
+ void unbind(qpid::client::AsyncSession& session);
+ void check(qpid::client::AsyncSession& session);
+};
+
+class Node
+{
+ protected:
+ enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+ Node(const Address& address);
+
+ const std::string name;
+ Variant createPolicy;
+ Variant assertPolicy;
+ Variant deletePolicy;
+ Bindings nodeBindings;
+ Bindings linkBindings;
+
+ static bool enabled(const Variant& policy, CheckMode mode);
+ static bool createEnabled(const Address& address, CheckMode mode);
+ static void convert(const Variant& option, FieldTable& arguments);
+ static std::vector<std::string> RECEIVER_MODES;
+ static std::vector<std::string> SENDER_MODES;
+};
+
+
+class Queue : protected Node
+{
+ public:
+ Queue(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ private:
+ const bool durable;
+ const bool autoDelete;
+ const bool exclusive;
+ const std::string alternateExchange;
+ FieldTable arguments;
+};
+
+class Exchange : protected Node
+{
+ public:
+ Exchange(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+
+ protected:
+ const std::string specifiedType;
+ private:
+ const bool durable;
+ const bool autoDelete;
+ const std::string alternateExchange;
+ FieldTable arguments;
+};
+
+class QueueSource : public Queue, public MessageSource
+{
+ public:
+ QueueSource(const Address& address);
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+ private:
+ const AcceptMode acceptMode;
+ const AcquireMode acquireMode;
+ bool exclusive;
+ FieldTable options;
+};
+
+class Subscription : public Exchange, public MessageSource
+{
+ public:
+ Subscription(const Address&, const std::string& actualType);
+ void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
+ void cancel(qpid::client::AsyncSession& session, const std::string& destination);
+ private:
+ const std::string queue;
+ const bool reliable;
+ const bool durable;
+ const std::string actualType;
+ FieldTable queueOptions;
+ FieldTable subscriptionOptions;
+ Bindings bindings;
+
+ void bindSubject(const std::string& subject);
+ void bindAll();
+ void add(const std::string& exchange, const std::string& key);
+ static std::string getSubscriptionName(const std::string& base, const std::string& name);
+};
+
+class ExchangeSink : public Exchange, public MessageSink
+{
+ public:
+ ExchangeSink(const Address& name);
+ void declare(qpid::client::AsyncSession& session, const std::string& name);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
+ void cancel(qpid::client::AsyncSession& session, const std::string& name);
+ private:
+};
+
+class QueueSink : public Queue, public MessageSink
+{
+ public:
+ QueueSink(const Address& name);
+ void declare(qpid::client::AsyncSession& session, const std::string& name);
+ void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
+ void cancel(qpid::client::AsyncSession& session, const std::string& name);
+ private:
+};
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address);
+
+bool in(const Variant& value, const std::vector<std::string>& choices)
+{
+ if (!value.isVoid()) {
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value.asString() == *i) return true;
+ }
+ }
+ return false;
+}
+
+const Variant& getOption(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else {
+ return j->second;
+ }
+}
+
+const Variant& getOption(const Address& address, const std::string& name)
+{
+ return getOption(address.getOptions(), name);
+}
+
+bool getReceiverPolicy(const Address& address, const std::string& key)
+{
+ return in(getOption(address, key), list_of<std::string>(ALWAYS)(RECEIVER));
+}
+
+bool getSenderPolicy(const Address& address, const std::string& key)
+{
+ return in(getOption(address, key), list_of<std::string>(ALWAYS)(SENDER));
+}
+
+struct Opt
+{
+ Opt(const Address& address);
+ Opt(const Variant::Map& base);
+ Opt& operator/(const std::string& name);
+ operator bool() const;
+ std::string str() const;
+ const Variant::List& asList() const;
+ void collect(qpid::framing::FieldTable& args) const;
+
+ const Variant::Map* options;
+ const Variant* value;
+};
+
+Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {}
+Opt::Opt(const Variant::Map& base) : options(&base), value(0) {}
+Opt& Opt::operator/(const std::string& name)
+{
+ if (options) {
+ Variant::Map::const_iterator j = options->find(name);
+ if (j == options->end()) {
+ value = 0;
+ options = 0;
+ } else {
+ value = &(j->second);
+ if (value->getType() == VAR_MAP) options = &(value->asMap());
+ else options = 0;
+ }
+ }
+ return *this;
+}
+
+
+Opt::operator bool() const
+{
+ return value && !value->isVoid() && value->asBool();
+}
+
+std::string Opt::str() const
+{
+ if (value) return value->asString();
+ else return EMPTY_STRING;
+}
+
+const Variant::List& Opt::asList() const
+{
+ if (value) return value->asList();
+ else return EMPTY_LIST;
+}
+
+void Opt::collect(qpid::framing::FieldTable& args) const
+{
+ if (value) {
+ translate(value->asMap(), args);
+ }
+}
+
+bool AddressResolution::is_unreliable(const Address& address)
+{
+
+ return in((Opt(address)/LINK/RELIABILITY).str(),
+ list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+}
+
+bool AddressResolution::is_reliable(const Address& address)
+{
+ return in((Opt(address)/LINK/RELIABILITY).str(),
+ list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
+}
+
+std::string checkAddressType(qpid::client::Session session, const Address& address)
+{
+ verifier.verify(address);
+ if (address.getName().empty()) {
+ throw MalformedAddress("Name cannot be null");
+ }
+ std::string type = (Opt(address)/NODE/TYPE).str();
+ if (type.empty()) {
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name; treat it as a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getExchangeNotFound()) {
+ //name refers to a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getQueueNotFound()) {
+ //name refers to an exchange
+ type = TOPIC_ADDRESS;
+ } else {
+ //both a queue and exchange exist for that name
+ throw ResolutionError("Ambiguous address, please specify queue or topic as node type");
+ }
+ }
+ return type;
+}
+
+std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
+ const Address& address)
+{
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
+ std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType();
+ std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType));
+ QPID_LOG(debug, "treating source address as topic: " << address);
+ return source;
+ } else if (type == QUEUE_ADDRESS) {
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "treating source address as queue: " << address);
+ return source;
+ } else {
+ throw ResolutionError("Unrecognised type: " + type);
+ }
+}
+
+
+std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
+ const qpid::messaging::Address& address)
+{
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
+ std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
+ QPID_LOG(debug, "treating target address as topic: " << address);
+ return sink;
+ } else if (type == QUEUE_ADDRESS) {
+ std::auto_ptr<MessageSink> sink(new QueueSink(address));
+ QPID_LOG(debug, "treating target address as queue: " << address);
+ return sink;
+ } else {
+ throw ResolutionError("Unrecognised type: " + type);
+ }
+}
+
+bool isBrowse(const Address& address)
+{
+ const Variant& mode = getOption(address, MODE);
+ if (!mode.isVoid()) {
+ std::string value = mode.asString();
+ if (value == BROWSE) return true;
+ else if (value != CONSUME) throw ResolutionError("Invalid mode");
+ }
+ return false;
+}
+
+QueueSource::QueueSource(const Address& address) :
+ Queue(address),
+ acceptMode(AddressResolution::is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+ acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
+ exclusive(false)
+{
+ //extract subscription arguments from address options (nb: setting
+ //of accept-mode/acquire-mode/destination controlled though other
+ //options)
+ exclusive = Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE;
+ (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options);
+}
+
+void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
+ linkBindings.bind(session);
+ session.messageSubscribe(arg::queue=name,
+ arg::destination=destination,
+ arg::acceptMode=acceptMode,
+ arg::acquireMode=acquireMode,
+ arg::exclusive=exclusive,
+ arg::arguments=options);
+}
+
+void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ linkBindings.unbind(session);
+ session.messageCancel(destination);
+ checkDelete(session, FOR_RECEIVER);
+}
+
+std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name)
+{
+ if (name.empty()) {
+ return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
+ } else {
+ return (boost::format("%1%_%2%") % base % name).str();
+ }
+}
+
+Subscription::Subscription(const Address& address, const std::string& type)
+ : Exchange(address),
+ queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
+ reliable(AddressResolution::is_reliable(address)),
+ durable(Opt(address)/LINK/DURABLE),
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+{
+ (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
+ (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
+
+ if (!address.getSubject().empty()) bindSubject(address.getSubject());
+ else if (linkBindings.empty()) bindAll();
+}
+
+void Subscription::bindSubject(const std::string& subject)
+{
+ if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, subject);
+ b.arguments.setString("qpid.subject", subject);
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else if (actualType == XML_EXCHANGE) {
+ Binding b(name, queue, subject);
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ % subject).str();
+ b.arguments.setString("xquery", query);
+ bindings.push_back(b);
+ } else {
+ //Note: the fanout exchange doesn't support any filtering, so
+ //the subject is ignored in that case
+ add(name, subject);
+ }
+}
+
+void Subscription::bindAll()
+{
+ if (actualType == TOPIC_EXCHANGE) {
+ add(name, WILDCARD_ANY);
+ } else if (actualType == FANOUT_EXCHANGE) {
+ add(name, queue);
+ } else if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, "match-all");
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else if (actualType == XML_EXCHANGE) {
+ Binding b(name, queue, EMPTY_STRING);
+ b.arguments.setString("xquery", "true()");
+ bindings.push_back(b);
+ } else {
+ add(name, EMPTY_STRING);
+ }
+}
+
+void Subscription::add(const std::string& exchange, const std::string& key)
+{
+ bindings.push_back(Binding(exchange, queue, key));
+}
+
+void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ //create exchange if required and specified by policy:
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
+
+ //create subscription queue:
+ session.queueDeclare(arg::queue=queue, arg::exclusive=true,
+ arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
+ //'default' binding:
+ bindings.bind(session);
+ //any explicit bindings:
+ linkBindings.setDefaultQueue(queue);
+ linkBindings.bind(session);
+ //subscribe to subscription queue:
+ AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+}
+
+void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
+{
+ linkBindings.unbind(session);
+ session.messageCancel(destination);
+ session.queueDelete(arg::queue=queue);
+ checkDelete(session, FOR_RECEIVER);
+}
+
+ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
+
+void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
+}
+
+void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+{
+ m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
+ m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
+}
+
+void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ linkBindings.unbind(session);
+ checkDelete(session, FOR_SENDER);
+}
+
+QueueSink::QueueSink(const Address& address) : Queue(address) {}
+
+void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
+}
+void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+{
+ m.message.getDeliveryProperties().setRoutingKey(name);
+ m.status = session.messageTransfer(arg::content=m.message);
+}
+
+void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ linkBindings.unbind(session);
+ checkDelete(session, FOR_SENDER);
+}
+
+Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
+{
+ Address address;
+ if (rt.getExchange().empty()) {//if default exchange, treat as queue
+ address.setName(rt.getRoutingKey());
+ address.setType(QUEUE_ADDRESS);
+ } else {
+ address.setName(rt.getExchange());
+ address.setSubject(rt.getRoutingKey());
+ address.setType(TOPIC_ADDRESS);
+ }
+ return address;
+}
+
+qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
+{
+ if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+ return ReplyTo(EMPTY_STRING, address.getName());
+ } else if (address.getType() == TOPIC_ADDRESS) {
+ return ReplyTo(address.getName(), address.getSubject());
+ } else {
+ QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType());
+ return ReplyTo(EMPTY_STRING, address.getName());//treat as queue
+ }
+}
+
+bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
+{
+ return address.getType() == QUEUE_ADDRESS ||
+ (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
+}
+
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address)
+{
+ if (address.getType().empty()) {
+ return !session.exchangeQuery(address.getName()).getNotFound();
+ } else if (address.getType() == TOPIC_ADDRESS) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Node::Node(const Address& address) : name(address.getName()),
+ createPolicy(getOption(address, CREATE)),
+ assertPolicy(getOption(address, ASSERT)),
+ deletePolicy(getOption(address, DELETE))
+{
+ nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList());
+ linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList());
+}
+
+Queue::Queue(const Address& a) : Node(a),
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultQueue(name);
+ linkBindings.setDefaultQueue(name);
+}
+
+void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ QPID_LOG(debug, "Auto-creating queue '" << name << "'");
+ try {
+ session.queueDeclare(arg::queue=name,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::exclusive=exclusive,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::ResourceLockedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+ } catch (const qpid::framing::NotFoundException& /*e*/) {
+ throw NotFound((boost::format("Queue %1% does not exist") % name).str());
+ }
+ }
+}
+
+void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ //Note: queue-delete will cause a session exception if the queue
+ //does not exist, the query here prevents obvious cases of this
+ //but there is a race whenever two deletions are made concurrently
+ //so careful use of the delete policy is recommended at present
+ if (enabled(deletePolicy, mode) && sync(session).queueQuery(name).getQueue() == name) {
+ QPID_LOG(debug, "Auto-deleting queue '" << name << "'");
+ sync(session).queueDelete(arg::queue=name);
+ }
+}
+
+void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ QueueQueryResult result = sync(session).queueQuery(name);
+ if (result.getQueue() != name) {
+ throw NotFound((boost::format("Queue not found: %1%") % name).str());
+ } else {
+ if (durable && !result.getDurable()) {
+ throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str());
+ }
+ if (autoDelete && !result.getAutoDelete()) {
+ throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str());
+ }
+ if (exclusive && !result.getExclusive()) {
+ throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str());
+ }
+ if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
+ throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ % name % alternateExchange % result.getAlternateExchange()).str());
+ }
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (*i->second != *v) {
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ nodeBindings.check(session);
+ }
+ }
+}
+
+Exchange::Exchange(const Address& a) : Node(a),
+ specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()),
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultExchange(name);
+ linkBindings.setDefaultExchange(name);
+}
+
+void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ try {
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
+ session.exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+ } catch (const qpid::framing::NotFoundException& /*e*/) {
+ throw NotFound((boost::format("Exchange %1% does not exist") % name).str());
+ }
+ }
+}
+
+void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ //Note: exchange-delete will cause a session exception if the
+ //exchange does not exist, the query here prevents obvious cases
+ //of this but there is a race whenever two deletions are made
+ //concurrently so careful use of the delete policy is recommended
+ //at present
+ if (enabled(deletePolicy, mode) && !sync(session).exchangeQuery(name).getNotFound()) {
+ sync(session).exchangeDelete(arg::exchange=name);
+ }
+}
+
+void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ ExchangeQueryResult result = sync(session).exchangeQuery(name);
+ if (result.getNotFound()) {
+ throw NotFound((boost::format("Exchange not found: %1%") % name).str());
+ } else {
+ if (specifiedType.size() && result.getType() != specifiedType) {
+ throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ % name % specifiedType % result.getType()).str());
+ }
+ if (durable && !result.getDurable()) {
+ throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str());
+ }
+ //Note: Can't check auto-delete or alternate-exchange via
+ //exchange-query-result as these are not returned
+ //TODO: could use a passive declare to check alternate-exchange
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (i->second != v) {
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ nodeBindings.check(session);
+ }
+ }
+}
+
+Binding::Binding(const Variant::Map& b) :
+ exchange((Opt(b)/EXCHANGE).str()),
+ queue((Opt(b)/QUEUE).str()),
+ key((Opt(b)/KEY).str())
+{
+ (Opt(b)/ARGUMENTS).collect(arguments);
+}
+
+Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {}
+
+
+void Bindings::add(const Variant::List& list)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ push_back(Binding(i->asMap()));
+ }
+}
+
+void Bindings::setDefaultExchange(const std::string& exchange)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->exchange.empty()) i->exchange = exchange;
+ }
+}
+
+void Bindings::setDefaultQueue(const std::string& queue)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->queue.empty()) i->queue = queue;
+ }
+}
+
+void Bindings::bind(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeBind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key,
+ arg::arguments=i->arguments);
+ }
+}
+
+void Bindings::unbind(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeUnbind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ }
+}
+
+void Bindings::check(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
+ throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ % i->exchange % i->queue % i->key).str());
+ }
+ }
+}
+
+bool Node::enabled(const Variant& policy, CheckMode mode)
+{
+ bool result = false;
+ switch (mode) {
+ case FOR_RECEIVER:
+ result = in(policy, RECEIVER_MODES);
+ break;
+ case FOR_SENDER:
+ result = in(policy, SENDER_MODES);
+ break;
+ }
+ return result;
+}
+
+bool Node::createEnabled(const Address& address, CheckMode mode)
+{
+ const Variant& policy = getOption(address, CREATE);
+ return enabled(policy, mode);
+}
+
+void Node::convert(const Variant& options, FieldTable& arguments)
+{
+ if (!options.isVoid()) {
+ translate(options.asMap(), arguments);
+ }
+}
+std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
+std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
+
+Verifier::Verifier()
+{
+ defined[CREATE] = true;
+ defined[ASSERT] = true;
+ defined[DELETE] = true;
+ defined[MODE] = true;
+ Variant::Map node;
+ node[TYPE] = true;
+ node[DURABLE] = true;
+ node[X_DECLARE] = true;
+ node[X_BINDINGS] = true;
+ defined[NODE] = node;
+ Variant::Map link;
+ link[NAME] = true;
+ link[DURABLE] = true;
+ link[RELIABILITY] = true;
+ link[X_SUBSCRIBE] = true;
+ link[X_DECLARE] = true;
+ link[X_BINDINGS] = true;
+ defined[LINK] = link;
+}
+void Verifier::verify(const Address& address) const
+{
+ verify(defined, address.getOptions());
+}
+
+void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const
+{
+ for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) {
+ Variant::Map::const_iterator option = allowed.find(i->first);
+ if (option == allowed.end()) {
+ throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str());
+ } else if (option->second.getType() == qpid::types::VAR_MAP) {
+ verify(option->second.asMap(), i->second.asMap());
+ }
+ }
+}
+
+}}} // namespace qpid::client::amqp0_10