summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-01-26 11:13:26 +0000
committerGordon Sim <gsim@apache.org>2010-01-26 11:13:26 +0000
commit988872f0eb969a68d53ae303cc0a2aaddd87420f (patch)
tree79d23859e9a123811d2698128edcae039d856209 /cpp/src
parent220cd21a6907b86c5b3e75ec6c5fd91d27599b56 (diff)
downloadqpid-python-988872f0eb969a68d53ae303cc0a2aaddd87420f.tar.gz
QPID-2288: add support for bindings in address options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp68
1 files changed, 55 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index b70e67d12f..3e53f40ba4 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -109,6 +109,7 @@ const std::string EXCLUSIVE("exclusive");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string QUEUE_ARGUMENTS("x-queue-arguments");
const std::string SUBSCRIBE_ARGUMENTS("x-subscribe-arguments");
+const std::string BINDINGS("bindings");
}
class Node
@@ -130,6 +131,18 @@ class Node
static std::vector<std::string> SENDER_MODES;
};
+struct Binding
+{
+ Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+
+ std::string exchange;
+ std::string key;
+ FieldTable options;
+};
+
+typedef std::vector<Binding> Bindings;
+
+
class Queue : protected Node
{
public:
@@ -143,9 +156,12 @@ class Queue : protected Node
bool autoDelete;
bool exclusive;
std::string alternateExchange;
- FieldTable arguments;
+ FieldTable arguments;
+ Bindings bindings;
void configure(const Address&);
+ void addBindings(const Variant::List&);
+ void addBinding(const std::string&);
};
class Exchange : protected Node
@@ -189,17 +205,6 @@ class Subscription : public Exchange, public MessageSource
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
- struct Binding
- {
- Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
-
- std::string exchange;
- std::string key;
- FieldTable options;
- };
-
- typedef std::vector<Binding> Bindings;
-
const std::string queue;
const bool reliable;
const bool durable;
@@ -452,7 +457,7 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string
checkDelete(session, FOR_RECEIVER);
}
-Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
+Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
exchange(e), key(k), options(o) {}
ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
@@ -613,6 +618,14 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
} catch (const qpid::Exception& e) {
throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
}
+ try {
+ for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ session.exchangeBind(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
+ }
+ session.sync();
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create queue bindings for %1%; %2%") % name % e.what()).str());
+ }
} else {
try {
sync(session).queueDeclare(arg::queue=name, arg::passive=true);
@@ -665,10 +678,38 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
% i->first % name % *(i->second) % *v).str());
}
}
+ for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
+ if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
+ throw InvalidAddress((boost::format("Binding %1%/%2% for %3% was not matched") % i->exchange % i->key % name).str());
+ }
+ }
+ }
+ }
+}
+
+void Queue::addBinding(const std::string& b)
+{
+ string::size_type i = b.find('/');
+ if (i == string::npos) {
+ bindings.push_back(Binding(b, EMPTY_STRING));
+ } else {
+ std::string exchange = b.substr(0, i);
+ if (i+1 < b.size()) {
+ bindings.push_back(Binding(exchange, b.substr(i+1)));
+ } else {
+ bindings.push_back(Binding(exchange, EMPTY_STRING));
}
}
}
+void Queue::addBindings(const Variant::List& list)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ addBinding(i->asString());
+ }
+}
+
void Queue::configure(const Address& address)
{
const Variant& v = address.getOption(NODE_PROPERTIES);
@@ -683,6 +724,7 @@ void Queue::configure(const Address& address)
if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
else if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+ else if (i->first == xamqp::BINDINGS) addBindings(i->second.asList());
else passthrough[i->first] = i->second;
}
translate(passthrough, arguments);