summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/include/qpid/messaging/Address.h76
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp27
2 files changed, 94 insertions, 9 deletions
diff --git a/qpid/cpp/include/qpid/messaging/Address.h b/qpid/cpp/include/qpid/messaging/Address.h
index eaf5a02782..aa7d366088 100644
--- a/qpid/cpp/include/qpid/messaging/Address.h
+++ b/qpid/cpp/include/qpid/messaging/Address.h
@@ -51,12 +51,80 @@ class AddressImpl;
* All parts of an address can be specified in a string of the
* following form:
*
+ * <pre>
* <address> [ / <subject> ] ; [ { <key> : <value> , ... } ]
+ * </pre>
*
- * Here the <address> is a simple name for the addressed entity and
- * <subject> is a subject or subject pattern for messages sent to or
- * received from this address. The options are specified as a series
- * of key value pairs enclosed in curly brackets (denoting a map).
+ * Here the <pre><address></pre> is a simple name for the addressed
+ * entity and <pre><subject></pre> is a subject or subject pattern for
+ * messages sent to or received from this address. The options are
+ * specified as a series of key value pairs enclosed in curly brackets
+ * (denoting a map). Values can be nested maps, or lists (which are
+ * denoted as a comma separated list of values inside square brackets,
+ * e.g. [a, b, c]).
+ *
+ * The currently supported options are as follows:
+ *
+ * create - whether the address should be automatically created or
+ * not. Can be one of always, never, sender or receiver; the
+ * properties of the node to be created can be specified via the
+ * node-properties option.
+ *
+ * assert - whether or not to assert any specified node-properties
+ * match the address. Can be one of always, never, sender or receiver.
+ *
+ * delete - whether or not to delete the addressed nide when a sender
+ * or receiver is cancelled. Can be one of always, never, sender or
+ * receiver.
+ *
+ * node-properties - a nested map of properties of the addressed
+ * entity or 'node'. These can be used when automatically creating it,
+ * or to assert certain properties.
+ *
+ * The valid node-properties are:
+ *
+ * type - queue or topic
+ *
+ * durable - true or false
+ *
+ * x-properties - a nested map that can contain implementation or
+ * protocol specifiec extedned properties. For the amqp 0-10 mapping,
+ * the fields in queue- or exchange- declare can be specified in here;
+ * anything that is not recognised as one of those will be passed
+ * through in the arguments field.
+ *
+ * For receivers there are some further options of interest:
+ *
+ * no-local - (only relevant for topics at present) specifies that the
+ * receiver does not want to receiver messages published to the topic
+ * that originate from a sender on the same connection
+ *
+ * browse - (only relevant for queues) specifies that the receiver
+ * does not wish to consume the messages, but merely browse them
+ *
+ * durable - (only relevant for topics at present) specifies that a
+ * durable subscription is required
+ *
+ * reliability - indicates the level of reliability that the receiver
+ * expects. Can be one of unreliable, at-most-once, at-least-once or
+ * exactly-once (the latter is not yet correctly supported).
+ *
+ * filter - (only relevant for topics at present) allows bindings to
+ * be created for the queue that match the given criteris (or list of
+ * criteria).
+ *
+ * x-properties - allows protocol or implementation specific options
+ * to be specified for a receiver; this is a nested map and currently
+ * the implementation only recognises two specific nested properties
+ * within it (all others are passed through in the arguments of the
+ * message-subscribe command):
+ *
+ * exclusive, which requests an exclusive subscription and
+ * is only relevant for queues
+ *
+ * x-queue-arguments, which ais only relevant for topics and
+ * allows arguments to the queue-declare for the subscription
+ * queue to be specified
*/
class Address
{
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 89cc053ff7..6cc7fcc587 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -179,7 +179,7 @@ class QueueSource : public Queue, public MessageSource
private:
const AcceptMode acceptMode;
const AcquireMode acquireMode;
- const bool exclusive;
+ bool exclusive;
FieldTable options;
};
@@ -344,10 +344,19 @@ QueueSource::QueueSource(const Address& address) :
Queue(address),
acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
- exclusive(getNestedOption(address.getOptions(), list_of<std::string>(X_PROPERTIES)(EXCLUSIVE)).asBool())
+ exclusive(false)
{
//extract subscription arguments from address options
- convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
+ const Variant& x = address.getOption(X_PROPERTIES);
+ if (!x.isVoid()) {
+ const Variant::Map& xProps = x.asMap();
+ Variant::Map passthrough;
+ for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
+ if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
+ else passthrough[i->first] = i->second;
+ }
+ translate(passthrough, options);
+ }
}
void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
@@ -384,8 +393,16 @@ Subscription::Subscription(const Address& address, const std::string& exchangeTy
durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
{
if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
- convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions);
- convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions);
+ const Variant& x = address.getOption(X_PROPERTIES);
+ if (!x.isVoid()) {
+ const Variant::Map& xProps = x.asMap();
+ Variant::Map passthrough;
+ for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
+ if (i->first == xamqp::QUEUE_ARGUMENTS) convert(i->second.asMap(), queueOptions);
+ else passthrough[i->first] = i->second;
+ }
+ translate(passthrough, subscriptionOptions);
+ }
const Variant& filter = address.getOption(FILTER);
if (!filter.isVoid()) {