diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 102 |
1 files changed, 52 insertions, 50 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index f64a46ba01..43b581861f 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -26,7 +26,7 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/types/Variant.h" -#include "qpid/Exception.h" +#include "qpid/messaging/exceptions.h" #include "qpid/log/Statement.h" #include "qpid/framing/enum.h" #include "qpid/framing/ExchangeBoundResult.h" @@ -45,7 +45,10 @@ namespace amqp0_10 { using qpid::Exception; using qpid::messaging::Address; -using qpid::messaging::InvalidAddress; +using qpid::messaging::MalformedAddress; +using qpid::messaging::ResolutionError; +using qpid::messaging::NotFound; +using qpid::messaging::AssertionFailed; using qpid::framing::ExchangeBoundResult; using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; @@ -360,7 +363,7 @@ bool AddressResolution::is_reliable(const Address& address) std::string checkAddressType(qpid::client::Session session, const Address& address) { if (address.getName().empty()) { - throw InvalidAddress("Name cannot be null"); + throw MalformedAddress("Name cannot be null"); } std::string type = (Opt(address)/NODE/TYPE).str(); if (type.empty()) { @@ -376,7 +379,7 @@ std::string checkAddressType(qpid::client::Session session, const Address& addre type = TOPIC_ADDRESS; } else { //both a queue and exchange exist for that name - throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); + throw ResolutionError("Ambiguous address, please specify queue or topic as node type"); } } return type; @@ -396,7 +399,7 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess QPID_LOG(debug, "treating source address as queue: " << address); return source; } else { - throw InvalidAddress("Unrecognised type: " + type); + throw ResolutionError("Unrecognised type: " + type); } } @@ -414,7 +417,7 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session QPID_LOG(debug, "treating target address as queue: " << address); return sink; } else { - throw InvalidAddress("Unrecognised type: " + type); + throw ResolutionError("Unrecognised type: " + type); } } @@ -424,7 +427,7 @@ bool isBrowse(const Address& address) if (!mode.isVoid()) { std::string value = mode.asString(); if (value == BROWSE) return true; - else if (value != CONSUME) throw InvalidAddress("Invalid mode"); + else if (value != CONSUME) throw ResolutionError("Invalid mode"); } return false; } @@ -516,7 +519,7 @@ void Subscription::bindAll() b.arguments.setString("x-match", "all"); bindings.push_back(b); } else { //E.g. direct and xml - throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType)); + throw ResolutionError(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType)); } } @@ -662,23 +665,26 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(createPolicy, mode)) { QPID_LOG(debug, "Auto-creating queue '" << name << "'"); try { - sync(session).queueDeclare(arg::queue=name, - arg::durable=durable, - arg::autoDelete=autoDelete, - arg::exclusive=exclusive, - arg::alternateExchange=alternateExchange, - arg::arguments=arguments); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str()); + session.queueDeclare(arg::queue=name, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::exclusive=exclusive, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::ResourceLockedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); } - nodeBindings.bind(session); } else { try { sync(session).queueDeclare(arg::queue=name, arg::passive=true); } catch (const qpid::framing::NotFoundException& /*e*/) { - throw InvalidAddress((boost::format("Queue %1% does not exist") % name).str()); - } catch (const std::exception& e) { - throw InvalidAddress(e.what()); + throw NotFound((boost::format("Queue %1% does not exist") % name).str()); } } } @@ -700,27 +706,27 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(assertPolicy, mode)) { QueueQueryResult result = sync(session).queueQuery(name); if (result.getQueue() != name) { - throw InvalidAddress((boost::format("Queue not found: %1%") % name).str()); + throw NotFound((boost::format("Queue not found: %1%") % name).str()); } else { if (durable && !result.getDurable()) { - throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str()); } if (autoDelete && !result.getAutoDelete()) { - throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str()); } if (exclusive && !result.getExclusive()) { - throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str()); } if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { - throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") % name % alternateExchange % result.getAlternateExchange()).str()); } for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { - throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); } else if (*i->second != *v) { - throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } } @@ -746,23 +752,24 @@ void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) try { std::string type = specifiedType; if (type.empty()) type = TOPIC_EXCHANGE; - sync(session).exchangeDeclare(arg::exchange=name, + session.exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::autoDelete=autoDelete, arg::alternateExchange=alternateExchange, arg::arguments=arguments); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str()); + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); } - nodeBindings.bind(session); } else { try { sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); } catch (const qpid::framing::NotFoundException& /*e*/) { - throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str()); - } catch (const std::exception& e) { - throw InvalidAddress(e.what()); + throw NotFound((boost::format("Exchange %1% does not exist") % name).str()); } } } @@ -784,14 +791,14 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(assertPolicy, mode)) { ExchangeQueryResult result = sync(session).exchangeQuery(name); if (result.getNotFound()) { - throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str()); + throw NotFound((boost::format("Exchange not found: %1%") % name).str()); } else { if (specifiedType.size() && result.getType() != specifiedType) { - throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") % name % specifiedType % result.getType()).str()); } if (durable && !result.getDurable()) { - throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str()); + throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str()); } //Note: Can't check auto-delete or alternate-exchange via //exchange-query-result as these are not returned @@ -799,9 +806,9 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { - throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); } else if (i->second != v) { - throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } } @@ -844,16 +851,11 @@ void Bindings::setDefaultQueue(const std::string& queue) void Bindings::bind(qpid::client::AsyncSession& session) { - try { - for (Bindings::const_iterator i = begin(); i != end(); ++i) { - session.exchangeBind(arg::queue=i->queue, - arg::exchange=i->exchange, - arg::bindingKey=i->key, - arg::arguments=i->arguments); - } - session.sync(); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str()); + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + session.exchangeBind(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key, + arg::arguments=i->arguments); } } @@ -873,7 +875,7 @@ void Bindings::check(qpid::client::AsyncSession& session) arg::exchange=i->exchange, arg::bindingKey=i->key); if (result.getQueueNotMatched() || result.getKeyNotMatched()) { - throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") + throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") % i->exchange % i->queue % i->key).str()); } } |