summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-18 20:40:53 +0000
committerAlan Conway <aconway@apache.org>2011-04-18 20:40:53 +0000
commit6c904379b3ba82dd5d5b46723363f51e5770e17f (patch)
tree1c07031eb92aa33430aca6774d6c514a0c551a59 /qpid/cpp/src/qpid
parent619b99c2ea7529efd8594d59efbbab3d81c8630d (diff)
downloadqpid-python-6c904379b3ba82dd5d5b46723363f51e5770e17f.tar.gz
QPID-3208: Exchanges make best effort to route messages if there is an error.
Previously if multiple queues were bound to the same routing key, then a failure to deliver to one of the queues (e.g. policy limit error) could prevent delivery on some of the other queues. With this commit the exchange delivers to every queue that did not have an error before raising an error. Note: this was originally committed as r1092765, but it caused test failures was reverted as r1092804. The original commit did not create exceptions of the correct type. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1094734 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp60
1 files changed, 53 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 2c7589e4b6..3f14d61144 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -19,16 +19,18 @@
*
*/
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ExceptionHolder.h"
+#include <stdexcept>
using namespace qpid::broker;
using namespace qpid::framing;
@@ -70,6 +72,36 @@ Exchange::PreRoute::~PreRoute(){
}
}
+namespace {
+/** Store information about an exception to be thrown later.
+ * If multiple exceptions are stored, save the first of the "most severe"
+ * exceptions, SESSION is les sever than CONNECTION etc.
+ */
+class ExInfo {
+ public:
+ enum Type { NONE, SESSION, CONNECTION, OTHER };
+
+ ExInfo(string exchange) : type(NONE), exchange(exchange) {}
+ void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) {
+ QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue "
+ << queue->getName() << ": " << exception_.what());
+ if (type < type_) { // Replace less severe exception
+ type = type_;
+ exception = exception_;
+ }
+ }
+
+ void raise() {
+ exception.raise();
+ }
+
+ private:
+ Type type;
+ string exchange;
+ qpid::sys::ExceptionHolder exception;
+};
+}
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
int count = 0;
@@ -80,11 +112,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
msg.getMessage().blockContentRelease();
}
+
+ ExInfo error(getName()); // Save exception to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ try {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ catch (const SessionException& e) {
+ error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue);
+ }
+ catch (const ConnectionException& e) {
+ error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue);
+ }
+ catch (const std::exception& e) {
+ error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue);
+ }
}
+ error.raise();
}
if (mgmtExchange != 0)