diff options
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 59 |
1 files changed, 56 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 2c7589e4b6..7cf3ec162f 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -29,6 +29,7 @@ #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/DeliverableMessage.h" +#include "stdexcept" using namespace qpid::broker; using namespace qpid::framing; @@ -70,6 +71,44 @@ Exchange::PreRoute::~PreRoute(){ } } +namespace { +/** Store information about an exception to be thrown later. + * If multiple exceptions are stored, save the first of the "most severe" + * exceptions, SESSION is les sever than CONNECTION etc. + */ +class ExInfo { + public: + enum Type { NONE, SESSION, CONNECTION, OTHER }; + + ExInfo(string ex) : type(NONE), code(0), exchange(ex) {} + + void store(Type type_, const char* what_, int code_, const boost::shared_ptr<Queue>& queue) { + QPID_LOG(error, "Exchange " << exchange << " cannot deliver to queue " + << queue->getName() << ": " << what_); + if (type < type_) { // Replace less severe error + type = type_; + what = what_; + code = code_; + } + } + + void raise() { + switch (type) { + case NONE: break; + case SESSION: throw qpid::SessionException(qpid::framing::execution::ErrorCode(code), what); + case CONNECTION: throw qpid::ConnectionException(qpid::framing::connection::CloseCode(code), what); + case OTHER: throw std::runtime_error(what); + } + } + + private: + Type type; + string what; + int code; + string exchange; +}; +} + void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { int count = 0; @@ -80,11 +119,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) msg.getMessage().blockContentRelease(); } + + ExInfo error(getName()); // Save errors to throw at the end. for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + try { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + catch (const SessionException& e) { + error.store(ExInfo::SESSION, e.what(), int(e.code), (*i)->queue); + } + catch (const ConnectionException& e) { + error.store(ExInfo::CONNECTION, e.what(), int(e.code), (*i)->queue); + } + catch (const std::exception& e) { + error.store(ExInfo::OTHER, e.what(), 0, (*i)->queue); + } } + error.raise(); } if (mgmtExchange != 0) |