diff options
author | Gordon Sim <gsim@apache.org> | 2010-01-26 11:13:26 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-01-26 11:13:26 +0000 |
commit | 988872f0eb969a68d53ae303cc0a2aaddd87420f (patch) | |
tree | 79d23859e9a123811d2698128edcae039d856209 /cpp/src | |
parent | 220cd21a6907b86c5b3e75ec6c5fd91d27599b56 (diff) | |
download | qpid-python-988872f0eb969a68d53ae303cc0a2aaddd87420f.tar.gz |
QPID-2288: add support for bindings in address options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 68 |
1 files changed, 55 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index b70e67d12f..3e53f40ba4 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -109,6 +109,7 @@ const std::string EXCLUSIVE("exclusive"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); const std::string QUEUE_ARGUMENTS("x-queue-arguments"); const std::string SUBSCRIBE_ARGUMENTS("x-subscribe-arguments"); +const std::string BINDINGS("bindings"); } class Node @@ -130,6 +131,18 @@ class Node static std::vector<std::string> SENDER_MODES; }; +struct Binding +{ + Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + + std::string exchange; + std::string key; + FieldTable options; +}; + +typedef std::vector<Binding> Bindings; + + class Queue : protected Node { public: @@ -143,9 +156,12 @@ class Queue : protected Node bool autoDelete; bool exclusive; std::string alternateExchange; - FieldTable arguments; + FieldTable arguments; + Bindings bindings; void configure(const Address&); + void addBindings(const Variant::List&); + void addBinding(const std::string&); }; class Exchange : protected Node @@ -189,17 +205,6 @@ class Subscription : public Exchange, public MessageSource void subscribe(qpid::client::AsyncSession& session, const std::string& destination); void cancel(qpid::client::AsyncSession& session, const std::string& destination); private: - struct Binding - { - Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); - - std::string exchange; - std::string key; - FieldTable options; - }; - - typedef std::vector<Binding> Bindings; - const std::string queue; const bool reliable; const bool durable; @@ -452,7 +457,7 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string checkDelete(session, FOR_RECEIVER); } -Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): +Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): exchange(e), key(k), options(o) {} ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {} @@ -613,6 +618,14 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) } catch (const qpid::Exception& e) { throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str()); } + try { + for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { + session.exchangeBind(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key); + } + session.sync(); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Could not create queue bindings for %1%; %2%") % name % e.what()).str()); + } } else { try { sync(session).queueDeclare(arg::queue=name, arg::passive=true); @@ -665,10 +678,38 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) % i->first % name % *(i->second) % *v).str()); } } + for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { + ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key); + if (result.getQueueNotMatched() || result.getKeyNotMatched()) { + throw InvalidAddress((boost::format("Binding %1%/%2% for %3% was not matched") % i->exchange % i->key % name).str()); + } + } + } + } +} + +void Queue::addBinding(const std::string& b) +{ + string::size_type i = b.find('/'); + if (i == string::npos) { + bindings.push_back(Binding(b, EMPTY_STRING)); + } else { + std::string exchange = b.substr(0, i); + if (i+1 < b.size()) { + bindings.push_back(Binding(exchange, b.substr(i+1))); + } else { + bindings.push_back(Binding(exchange, EMPTY_STRING)); } } } +void Queue::addBindings(const Variant::List& list) +{ + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + addBinding(i->asString()); + } +} + void Queue::configure(const Address& address) { const Variant& v = address.getOption(NODE_PROPERTIES); @@ -683,6 +724,7 @@ void Queue::configure(const Address& address) if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second; else if (i->first == xamqp::EXCLUSIVE) exclusive = i->second; else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); + else if (i->first == xamqp::BINDINGS) addBindings(i->second.asList()); else passthrough[i->first] = i->second; } translate(passthrough, arguments); |