summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-18 20:40:53 +0000
committerAlan Conway <aconway@apache.org>2011-04-18 20:40:53 +0000
commit6c904379b3ba82dd5d5b46723363f51e5770e17f (patch)
tree1c07031eb92aa33430aca6774d6c514a0c551a59
parent619b99c2ea7529efd8594d59efbbab3d81c8630d (diff)
downloadqpid-python-6c904379b3ba82dd5d5b46723363f51e5770e17f.tar.gz
QPID-3208: Exchanges make best effort to route messages if there is an error.
Previously if multiple queues were bound to the same routing key, then a failure to deliver to one of the queues (e.g. policy limit error) could prevent delivery on some of the other queues. With this commit the exchange delivers to every queue that did not have an error before raising an error. Note: this was originally committed as r1092765, but it caused test failures was reverted as r1092804. The original commit did not create exceptions of the correct type. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1094734 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/sys/ExceptionHolder.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp60
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp30
-rw-r--r--qpid/cpp/src/tests/brokertest.py20
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py30
5 files changed, 122 insertions, 31 deletions
diff --git a/qpid/cpp/include/qpid/sys/ExceptionHolder.h b/qpid/cpp/include/qpid/sys/ExceptionHolder.h
index 9eff1d64c7..4bc934cf75 100644
--- a/qpid/cpp/include/qpid/sys/ExceptionHolder.h
+++ b/qpid/cpp/include/qpid/sys/ExceptionHolder.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,14 +42,11 @@ class ExceptionHolder : public Raisable {
public:
ExceptionHolder() {}
// Use default copy & assign.
-
+
/** Take ownership of ex */
template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); }
- template <class Ex> ExceptionHolder(const boost::shared_ptr<Ex>& ex) { wrap(ex.release()); }
-
template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; }
- template <class Ex> ExceptionHolder& operator=(boost::shared_ptr<Ex> ex) { wrap(ex.release()); return *this; }
-
+
void raise() const { if (wrapper.get()) wrapper->raise() ; }
std::string what() const { return wrapper.get() ? wrapper->what() : std::string(); }
bool empty() const { return !wrapper.get(); }
@@ -66,7 +63,7 @@ class ExceptionHolder : public Raisable {
template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); }
boost::shared_ptr<Raisable> wrapper;
};
-
+
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 2c7589e4b6..3f14d61144 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -19,16 +19,18 @@
*
*/
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ExceptionHolder.h"
+#include <stdexcept>
using namespace qpid::broker;
using namespace qpid::framing;
@@ -70,6 +72,36 @@ Exchange::PreRoute::~PreRoute(){
}
}
+namespace {
+/** Store information about an exception to be thrown later.
+ * If multiple exceptions are stored, save the first of the "most severe"
+ * exceptions, SESSION is les sever than CONNECTION etc.
+ */
+class ExInfo {
+ public:
+ enum Type { NONE, SESSION, CONNECTION, OTHER };
+
+ ExInfo(string exchange) : type(NONE), exchange(exchange) {}
+ void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) {
+ QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue "
+ << queue->getName() << ": " << exception_.what());
+ if (type < type_) { // Replace less severe exception
+ type = type_;
+ exception = exception_;
+ }
+ }
+
+ void raise() {
+ exception.raise();
+ }
+
+ private:
+ Type type;
+ string exchange;
+ qpid::sys::ExceptionHolder exception;
+};
+}
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
int count = 0;
@@ -80,11 +112,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
msg.getMessage().blockContentRelease();
}
+
+ ExInfo error(getName()); // Save exception to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ try {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ catch (const SessionException& e) {
+ error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue);
+ }
+ catch (const ConnectionException& e) {
+ error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue);
+ }
+ catch (const std::exception& e) {
+ error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue);
+ }
}
+ error.raise();
}
if (mgmtExchange != 0)
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 2059727e7b..34e4592a15 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -303,11 +303,11 @@ QPID_AUTO_TEST_CASE(testSeek){
QueuedMessage qm;
queue->dispatch(consumer);
-
+
BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
-
+
}
QPID_AUTO_TEST_CASE(testSearch){
@@ -325,15 +325,15 @@ QPID_AUTO_TEST_CASE(testSearch){
SequenceNumber seq(2);
QueuedMessage qm = queue->find(seq);
-
+
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
+
queue->acquire(qm);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
QueuedMessage qm1 = queue->find(seq1);
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
+
}
const std::string nullxid = "";
@@ -875,28 +875,40 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg02(msg02);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ }
msg02->tryReleaseContent();
BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg03(msg03);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ }
msg03->tryReleaseContent();
BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg04(msg04);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ }
msg04->tryReleaseContent();
BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg05(msg05);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ }
msg05->tryReleaseContent();
BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 4abe4c2cbe..a19dd305e5 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -484,18 +484,24 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
- def assert_browse(self, session, queue, expect_contents, timeout=0):
+ def browse(self, session, queue, timeout=0):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
-
r = session.receiver("%s;{mode:browse}"%(queue))
- actual_contents = []
try:
- for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
- while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
- except messaging.Empty: pass
- r.close()
+ contents = []
+ try:
+ while True: contents.append(r.fetch(timeout=timeout).content)
+ except messaging.Empty: pass
+ finally: pass #FIXME aconway 2011-04-14: r.close()
+ return contents
+
+ def assert_browse(self, session, queue, expect_contents, timeout=0):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+ actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
def join(thread, timeout=10):
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 42f3ae3d25..73c20d451d 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -449,6 +449,36 @@ acl allow all all
cluster.start()
verify(cluster[1])
+ def test_binding_order(self):
+ """Regression test for binding order inconsistency in cluster"""
+ cluster = self.cluster(1)
+ c0 = cluster[0].connect()
+ s0 = c0.session()
+ # Declare multiple queues bound to same key on amq.topic
+ def declare(q,max=0):
+ if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d}}'%max
+ else: declare = 'x-declare:{}'
+ bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
+ s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
+ declare('d',max=4) # Only one with a limit
+ for q in ['c', 'b','a']: declare(q)
+ # Add a cluster member, send enough messages to exceed the max count
+ cluster.start()
+ try:
+ s = s0.sender('amq.topic/key')
+ for m in xrange(1,6): s.send(Message(str(m)))
+ self.fail("Expected capacity exceeded exception")
+ except messaging.exceptions.TargetCapacityExceeded: pass
+ c1 = cluster[1].connect()
+ s1 = c1.session()
+ s0 = c0.session() # Old session s0 is broken by exception.
+ # Verify queue contents are consistent.
+ for q in ['a','b','c','d']:
+ self.assertEqual(self.browse(s0, q), self.browse(s1, q))
+ # Verify queue contents are "best effort"
+ for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
+ self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):