diff options
-rw-r--r-- | qpid/cpp/include/qpid/messaging/Address.h | 76 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 27 |
2 files changed, 94 insertions, 9 deletions
diff --git a/qpid/cpp/include/qpid/messaging/Address.h b/qpid/cpp/include/qpid/messaging/Address.h index eaf5a02782..aa7d366088 100644 --- a/qpid/cpp/include/qpid/messaging/Address.h +++ b/qpid/cpp/include/qpid/messaging/Address.h @@ -51,12 +51,80 @@ class AddressImpl; * All parts of an address can be specified in a string of the * following form: * + * <pre> * <address> [ / <subject> ] ; [ { <key> : <value> , ... } ] + * </pre> * - * Here the <address> is a simple name for the addressed entity and - * <subject> is a subject or subject pattern for messages sent to or - * received from this address. The options are specified as a series - * of key value pairs enclosed in curly brackets (denoting a map). + * Here the <pre><address></pre> is a simple name for the addressed + * entity and <pre><subject></pre> is a subject or subject pattern for + * messages sent to or received from this address. The options are + * specified as a series of key value pairs enclosed in curly brackets + * (denoting a map). Values can be nested maps, or lists (which are + * denoted as a comma separated list of values inside square brackets, + * e.g. [a, b, c]). + * + * The currently supported options are as follows: + * + * create - whether the address should be automatically created or + * not. Can be one of always, never, sender or receiver; the + * properties of the node to be created can be specified via the + * node-properties option. + * + * assert - whether or not to assert any specified node-properties + * match the address. Can be one of always, never, sender or receiver. + * + * delete - whether or not to delete the addressed nide when a sender + * or receiver is cancelled. Can be one of always, never, sender or + * receiver. + * + * node-properties - a nested map of properties of the addressed + * entity or 'node'. These can be used when automatically creating it, + * or to assert certain properties. + * + * The valid node-properties are: + * + * type - queue or topic + * + * durable - true or false + * + * x-properties - a nested map that can contain implementation or + * protocol specifiec extedned properties. For the amqp 0-10 mapping, + * the fields in queue- or exchange- declare can be specified in here; + * anything that is not recognised as one of those will be passed + * through in the arguments field. + * + * For receivers there are some further options of interest: + * + * no-local - (only relevant for topics at present) specifies that the + * receiver does not want to receiver messages published to the topic + * that originate from a sender on the same connection + * + * browse - (only relevant for queues) specifies that the receiver + * does not wish to consume the messages, but merely browse them + * + * durable - (only relevant for topics at present) specifies that a + * durable subscription is required + * + * reliability - indicates the level of reliability that the receiver + * expects. Can be one of unreliable, at-most-once, at-least-once or + * exactly-once (the latter is not yet correctly supported). + * + * filter - (only relevant for topics at present) allows bindings to + * be created for the queue that match the given criteris (or list of + * criteria). + * + * x-properties - allows protocol or implementation specific options + * to be specified for a receiver; this is a nested map and currently + * the implementation only recognises two specific nested properties + * within it (all others are passed through in the arguments of the + * message-subscribe command): + * + * exclusive, which requests an exclusive subscription and + * is only relevant for queues + * + * x-queue-arguments, which ais only relevant for topics and + * allows arguments to the queue-declare for the subscription + * queue to be specified */ class Address { diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 89cc053ff7..6cc7fcc587 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -179,7 +179,7 @@ class QueueSource : public Queue, public MessageSource private: const AcceptMode acceptMode; const AcquireMode acquireMode; - const bool exclusive; + bool exclusive; FieldTable options; }; @@ -344,10 +344,19 @@ QueueSource::QueueSource(const Address& address) : Queue(address), acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT), acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED), - exclusive(getNestedOption(address.getOptions(), list_of<std::string>(X_PROPERTIES)(EXCLUSIVE)).asBool()) + exclusive(false) { //extract subscription arguments from address options - convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options); + const Variant& x = address.getOption(X_PROPERTIES); + if (!x.isVoid()) { + const Variant::Map& xProps = x.asMap(); + Variant::Map passthrough; + for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) { + if (i->first == xamqp::EXCLUSIVE) exclusive = i->second; + else passthrough[i->first] = i->second; + } + translate(passthrough, options); + } } void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) @@ -384,8 +393,16 @@ Subscription::Subscription(const Address& address, const std::string& exchangeTy durable(address.getOption(DURABLE_SUBSCRIPTION).asBool()) { if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1); - convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions); - convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions); + const Variant& x = address.getOption(X_PROPERTIES); + if (!x.isVoid()) { + const Variant::Map& xProps = x.asMap(); + Variant::Map passthrough; + for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) { + if (i->first == xamqp::QUEUE_ARGUMENTS) convert(i->second.asMap(), queueOptions); + else passthrough[i->first] = i->second; + } + translate(passthrough, subscriptionOptions); + } const Variant& filter = address.getOption(FILTER); if (!filter.isVoid()) { |