summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-15 17:03:17 +0000
committerAlan Conway <aconway@apache.org>2011-04-15 17:03:17 +0000
commitd53fc2181c44aa6a19c827aaacdc959d338bcdf8 (patch)
tree3e57ca07748d726addfe32e00938ed83456b58d1 /qpid/cpp/src/qpid
parentb89a06f8579d703a9edee732c67f9496bac54d81 (diff)
downloadqpid-python-d53fc2181c44aa6a19c827aaacdc959d338bcdf8.tar.gz
QPID-3208: Exchanges make best effort to route messages if there is an error.
Previously if multiple queues were bound to the same routing key, then a failure to deliver to one of the queues (e.g. policy limit error) could prevent delivery on some of the other queues. With this commit the exchange delivers to every queue that did not have an error before raising an error. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1092765 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp59
1 files changed, 56 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 2c7589e4b6..7cf3ec162f 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -29,6 +29,7 @@
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "stdexcept"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -70,6 +71,44 @@ Exchange::PreRoute::~PreRoute(){
}
}
+namespace {
+/** Store information about an exception to be thrown later.
+ * If multiple exceptions are stored, save the first of the "most severe"
+ * exceptions, SESSION is les sever than CONNECTION etc.
+ */
+class ExInfo {
+ public:
+ enum Type { NONE, SESSION, CONNECTION, OTHER };
+
+ ExInfo(string ex) : type(NONE), code(0), exchange(ex) {}
+
+ void store(Type type_, const char* what_, int code_, const boost::shared_ptr<Queue>& queue) {
+ QPID_LOG(error, "Exchange " << exchange << " cannot deliver to queue "
+ << queue->getName() << ": " << what_);
+ if (type < type_) { // Replace less severe error
+ type = type_;
+ what = what_;
+ code = code_;
+ }
+ }
+
+ void raise() {
+ switch (type) {
+ case NONE: break;
+ case SESSION: throw qpid::SessionException(qpid::framing::execution::ErrorCode(code), what);
+ case CONNECTION: throw qpid::ConnectionException(qpid::framing::connection::CloseCode(code), what);
+ case OTHER: throw std::runtime_error(what);
+ }
+ }
+
+ private:
+ Type type;
+ string what;
+ int code;
+ string exchange;
+};
+}
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
int count = 0;
@@ -80,11 +119,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
msg.getMessage().blockContentRelease();
}
+
+ ExInfo error(getName()); // Save errors to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ try {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ catch (const SessionException& e) {
+ error.store(ExInfo::SESSION, e.what(), int(e.code), (*i)->queue);
+ }
+ catch (const ConnectionException& e) {
+ error.store(ExInfo::CONNECTION, e.what(), int(e.code), (*i)->queue);
+ }
+ catch (const std::exception& e) {
+ error.store(ExInfo::OTHER, e.what(), 0, (*i)->queue);
+ }
}
+ error.raise();
}
if (mgmtExchange != 0)