summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Exchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp59
1 files changed, 56 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 2c7589e4b6..7cf3ec162f 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -29,6 +29,7 @@
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "stdexcept"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -70,6 +71,44 @@ Exchange::PreRoute::~PreRoute(){
}
}
+namespace {
+/** Store information about an exception to be thrown later.
+ * If multiple exceptions are stored, save the first of the "most severe"
+ * exceptions, SESSION is les sever than CONNECTION etc.
+ */
+class ExInfo {
+ public:
+ enum Type { NONE, SESSION, CONNECTION, OTHER };
+
+ ExInfo(string ex) : type(NONE), code(0), exchange(ex) {}
+
+ void store(Type type_, const char* what_, int code_, const boost::shared_ptr<Queue>& queue) {
+ QPID_LOG(error, "Exchange " << exchange << " cannot deliver to queue "
+ << queue->getName() << ": " << what_);
+ if (type < type_) { // Replace less severe error
+ type = type_;
+ what = what_;
+ code = code_;
+ }
+ }
+
+ void raise() {
+ switch (type) {
+ case NONE: break;
+ case SESSION: throw qpid::SessionException(qpid::framing::execution::ErrorCode(code), what);
+ case CONNECTION: throw qpid::ConnectionException(qpid::framing::connection::CloseCode(code), what);
+ case OTHER: throw std::runtime_error(what);
+ }
+ }
+
+ private:
+ Type type;
+ string what;
+ int code;
+ string exchange;
+};
+}
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
int count = 0;
@@ -80,11 +119,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
msg.getMessage().blockContentRelease();
}
+
+ ExInfo error(getName()); // Save errors to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ try {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ catch (const SessionException& e) {
+ error.store(ExInfo::SESSION, e.what(), int(e.code), (*i)->queue);
+ }
+ catch (const ConnectionException& e) {
+ error.store(ExInfo::CONNECTION, e.what(), int(e.code), (*i)->queue);
+ }
+ catch (const std::exception& e) {
+ error.store(ExInfo::OTHER, e.what(), 0, (*i)->queue);
+ }
}
+ error.raise();
}
if (mgmtExchange != 0)