summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-19 23:07:37 +0000
committerGordon Sim <gsim@apache.org>2009-11-19 23:07:37 +0000
commitefb41eeabf7f22cf400a6bc6ab809af610af2664 (patch)
tree3f0f31debd64d95303dd1c499e34a1d207301042 /qpid/cpp
parent68dd9f9ad022465bb36a8861d8acbd7329473c3d (diff)
downloadqpid-python-efb41eeabf7f22cf400a6bc6ab809af610af2664.tar.gz
QPID-664: Refactored address resolution; ensure type is asserted on when required; moved exclusive option for subscribe.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@882349 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp135
1 files changed, 58 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index edf1ed74f5..89cc053ff7 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -60,6 +60,7 @@ using namespace boost::assign;
namespace{
+const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
const std::string EMPTY_STRING;
@@ -160,6 +161,7 @@ class Exchange : protected Node
private:
std::string type;
+ bool typeSpecified;
bool durable;
bool autoDelete;
std::string alternateExchange;
@@ -269,50 +271,42 @@ bool is_reliable(const Address& address)
return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
+std::string checkAddressType(qpid::client::Session session, const Address& address)
+{
+ std::string type = address.getType();
+ if (type.empty()) {
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name; treat it as a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getExchangeNotFound()) {
+ //name refers to a queue
+ type = QUEUE_ADDRESS;
+ } else if (result.getQueueNotFound()) {
+ //name refers to an exchange
+ type = TOPIC_ADDRESS;
+ } else {
+ //both a queue and exchange exist for that name
+ throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
+ }
+ }
+ return type;
+}
+
std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
const Address& address)
{
- ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
- if (result.getQueueNotFound() && result.getExchangeNotFound()) {
- //neither a queue nor an exchange exists with that name
- if (address.getType() == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSource> source(new Subscription(address));
- QPID_LOG(debug, "treating source address as topic: " << address);
- return source;
- } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
- std::auto_ptr<MessageSource> source(new QueueSource(address));
- QPID_LOG(debug, "treating source address as queue: " << address);
- return source;
- } else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
- }
- } else if (result.getQueueNotFound()) {
- //only an exchange exists with that name
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
- std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
- QPID_LOG(debug, "resolved source address as topic: " << address);
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
+ std::auto_ptr<MessageSource> source(new Subscription(address));
+ QPID_LOG(debug, "treating source address as topic: " << address);
return source;
- } else if (result.getExchangeNotFound()) {
- //only an queue exists with that name
+ } else if (type == QUEUE_ADDRESS) {
std::auto_ptr<MessageSource> source(new QueueSource(address));
- QPID_LOG(debug, "resolved source address as queue: " << address);
+ QPID_LOG(debug, "treating source address as queue: " << address);
return source;
} else {
- //both a queue and exchange exist for that name
- if (address.getType() == TOPIC_ADDRESS) {
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
- std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
- QPID_LOG(debug, "resolved source address as topic: " << address);
- return source;
- } else if (address.getType() == QUEUE_ADDRESS) {
- std::auto_ptr<MessageSource> source(new QueueSource(address));
- QPID_LOG(debug, "resolved source address as queue: " << address);
- return source;
- } else if (address.getType().empty()) {
- throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
- } else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
- }
+ throw InvalidAddress("Unrecognised type: " + type);
}
}
@@ -320,45 +314,29 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess
std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
const qpid::messaging::Address& address)
{
- ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
- if (result.getQueueNotFound() && result.getExchangeNotFound()) {
- //neither a queue nor an exchange exists with that name
- if (address.getType() == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
- QPID_LOG(debug, "treating target address as topic: " << address);
- return sink;
- } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
- std::auto_ptr<MessageSink> sink(new QueueSink(address));
- QPID_LOG(debug, "treating target address as queue: " << address);
- return sink;
- } else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
- }
- } else if (result.getQueueNotFound()) {
- //only an exchange exists with that name
+ std::string type = checkAddressType(session, address);
+ if (type == TOPIC_ADDRESS) {
std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
- QPID_LOG(debug, "resolved target address as topic: " << address);
+ QPID_LOG(debug, "treating target address as topic: " << address);
return sink;
- } else if (result.getExchangeNotFound()) {
- //only an queue exists with that name
+ } else if (type == QUEUE_ADDRESS) {
std::auto_ptr<MessageSink> sink(new QueueSink(address));
- QPID_LOG(debug, "resolved target address as queue: " << address);
+ QPID_LOG(debug, "treating target address as queue: " << address);
return sink;
} else {
- //both a queue and exchange exist for that name
- if (address.getType() == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSink> sink(new ExchangeSink(address));
- QPID_LOG(debug, "resolved target address as topic: " << address);
- return sink;
- } else if (address.getType() == QUEUE_ADDRESS) {
- std::auto_ptr<MessageSink> sink(new QueueSink(address));
- QPID_LOG(debug, "resolved target address as queue: " << address);
- return sink;
- } else if (address.getType().empty()) {
- throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
- } else {
- throw InvalidAddress("Unrecognised type: " + address.getType());
- }
+ throw InvalidAddress("Unrecognised type: " + type);
+ }
+}
+
+const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0)
+{
+ Variant::Map::const_iterator i = options.find(keys[index]);
+ if (i == options.end()) {
+ return EMPTY_VARIANT;
+ } else if (index+1 < keys.size()) {
+ return getNestedOption(i->second.asMap(), keys, index+1);
+ } else {
+ return i->second;
}
}
@@ -366,7 +344,7 @@ QueueSource::QueueSource(const Address& address) :
Queue(address),
acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
- exclusive(address.getOption(EXCLUSIVE).asBool())
+ exclusive(getNestedOption(address.getOptions(), list_of<std::string>(X_PROPERTIES)(EXCLUSIVE)).asBool())
{
//extract subscription arguments from address options
convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
@@ -713,6 +691,7 @@ void Queue::configure(const Address& address)
Exchange::Exchange(const Address& a) : Node(a),
type(TOPIC_EXCHANGE),
+ typeSpecified(false),
durable(false),
autoDelete(false)
{
@@ -735,8 +714,10 @@ void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
} else {
try {
sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {
+ throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str());
+ } catch (const std::exception& e) {
+ throw InvalidAddress(e.what());
}
}
}
@@ -756,11 +737,11 @@ void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(assertPolicy, mode)) {
- ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name);
+ ExchangeQueryResult result = sync(session).exchangeQuery(name);
if (result.getNotFound()) {
throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
} else {
- if (!type.empty() && result.getType() != type) {
+ if (typeSpecified && result.getType() != type) {
throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
% name % type % result.getType()).str());
}
@@ -795,7 +776,7 @@ void Exchange::configure(const Address& address)
Variant::Map passthrough;
for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCHANGE_TYPE) type = i->second.asString();
+ else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; }
else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
else passthrough[i->first] = i->second;
}