diff options
author | Gordon Sim <gsim@apache.org> | 2009-11-19 23:07:37 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-11-19 23:07:37 +0000 |
commit | efb41eeabf7f22cf400a6bc6ab809af610af2664 (patch) | |
tree | 3f0f31debd64d95303dd1c499e34a1d207301042 | |
parent | 68dd9f9ad022465bb36a8861d8acbd7329473c3d (diff) | |
download | qpid-python-efb41eeabf7f22cf400a6bc6ab809af610af2664.tar.gz |
QPID-664: Refactored address resolution; ensure type is asserted on when required; moved exclusive option for subscribe.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@882349 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 135 |
1 files changed, 58 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index edf1ed74f5..89cc053ff7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -60,6 +60,7 @@ using namespace boost::assign; namespace{ +const Variant EMPTY_VARIANT; const FieldTable EMPTY_FIELD_TABLE; const std::string EMPTY_STRING; @@ -160,6 +161,7 @@ class Exchange : protected Node private: std::string type; + bool typeSpecified; bool durable; bool autoDelete; std::string alternateExchange; @@ -269,50 +271,42 @@ bool is_reliable(const Address& address) return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); } +std::string checkAddressType(qpid::client::Session session, const Address& address) +{ + std::string type = address.getType(); + if (type.empty()) { + ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName()); + if (result.getQueueNotFound() && result.getExchangeNotFound()) { + //neither a queue nor an exchange exists with that name; treat it as a queue + type = QUEUE_ADDRESS; + } else if (result.getExchangeNotFound()) { + //name refers to a queue + type = QUEUE_ADDRESS; + } else if (result.getQueueNotFound()) { + //name refers to an exchange + type = TOPIC_ADDRESS; + } else { + //both a queue and exchange exist for that name + throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); + } + } + return type; +} + std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session, const Address& address) { - ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName()); - if (result.getQueueNotFound() && result.getExchangeNotFound()) { - //neither a queue nor an exchange exists with that name - if (address.getType() == TOPIC_ADDRESS) { - std::auto_ptr<MessageSource> source(new Subscription(address)); - QPID_LOG(debug, "treating source address as topic: " << address); - return source; - } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) { - std::auto_ptr<MessageSource> source(new QueueSource(address)); - QPID_LOG(debug, "treating source address as queue: " << address); - return source; - } else { - throw InvalidAddress("Unrecognised type: " + address.getType()); - } - } else if (result.getQueueNotFound()) { - //only an exchange exists with that name - qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName()); - std::auto_ptr<MessageSource> source(new Subscription(address, result.getType())); - QPID_LOG(debug, "resolved source address as topic: " << address); + std::string type = checkAddressType(session, address); + if (type == TOPIC_ADDRESS) { + std::auto_ptr<MessageSource> source(new Subscription(address)); + QPID_LOG(debug, "treating source address as topic: " << address); return source; - } else if (result.getExchangeNotFound()) { - //only an queue exists with that name + } else if (type == QUEUE_ADDRESS) { std::auto_ptr<MessageSource> source(new QueueSource(address)); - QPID_LOG(debug, "resolved source address as queue: " << address); + QPID_LOG(debug, "treating source address as queue: " << address); return source; } else { - //both a queue and exchange exist for that name - if (address.getType() == TOPIC_ADDRESS) { - qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName()); - std::auto_ptr<MessageSource> source(new Subscription(address, result.getType())); - QPID_LOG(debug, "resolved source address as topic: " << address); - return source; - } else if (address.getType() == QUEUE_ADDRESS) { - std::auto_ptr<MessageSource> source(new QueueSource(address)); - QPID_LOG(debug, "resolved source address as queue: " << address); - return source; - } else if (address.getType().empty()) { - throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); - } else { - throw InvalidAddress("Unrecognised type: " + address.getType()); - } + throw InvalidAddress("Unrecognised type: " + type); } } @@ -320,45 +314,29 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session, const qpid::messaging::Address& address) { - ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName()); - if (result.getQueueNotFound() && result.getExchangeNotFound()) { - //neither a queue nor an exchange exists with that name - if (address.getType() == TOPIC_ADDRESS) { - std::auto_ptr<MessageSink> sink(new ExchangeSink(address)); - QPID_LOG(debug, "treating target address as topic: " << address); - return sink; - } else if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) { - std::auto_ptr<MessageSink> sink(new QueueSink(address)); - QPID_LOG(debug, "treating target address as queue: " << address); - return sink; - } else { - throw InvalidAddress("Unrecognised type: " + address.getType()); - } - } else if (result.getQueueNotFound()) { - //only an exchange exists with that name + std::string type = checkAddressType(session, address); + if (type == TOPIC_ADDRESS) { std::auto_ptr<MessageSink> sink(new ExchangeSink(address)); - QPID_LOG(debug, "resolved target address as topic: " << address); + QPID_LOG(debug, "treating target address as topic: " << address); return sink; - } else if (result.getExchangeNotFound()) { - //only an queue exists with that name + } else if (type == QUEUE_ADDRESS) { std::auto_ptr<MessageSink> sink(new QueueSink(address)); - QPID_LOG(debug, "resolved target address as queue: " << address); + QPID_LOG(debug, "treating target address as queue: " << address); return sink; } else { - //both a queue and exchange exist for that name - if (address.getType() == TOPIC_ADDRESS) { - std::auto_ptr<MessageSink> sink(new ExchangeSink(address)); - QPID_LOG(debug, "resolved target address as topic: " << address); - return sink; - } else if (address.getType() == QUEUE_ADDRESS) { - std::auto_ptr<MessageSink> sink(new QueueSink(address)); - QPID_LOG(debug, "resolved target address as queue: " << address); - return sink; - } else if (address.getType().empty()) { - throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); - } else { - throw InvalidAddress("Unrecognised type: " + address.getType()); - } + throw InvalidAddress("Unrecognised type: " + type); + } +} + +const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0) +{ + Variant::Map::const_iterator i = options.find(keys[index]); + if (i == options.end()) { + return EMPTY_VARIANT; + } else if (index+1 < keys.size()) { + return getNestedOption(i->second.asMap(), keys, index+1); + } else { + return i->second; } } @@ -366,7 +344,7 @@ QueueSource::QueueSource(const Address& address) : Queue(address), acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT), acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED), - exclusive(address.getOption(EXCLUSIVE).asBool()) + exclusive(getNestedOption(address.getOptions(), list_of<std::string>(X_PROPERTIES)(EXCLUSIVE)).asBool()) { //extract subscription arguments from address options convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options); @@ -713,6 +691,7 @@ void Queue::configure(const Address& address) Exchange::Exchange(const Address& a) : Node(a), type(TOPIC_EXCHANGE), + typeSpecified(false), durable(false), autoDelete(false) { @@ -735,8 +714,10 @@ void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) } else { try { sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) { + throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str()); + } catch (const std::exception& e) { + throw InvalidAddress(e.what()); } } } @@ -756,11 +737,11 @@ void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode) void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) { if (enabled(assertPolicy, mode)) { - ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name); + ExchangeQueryResult result = sync(session).exchangeQuery(name); if (result.getNotFound()) { throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str()); } else { - if (!type.empty() && result.getType() != type) { + if (typeSpecified && result.getType() != type) { throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") % name % type % result.getType()).str()); } @@ -795,7 +776,7 @@ void Exchange::configure(const Address& address) Variant::Map passthrough; for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) { if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second; - else if (i->first == xamqp::EXCHANGE_TYPE) type = i->second.asString(); + else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; } else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); else passthrough[i->first] = i->second; } |