diff options
author | Gordon Sim <gsim@apache.org> | 2010-03-24 18:35:11 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-03-24 18:35:11 +0000 |
commit | b5d1ef6d40dced4d81d6e57b3ec748f4ac9284ee (patch) | |
tree | b27010210b5773be5cc3675f32e0b50b0e652ec3 /cpp/src | |
parent | d0cc60e1462d2a8258ebff5e36b19ca709d23d64 (diff) | |
download | qpid-python-b5d1ef6d40dced4d81d6e57b3ec748f4ac9284ee.tar.gz |
QPID-664: Updates to address options to stay in-line with python client changes in r926604
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@927144 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 591 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Address.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 18 |
3 files changed, 329 insertions, 282 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 56499bb458..9f2d4eef78 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -61,40 +61,54 @@ using namespace boost::assign; namespace{ const Variant EMPTY_VARIANT; const FieldTable EMPTY_FIELD_TABLE; + const Variant::List EMPTY_LIST; const std::string EMPTY_STRING; -//option names -const std::string BROWSE("browse"); -const std::string CONSUME("consume"); -const std::string EXCLUSIVE("exclusive"); -const std::string MODE("mode"); -const std::string NO_LOCAL("no-local"); -const std::string FILTER("filter"); -const std::string RELIABILITY("reliability"); -const std::string NAME("subscription-name"); -const std::string NODE_PROPERTIES("node-properties"); -const std::string X_PROPERTIES("x-properties"); - //policy types const std::string CREATE("create"); const std::string ASSERT("assert"); const std::string DELETE("delete"); + +//option names +const std::string NODE("node"); +const std::string LINK("link"); +const std::string MODE("mode"); +const std::string RELIABILITY("reliability"); +const std::string NAME("name"); +const std::string DURABLE("durable"); +const std::string X_DECLARE("x-declare"); +const std::string X_SUBSCRIBE("x-subscribe"); +const std::string X_BINDINGS("x-bindings"); +const std::string EXCHANGE("exchange"); +const std::string QUEUE("queue"); +const std::string KEY("key"); +const std::string ARGUMENTS("arguments"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string TYPE("type"); +const std::string EXCLUSIVE("exclusive"); +const std::string AUTO_DELETE("auto-delete"); + //policy values const std::string ALWAYS("always"); const std::string NEVER("never"); const std::string RECEIVER("receiver"); const std::string SENDER("sender"); +//address types const std::string QUEUE_ADDRESS("queue"); const std::string TOPIC_ADDRESS("topic"); +//reliability options: const std::string UNRELIABLE("unreliable"); const std::string AT_MOST_ONCE("at-most-once"); const std::string AT_LEAST_ONCE("at-least-once"); const std::string EXACTLY_ONCE("exactly-once"); -const std::string DURABLE_SUBSCRIPTION("durable"); -const std::string DURABLE("durable"); +//receiver modes: +const std::string BROWSE("browse"); +const std::string CONSUME("consume"); + +//0-10 exchange types: const std::string TOPIC_EXCHANGE("topic"); const std::string FANOUT_EXCHANGE("fanout"); const std::string DIRECT_EXCHANGE("direct"); @@ -103,16 +117,26 @@ const std::string XML_EXCHANGE("xml"); const std::string WILDCARD_ANY("*"); } -//some amqp 0-10 specific options -namespace xamqp{ -const std::string AUTO_DELETE("auto-delete"); -const std::string EXCHANGE_TYPE("type"); -const std::string EXCLUSIVE("exclusive"); -const std::string ALTERNATE_EXCHANGE("alternate-exchange"); -const std::string QUEUE_ARGUMENTS("x-queue-arguments"); -const std::string SUBSCRIBE_ARGUMENTS("x-subscribe-arguments"); -const std::string BINDINGS("bindings"); -} +struct Binding +{ + Binding(const Variant::Map&); + Binding(const std::string& exchange, const std::string& queue, const std::string& key); + + std::string exchange; + std::string queue; + std::string key; + FieldTable arguments; +}; + +struct Bindings : std::vector<Binding> +{ + void add(const Variant::List& bindings); + void setDefaultExchange(const std::string&); + void setDefaultQueue(const std::string&); + void bind(qpid::client::AsyncSession& session); + void unbind(qpid::client::AsyncSession& session); + void check(qpid::client::AsyncSession& session); +}; class Node { @@ -125,6 +149,8 @@ class Node Variant createPolicy; Variant assertPolicy; Variant deletePolicy; + Bindings nodeBindings; + Bindings linkBindings; static bool enabled(const Variant& policy, CheckMode mode); static bool createEnabled(const Address& address, CheckMode mode); @@ -133,17 +159,6 @@ class Node static std::vector<std::string> SENDER_MODES; }; -struct Binding -{ - Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); - - std::string exchange; - std::string key; - FieldTable options; -}; - -typedef std::vector<Binding> Bindings; - class Queue : protected Node { @@ -154,16 +169,11 @@ class Queue : protected Node void checkAssert(qpid::client::AsyncSession&, CheckMode); void checkDelete(qpid::client::AsyncSession&, CheckMode); private: - bool durable; - bool autoDelete; - bool exclusive; - std::string alternateExchange; + const bool durable; + const bool autoDelete; + const bool exclusive; + const std::string alternateExchange; FieldTable arguments; - Bindings bindings; - - void configure(const Address&); - void addBindings(const Variant::List&); - void addBinding(const std::string&); }; class Exchange : protected Node @@ -174,17 +184,14 @@ class Exchange : protected Node void checkCreate(qpid::client::AsyncSession&, CheckMode); void checkAssert(qpid::client::AsyncSession&, CheckMode); void checkDelete(qpid::client::AsyncSession&, CheckMode); - const std::string& getDesiredExchangeType() { return type; } + protected: + const std::string specifiedType; private: - std::string type; - bool typeSpecified; - bool durable; - bool autoDelete; - std::string alternateExchange; - FieldTable arguments; - - void configure(const Address&); + const bool durable; + const bool autoDelete; + const std::string alternateExchange; + FieldTable arguments; }; class QueueSource : public Queue, public MessageSource @@ -203,24 +210,22 @@ class QueueSource : public Queue, public MessageSource class Subscription : public Exchange, public MessageSource { public: - Subscription(const Address&, const std::string& exchangeType=""); + Subscription(const Address&, const std::string& actualType); void subscribe(qpid::client::AsyncSession& session, const std::string& destination); void cancel(qpid::client::AsyncSession& session, const std::string& destination); private: const std::string queue; const bool reliable; const bool durable; + const std::string actualType; FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; - void bindSpecial(const std::string& exchangeType); - void bind(const std::string& subject); - void bind(const std::string& subject, const Variant& filter); - void bind(const std::string& subject, const Variant::Map& filter); - void bind(const std::string& subject, const Variant::List& filter); - void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); - static std::string getSubscriptionName(const std::string& base, const Variant& name); + void bindSubject(const std::string& subject); + void bindAll(); + void add(const std::string& exchange, const std::string& key); + static std::string getSubscriptionName(const std::string& base, const std::string& name); }; class ExchangeSink : public Exchange, public MessageSink @@ -267,14 +272,102 @@ bool getSenderPolicy(const Address& address, const std::string& key) return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER)); } +const Variant& getOption(const Variant::Map& options, const std::vector<std::string>& path, size_t index=0) +{ + Variant::Map::const_iterator j = options.find(path[index]); + if (j == options.end()) { + return EMPTY_VARIANT; + } else if (++index < path.size()) { + if (j->second.getType() != qpid::messaging::VAR_MAP) + throw InvalidAddress((boost::format("Expected %1% to be a map") % j->first).str()); + return getOption(j->second.asMap(), path, index); + } else { + return j->second; + } +} + +const Variant& getOption(const Address& address, const std::vector<std::string>& path) +{ + return getOption(address.getOptions(), path); +} + +const Variant& getOption(const Variant::Map& options, const std::string& name) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return EMPTY_VARIANT; + } else { + return j->second; + } +} + +struct Opt +{ + Opt(const Address& address); + Opt(const Variant::Map& base); + Opt& operator/(const std::string& name); + operator bool() const; + std::string str() const; + const Variant::List& asList() const; + void collect(qpid::framing::FieldTable& args) const; + + const Variant::Map* options; + const Variant* value; +}; + +Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {} +Opt::Opt(const Variant::Map& base) : options(&base), value(0) {} +Opt& Opt::operator/(const std::string& name) +{ + if (options) { + Variant::Map::const_iterator j = options->find(name); + if (j == options->end()) { + value = 0; + options = 0; + } else { + value = &(j->second); + if (value->getType() == qpid::messaging::VAR_MAP) options = &(value->asMap()); + else options = 0; + } + } + return *this; +} + + +Opt::operator bool() const +{ + return value && !value->isVoid() && value->asBool(); +} + +std::string Opt::str() const +{ + if (value) return value->asString(); + else return EMPTY_STRING; +} + +const Variant::List& Opt::asList() const +{ + if (value) return value->asList(); + else return EMPTY_LIST; +} + +void Opt::collect(qpid::framing::FieldTable& args) const +{ + if (value) { + translate(value->asMap(), args); + } +} + bool AddressResolution::is_unreliable(const Address& address) { - return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); + return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)), + list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); } bool AddressResolution::is_reliable(const Address& address) { - return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); + return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)), + list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); } std::string checkAddressType(qpid::client::Session session, const Address& address) @@ -282,7 +375,7 @@ std::string checkAddressType(qpid::client::Session session, const Address& addre if (address.getName().empty()) { throw InvalidAddress("Name cannot be null"); } - std::string type = address.getType(); + std::string type = (Opt(address)/NODE/TYPE).str(); if (type.empty()) { ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName()); if (result.getQueueNotFound() && result.getExchangeNotFound()) { @@ -307,7 +400,8 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess { std::string type = checkAddressType(session, address); if (type == TOPIC_ADDRESS) { - std::auto_ptr<MessageSource> source(new Subscription(address)); + std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType(); + std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType)); QPID_LOG(debug, "treating source address as topic: " << address); return source; } else if (type == QUEUE_ADDRESS) { @@ -337,18 +431,6 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session } } -const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0) -{ - Variant::Map::const_iterator i = options.find(keys[index]); - if (i == options.end()) { - return EMPTY_VARIANT; - } else if (index+1 < keys.size()) { - return getNestedOption(i->second.asMap(), keys, index+1); - } else { - return i->second; - } -} - bool isBrowse(const Address& address) { const Variant& mode = address.getOption(MODE); @@ -366,23 +448,18 @@ QueueSource::QueueSource(const Address& address) : acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED), exclusive(false) { - //extract subscription arguments from address options - const Variant& x = address.getOption(X_PROPERTIES); - if (!x.isVoid()) { - const Variant::Map& xProps = x.asMap(); - Variant::Map passthrough; - for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) { - if (i->first == xamqp::EXCLUSIVE) exclusive = i->second; - else passthrough[i->first] = i->second; - } - translate(passthrough, options); - } + //extract subscription arguments from address options (nb: setting + //of accept-mode/acquire-mode/destination controlled though other + //options) + exclusive = Opt(address)/NODE/LINK/X_SUBSCRIBE/EXCLUSIVE; + (Opt(address)/NODE/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options); } void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) { checkCreate(session, FOR_RECEIVER); checkAssert(session, FOR_RECEIVER); + linkBindings.bind(session); session.messageSubscribe(arg::queue=name, arg::destination=destination, arg::acceptMode=acceptMode, @@ -393,58 +470,72 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination) { + linkBindings.unbind(session); session.messageCancel(destination); checkDelete(session, FOR_RECEIVER); } -std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name) +std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name) { - if (name.isVoid()) { + if (name.empty()) { return (boost::format("%1%_%2%") % base % Uuid(true).str()).str(); } else { - return (boost::format("%1%_%2%") % base % name.asString()).str(); + return (boost::format("%1%_%2%") % base % name).str(); } } -Subscription::Subscription(const Address& address, const std::string& exchangeType) +Subscription::Subscription(const Address& address, const std::string& type) : Exchange(address), - queue(getSubscriptionName(name, address.getOption(NAME))), + queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), reliable(AddressResolution::is_reliable(address)), - durable(address.getOption(DURABLE_SUBSCRIPTION).asBool()) -{ - if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1); - const Variant& x = address.getOption(X_PROPERTIES); - if (!x.isVoid()) { - const Variant::Map& xProps = x.asMap(); - Variant::Map passthrough; - for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) { - if (i->first == xamqp::QUEUE_ARGUMENTS) convert(i->second.asMap(), queueOptions); - else passthrough[i->first] = i->second; - } - translate(passthrough, subscriptionOptions); - } + durable(Opt(address)/LINK/DURABLE), + actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type) +{ + (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); + (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); + + if (!address.getSubject().empty()) bindSubject(address.getSubject()); + else if (linkBindings.empty()) bindAll(); +} - const Variant& filter = address.getOption(FILTER); - if (!filter.isVoid()) { - bind(address.getSubject(), filter); - } else if (address.hasSubject()) { - //Note: This will not work for headers- or xml- exchange; - //fanout exchange will do no filtering. - //TODO: for headers- or xml- exchange can construct a match - //for the subject in the application-headers - bind(address.getSubject()); +void Subscription::bindSubject(const std::string& subject) +{ + if (actualType == HEADERS_EXCHANGE) { + Binding b(name, queue, subject); + b.arguments.setString("qpid.subject", subject); + b.arguments.setString("x-match", "all"); + bindings.push_back(b); + } else if (actualType == XML_EXCHANGE) { + Binding b(name, queue, subject); + std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'") + % subject).str(); + b.arguments.setString("xquery", query); + bindings.push_back(b); } else { - //Neither a subject nor a filter has been defined, treat this - //as wanting to match all messages (Note: direct exchange is - //currently unable to support this case). - if (!exchangeType.empty()) bindSpecial(exchangeType); - else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType()); + //Note: the fanout exchange doesn't support any filtering, so + //the subject is ignored in that case + add(name, subject); } } -void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options) +void Subscription::bindAll() { - bindings.push_back(Binding(exchange, key, options)); + if (actualType == TOPIC_EXCHANGE) { + add(name, WILDCARD_ANY); + } else if (actualType == FANOUT_EXCHANGE) { + add(name, queue); + } else if (actualType == HEADERS_EXCHANGE) { + Binding b(name, queue, "match-all"); + b.arguments.setString("x-match", "all"); + bindings.push_back(b); + } else { //E.g. direct and xml + throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType)); + } +} + +void Subscription::add(const std::string& exchange, const std::string& key) +{ + bindings.push_back(Binding(exchange, queue, key)); } void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination) @@ -456,10 +547,11 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str //create subscription queue: session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); - //bind subscription queue to exchange: - for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { - session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options); - } + //'default' binding: + bindings.bind(session); + //any explicit bindings: + linkBindings.setDefaultQueue(queue); + linkBindings.bind(session); //subscribe to subscription queue: AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; session.messageSubscribe(arg::queue=queue, arg::destination=destination, @@ -468,20 +560,19 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination) { + linkBindings.unbind(session); session.messageCancel(destination); session.queueDelete(arg::queue=queue); checkDelete(session, FOR_RECEIVER); } -Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): - exchange(e), key(k), options(o) {} - ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {} void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&) { checkCreate(session, FOR_SENDER); checkAssert(session, FOR_SENDER); + linkBindings.bind(session); } void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) @@ -492,6 +583,7 @@ void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&) { + linkBindings.unbind(session); checkDelete(session, FOR_SENDER); } @@ -501,6 +593,7 @@ void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) { checkCreate(session, FOR_SENDER); checkAssert(session, FOR_SENDER); + linkBindings.bind(session); } void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { @@ -510,6 +603,7 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) { + linkBindings.unbind(session); checkDelete(session, FOR_SENDER); } @@ -556,68 +650,24 @@ bool isTopic(qpid::client::Session session, const qpid::messaging::Address& addr } } -void Subscription::bind(const std::string& subject) -{ - add(name, subject); -} - -void Subscription::bind(const std::string& subject, const Variant& filter) -{ - switch (filter.getType()) { - case qpid::messaging::VAR_MAP: - bind(subject, filter.asMap()); - break; - case qpid::messaging::VAR_LIST: - bind(subject, filter.asList()); - break; - default: - //TODO: if both subject _and_ filter are specified, combine in - //some way; for now we just ignore the subject in that case. - add(name, filter.asString()); - break; - } -} - -void Subscription::bind(const std::string& subject, const Variant::Map& filter) -{ - qpid::framing::FieldTable arguments; - translate(filter, arguments); - add(name, subject.empty() ? queue : subject, arguments); -} - -void Subscription::bind(const std::string& subject, const Variant::List& filter) -{ - for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) { - bind(subject, *i); - } -} - -void Subscription::bindSpecial(const std::string& exchangeType) -{ - if (exchangeType == TOPIC_EXCHANGE) { - add(name, WILDCARD_ANY); - } else if (exchangeType == FANOUT_EXCHANGE) { - add(name, queue); - } else if (exchangeType == HEADERS_EXCHANGE) { - //TODO: add special binding for headers exchange to match all messages - } else if (exchangeType == XML_EXCHANGE) { - //TODO: add special binding for xml exchange to match all messages - } else { //E.g. direct - throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType)); - } -} - Node::Node(const Address& address) : name(address.getName()), createPolicy(address.getOption(CREATE)), assertPolicy(address.getOption(ASSERT)), - deletePolicy(address.getOption(DELETE)) {} + deletePolicy(address.getOption(DELETE)) +{ + nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList()); + linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList()); +} Queue::Queue(const Address& a) : Node(a), - durable(false), - autoDelete(false), - exclusive(false) + durable(Opt(a)/NODE/DURABLE), + autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE), + exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE), + alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str()) { - configure(a); + (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments); + nodeBindings.setDefaultQueue(name); + linkBindings.setDefaultQueue(name); } void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) @@ -634,14 +684,7 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) } catch (const qpid::Exception& e) { throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str()); } - try { - for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { - session.exchangeBind(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key); - } - session.sync(); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create queue bindings for %1%; %2%") % name % e.what()).str()); - } + nodeBindings.bind(session); } else { try { sync(session).queueDeclare(arg::queue=name, arg::passive=true); @@ -694,82 +737,38 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) % i->first % name % *(i->second) % *v).str()); } } - for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { - ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key); - if (result.getQueueNotMatched() || result.getKeyNotMatched()) { - throw InvalidAddress((boost::format("Binding %1%/%2% for %3% was not matched") % i->exchange % i->key % name).str()); - } - } - } - } -} - -void Queue::addBinding(const std::string& b) -{ - string::size_type i = b.find('/'); - if (i == string::npos) { - bindings.push_back(Binding(b, EMPTY_STRING)); - } else { - std::string exchange = b.substr(0, i); - if (i+1 < b.size()) { - bindings.push_back(Binding(exchange, b.substr(i+1))); - } else { - bindings.push_back(Binding(exchange, EMPTY_STRING)); - } - } -} - -void Queue::addBindings(const Variant::List& list) -{ - for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { - addBinding(i->asString()); - } -} - -void Queue::configure(const Address& address) -{ - const Variant& v = address.getOption(NODE_PROPERTIES); - if (!v.isVoid()) { - Variant::Map nodeProps = v.asMap(); - durable = nodeProps[DURABLE]; - Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES); - if (x != nodeProps.end()) { - const Variant::Map& xProps = x->second.asMap(); - Variant::Map passthrough; - for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) { - if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second; - else if (i->first == xamqp::EXCLUSIVE) exclusive = i->second; - else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); - else if (i->first == xamqp::BINDINGS) addBindings(i->second.asList()); - else passthrough[i->first] = i->second; - } - translate(passthrough, arguments); + nodeBindings.check(session); } } } Exchange::Exchange(const Address& a) : Node(a), - type(TOPIC_EXCHANGE), - typeSpecified(false), - durable(false), - autoDelete(false) + specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()), + durable(Opt(a)/NODE/DURABLE), + autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE), + alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str()) { - configure(a); + (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments); + nodeBindings.setDefaultExchange(name); + linkBindings.setDefaultExchange(name); } void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) { if (enabled(createPolicy, mode)) { try { + std::string type = specifiedType; + if (type.empty()) type = TOPIC_EXCHANGE; sync(session).exchangeDeclare(arg::exchange=name, - arg::type=type, - arg::durable=durable, - arg::autoDelete=autoDelete, - arg::alternateExchange=alternateExchange, - arg::arguments=arguments); + arg::type=type, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); } catch (const qpid::Exception& e) { throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str()); } + nodeBindings.bind(session); } else { try { sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); @@ -800,9 +799,9 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) if (result.getNotFound()) { throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str()); } else { - if (typeSpecified && result.getType() != type) { + if (specifiedType.size() && result.getType() != specifiedType) { throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") - % name % type % result.getType()).str()); + % name % specifiedType % result.getType()).str()); } if (durable && !result.getDurable()) { throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str()); @@ -819,31 +818,79 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) % i->first % name % *(i->second) % *v).str()); } } + nodeBindings.check(session); } } } -void Exchange::configure(const Address& address) -{ - const Variant& v = address.getOption(NODE_PROPERTIES); - if (!v.isVoid()) { - Variant::Map nodeProps = v.asMap(); - durable = nodeProps[DURABLE]; - Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES); - if (x != nodeProps.end()) { - const Variant::Map& xProps = x->second.asMap(); - Variant::Map passthrough; - for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) { - if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second; - else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; } - else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); - else passthrough[i->first] = i->second; - } - translate(passthrough, arguments); +Binding::Binding(const Variant::Map& b) : + exchange((Opt(b)/EXCHANGE).str()), + queue((Opt(b)/QUEUE).str()), + key((Opt(b)/KEY).str()) +{ + (Opt(b)/ARGUMENTS).collect(arguments); +} + +Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {} + + +void Bindings::add(const Variant::List& list) +{ + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + push_back(Binding(i->asMap())); + } +} + +void Bindings::setDefaultExchange(const std::string& exchange) +{ + for (Bindings::iterator i = begin(); i != end(); ++i) { + if (i->exchange.empty()) i->exchange = exchange; + } +} + +void Bindings::setDefaultQueue(const std::string& queue) +{ + for (Bindings::iterator i = begin(); i != end(); ++i) { + if (i->queue.empty()) i->queue = queue; + } +} + +void Bindings::bind(qpid::client::AsyncSession& session) +{ + try { + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + session.exchangeBind(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key, + arg::arguments=i->arguments); } + session.sync(); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str()); } } +void Bindings::unbind(qpid::client::AsyncSession& session) +{ + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + session.exchangeUnbind(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key); + } +} + +void Bindings::check(qpid::client::AsyncSession& session) +{ + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key); + if (result.getQueueNotMatched() || result.getKeyNotMatched()) { + throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") + % i->exchange % i->queue % i->key).str()); + } + } +} bool Node::enabled(const Variant& policy, CheckMode mode) { diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp index 057196a957..a5d0671360 100644 --- a/cpp/src/qpid/messaging/Address.cpp +++ b/cpp/src/qpid/messaging/Address.cpp @@ -114,7 +114,7 @@ void Address::setOptions(const Variant::Map& options) { impl->options = options; namespace{ const Variant EMPTY_VARIANT; const std::string EMPTY_STRING; -const std::string NODE_PROPERTIES="node-properties"; +const std::string NODE_PROPERTIES="node"; } const Variant& find(const Variant::Map& map, const std::string& key) diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index a1e90f83f3..ef0685a508 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -564,13 +564,13 @@ struct QueueCreatePolicyFixture : public MessagingFixture QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways) { - QueueCreatePolicyFixture fix("#; {create:always, node-properties:{type:queue}}"); + QueueCreatePolicyFixture fix("#; {create:always, node:{type:queue}}"); fix.test(); } QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver) { - QueueCreatePolicyFixture fix("#; {create:receiver, node-properties:{type:queue}}"); + QueueCreatePolicyFixture fix("#; {create:receiver, node:{type:queue}}"); Receiver r = fix.session.createReceiver(fix.address); fix.test(); r.close(); @@ -578,7 +578,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver) QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender) { - QueueCreatePolicyFixture fix("#; {create:sender, node-properties:{type:queue}}"); + QueueCreatePolicyFixture fix("#; {create:sender, node:{type:queue}}"); Sender s = fix.session.createSender(fix.address); fix.test(); s.close(); @@ -608,14 +608,14 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture QPID_AUTO_TEST_CASE(testCreatePolicyTopic) { - ExchangeCreatePolicyFixture fix("#; {create:always, node-properties:{type:topic}}", + ExchangeCreatePolicyFixture fix("#; {create:always, node:{type:topic}}", "topic"); fix.test(); } QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout) { - ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node-properties:{type:topic, x-properties:{type:fanout}}}", "fanout"); + ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node:{type:topic, x-declare:{type:fanout}}}", "fanout"); Receiver r = fix.session.createReceiver(fix.address); fix.test(); r.close(); @@ -623,7 +623,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout) QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect) { - ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node-properties:{type:topic, x-properties:{type:direct}}}", "direct"); + ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node:{type:topic, x-declare:{type:direct}}}", "direct"); Sender s = fix.session.createSender(fix.address); fix.test(); s.close(); @@ -746,18 +746,18 @@ QPID_AUTO_TEST_CASE(testDeletePolicyExchange) QPID_AUTO_TEST_CASE(testAssertPolicyQueue) { MessagingFixture fix; - std::string a1 = "q; {create:always, assert:always, node-properties:{type:queue, durable:false, x-properties:{qpid.max-count:100}}}"; + std::string a1 = "q; {create:always, assert:always, node:{type:queue, durable:false, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s1 = fix.session.createSender(a1); s1.close(); Receiver r1 = fix.session.createReceiver(a1); r1.close(); - std::string a2 = "q; {assert:receiver, node-properties:{durable:true, x-properties:{qpid.max-count:100}}}"; + std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s2 = fix.session.createSender(a2); s2.close(); BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress); - std::string a3 = "q; {assert:sender, node-properties:{x-properties:{qpid.max-count:99}}}"; + std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}"; BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress); Receiver r3 = fix.session.createReceiver(a3); r3.close(); |