diff options
-rw-r--r-- | qpid/cpp/include/qpid/sys/ExceptionHolder.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 60 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 20 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 30 |
5 files changed, 122 insertions, 31 deletions
diff --git a/qpid/cpp/include/qpid/sys/ExceptionHolder.h b/qpid/cpp/include/qpid/sys/ExceptionHolder.h index 9eff1d64c7..4bc934cf75 100644 --- a/qpid/cpp/include/qpid/sys/ExceptionHolder.h +++ b/qpid/cpp/include/qpid/sys/ExceptionHolder.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -42,14 +42,11 @@ class ExceptionHolder : public Raisable { public: ExceptionHolder() {} // Use default copy & assign. - + /** Take ownership of ex */ template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); } - template <class Ex> ExceptionHolder(const boost::shared_ptr<Ex>& ex) { wrap(ex.release()); } - template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; } - template <class Ex> ExceptionHolder& operator=(boost::shared_ptr<Ex> ex) { wrap(ex.release()); return *this; } - + void raise() const { if (wrapper.get()) wrapper->raise() ; } std::string what() const { return wrapper.get() ? wrapper->what() : std::string(); } bool empty() const { return !wrapper.get(); } @@ -66,7 +63,7 @@ class ExceptionHolder : public Raisable { template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); } boost::shared_ptr<Raisable> wrapper; }; - + }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 2c7589e4b6..3f14d61144 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -19,16 +19,18 @@ * */ +#include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/FedOps.h" -#include "qpid/broker/Broker.h" -#include "qpid/management/ManagementAgent.h" #include "qpid/broker/Queue.h" -#include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/sys/ExceptionHolder.h" +#include <stdexcept> using namespace qpid::broker; using namespace qpid::framing; @@ -70,6 +72,36 @@ Exchange::PreRoute::~PreRoute(){ } } +namespace { +/** Store information about an exception to be thrown later. + * If multiple exceptions are stored, save the first of the "most severe" + * exceptions, SESSION is les sever than CONNECTION etc. + */ +class ExInfo { + public: + enum Type { NONE, SESSION, CONNECTION, OTHER }; + + ExInfo(string exchange) : type(NONE), exchange(exchange) {} + void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) { + QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue " + << queue->getName() << ": " << exception_.what()); + if (type < type_) { // Replace less severe exception + type = type_; + exception = exception_; + } + } + + void raise() { + exception.raise(); + } + + private: + Type type; + string exchange; + qpid::sys::ExceptionHolder exception; +}; +} + void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { int count = 0; @@ -80,11 +112,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) msg.getMessage().blockContentRelease(); } + + ExInfo error(getName()); // Save exception to throw at the end. for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + try { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + catch (const SessionException& e) { + error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue); + } + catch (const ConnectionException& e) { + error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue); + } + catch (const std::exception& e) { + error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue); + } } + error.raise(); } if (mgmtExchange != 0) diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 2059727e7b..34e4592a15 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -303,11 +303,11 @@ QPID_AUTO_TEST_CASE(testSeek){ QueuedMessage qm; queue->dispatch(consumer); - + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); queue->dispatch(consumer); queue->dispatch(consumer); // make sure over-run is safe - + } QPID_AUTO_TEST_CASE(testSearch){ @@ -325,15 +325,15 @@ QPID_AUTO_TEST_CASE(testSearch){ SequenceNumber seq(2); QueuedMessage qm = queue->find(seq); - + BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); - + queue->acquire(qm); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); SequenceNumber seq1(3); QueuedMessage qm1 = queue->find(seq1); BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); - + } const std::string nullxid = ""; @@ -875,28 +875,40 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content DeliverableMessage dmsg02(msg02); - BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); + { + ScopedSuppressLogging sl; // suppress expected error messages. + BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException); + } msg02->tryReleaseContent(); BOOST_CHECK_EQUAL(msg02->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content DeliverableMessage dmsg03(msg03); - BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); + { + ScopedSuppressLogging sl; // suppress expected error messages. + BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException); + } msg03->tryReleaseContent(); BOOST_CHECK_EQUAL(msg03->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content DeliverableMessage dmsg04(msg04); - BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); + { + ScopedSuppressLogging sl; // suppress expected error messages. + BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException); + } msg04->tryReleaseContent(); BOOST_CHECK_EQUAL(msg04->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content DeliverableMessage dmsg05(msg05); - BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); + { + ScopedSuppressLogging sl; // suppress expected error messages. + BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException); + } msg05->tryReleaseContent(); BOOST_CHECK_EQUAL(msg05->isContentReleased(), false); BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 4abe4c2cbe..a19dd305e5 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -484,18 +484,24 @@ class BrokerTest(TestCase): cluster = Cluster(self, count, args, expect=expect, wait=wait) return cluster - def assert_browse(self, session, queue, expect_contents, timeout=0): + def browse(self, session, queue, timeout=0): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" - r = session.receiver("%s;{mode:browse}"%(queue)) - actual_contents = [] try: - for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content) - while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages. - except messaging.Empty: pass - r.close() + contents = [] + try: + while True: contents.append(r.fetch(timeout=timeout).content) + except messaging.Empty: pass + finally: pass #FIXME aconway 2011-04-14: r.close() + return contents + + def assert_browse(self, session, queue, expect_contents, timeout=0): + """Assert that the contents of messages on queue (as retrieved + using session and timeout) exactly match the strings in + expect_contents""" + actual_contents = self.browse(session, queue, timeout) self.assertEqual(expect_contents, actual_contents) def join(thread, timeout=10): diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 42f3ae3d25..73c20d451d 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -449,6 +449,36 @@ acl allow all all cluster.start() verify(cluster[1]) + def test_binding_order(self): + """Regression test for binding order inconsistency in cluster""" + cluster = self.cluster(1) + c0 = cluster[0].connect() + s0 = c0.session() + # Declare multiple queues bound to same key on amq.topic + def declare(q,max=0): + if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d}}'%max + else: declare = 'x-declare:{}' + bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) + s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) + declare('d',max=4) # Only one with a limit + for q in ['c', 'b','a']: declare(q) + # Add a cluster member, send enough messages to exceed the max count + cluster.start() + try: + s = s0.sender('amq.topic/key') + for m in xrange(1,6): s.send(Message(str(m))) + self.fail("Expected capacity exceeded exception") + except messaging.exceptions.TargetCapacityExceeded: pass + c1 = cluster[1].connect() + s1 = c1.session() + s0 = c0.session() # Old session s0 is broken by exception. + # Verify queue contents are consistent. + for q in ['a','b','c','d']: + self.assertEqual(self.browse(s0, q), self.browse(s1, q)) + # Verify queue contents are "best effort" + for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) + self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |