summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AddressResolution.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp102
1 files changed, 52 insertions, 50 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index f64a46ba01..43b581861f 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -26,7 +26,7 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/types/Variant.h"
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ExchangeBoundResult.h"
@@ -45,7 +45,10 @@ namespace amqp0_10 {
using qpid::Exception;
using qpid::messaging::Address;
-using qpid::messaging::InvalidAddress;
+using qpid::messaging::MalformedAddress;
+using qpid::messaging::ResolutionError;
+using qpid::messaging::NotFound;
+using qpid::messaging::AssertionFailed;
using qpid::framing::ExchangeBoundResult;
using qpid::framing::ExchangeQueryResult;
using qpid::framing::FieldTable;
@@ -360,7 +363,7 @@ bool AddressResolution::is_reliable(const Address& address)
std::string checkAddressType(qpid::client::Session session, const Address& address)
{
if (address.getName().empty()) {
- throw InvalidAddress("Name cannot be null");
+ throw MalformedAddress("Name cannot be null");
}
std::string type = (Opt(address)/NODE/TYPE).str();
if (type.empty()) {
@@ -376,7 +379,7 @@ std::string checkAddressType(qpid::client::Session session, const Address& addre
type = TOPIC_ADDRESS;
} else {
//both a queue and exchange exist for that name
- throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
+ throw ResolutionError("Ambiguous address, please specify queue or topic as node type");
}
}
return type;
@@ -396,7 +399,7 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess
QPID_LOG(debug, "treating source address as queue: " << address);
return source;
} else {
- throw InvalidAddress("Unrecognised type: " + type);
+ throw ResolutionError("Unrecognised type: " + type);
}
}
@@ -414,7 +417,7 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session
QPID_LOG(debug, "treating target address as queue: " << address);
return sink;
} else {
- throw InvalidAddress("Unrecognised type: " + type);
+ throw ResolutionError("Unrecognised type: " + type);
}
}
@@ -424,7 +427,7 @@ bool isBrowse(const Address& address)
if (!mode.isVoid()) {
std::string value = mode.asString();
if (value == BROWSE) return true;
- else if (value != CONSUME) throw InvalidAddress("Invalid mode");
+ else if (value != CONSUME) throw ResolutionError("Invalid mode");
}
return false;
}
@@ -516,7 +519,7 @@ void Subscription::bindAll()
b.arguments.setString("x-match", "all");
bindings.push_back(b);
} else { //E.g. direct and xml
- throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
+ throw ResolutionError(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
}
}
@@ -662,23 +665,26 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
if (enabled(createPolicy, mode)) {
QPID_LOG(debug, "Auto-creating queue '" << name << "'");
try {
- sync(session).queueDeclare(arg::queue=name,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::exclusive=exclusive,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
+ session.queueDeclare(arg::queue=name,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::exclusive=exclusive,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::ResourceLockedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
}
- nodeBindings.bind(session);
} else {
try {
sync(session).queueDeclare(arg::queue=name, arg::passive=true);
} catch (const qpid::framing::NotFoundException& /*e*/) {
- throw InvalidAddress((boost::format("Queue %1% does not exist") % name).str());
- } catch (const std::exception& e) {
- throw InvalidAddress(e.what());
+ throw NotFound((boost::format("Queue %1% does not exist") % name).str());
}
}
}
@@ -700,27 +706,27 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
if (enabled(assertPolicy, mode)) {
QueueQueryResult result = sync(session).queueQuery(name);
if (result.getQueue() != name) {
- throw InvalidAddress((boost::format("Queue not found: %1%") % name).str());
+ throw NotFound((boost::format("Queue not found: %1%") % name).str());
} else {
if (durable && !result.getDurable()) {
- throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str());
+ throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str());
}
if (autoDelete && !result.getAutoDelete()) {
- throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str());
+ throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str());
}
if (exclusive && !result.getExclusive()) {
- throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str());
+ throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str());
}
if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
- throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
% name % alternateExchange % result.getAlternateExchange()).str());
}
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
- throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
} else if (*i->second != *v) {
- throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
}
@@ -746,23 +752,24 @@ void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
try {
std::string type = specifiedType;
if (type.empty()) type = TOPIC_EXCHANGE;
- sync(session).exchangeDeclare(arg::exchange=name,
+ session.exchangeDeclare(arg::exchange=name,
arg::type=type,
arg::durable=durable,
arg::autoDelete=autoDelete,
arg::alternateExchange=alternateExchange,
arg::arguments=arguments);
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
}
- nodeBindings.bind(session);
} else {
try {
sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
} catch (const qpid::framing::NotFoundException& /*e*/) {
- throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str());
- } catch (const std::exception& e) {
- throw InvalidAddress(e.what());
+ throw NotFound((boost::format("Exchange %1% does not exist") % name).str());
}
}
}
@@ -784,14 +791,14 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
if (enabled(assertPolicy, mode)) {
ExchangeQueryResult result = sync(session).exchangeQuery(name);
if (result.getNotFound()) {
- throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
+ throw NotFound((boost::format("Exchange not found: %1%") % name).str());
} else {
if (specifiedType.size() && result.getType() != specifiedType) {
- throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
% name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
- throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
+ throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str());
}
//Note: Can't check auto-delete or alternate-exchange via
//exchange-query-result as these are not returned
@@ -799,9 +806,9 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
- throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
} else if (i->second != v) {
- throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
}
@@ -844,16 +851,11 @@ void Bindings::setDefaultQueue(const std::string& queue)
void Bindings::bind(qpid::client::AsyncSession& session)
{
- try {
- for (Bindings::const_iterator i = begin(); i != end(); ++i) {
- session.exchangeBind(arg::queue=i->queue,
- arg::exchange=i->exchange,
- arg::bindingKey=i->key,
- arg::arguments=i->arguments);
- }
- session.sync();
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str());
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeBind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key,
+ arg::arguments=i->arguments);
}
}
@@ -873,7 +875,7 @@ void Bindings::check(qpid::client::AsyncSession& session)
arg::exchange=i->exchange,
arg::bindingKey=i->key);
if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
% i->exchange % i->queue % i->key).str());
}
}