diff options
author | Alan Conway <aconway@apache.org> | 2011-04-15 20:40:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-04-15 20:40:06 +0000 |
commit | e19102eab2df74b491000e7376b459bead70d173 (patch) | |
tree | 248feb518770ef5776dcfa4e05d080212dbf9c5a | |
parent | 8f1c852464651469d4a71c2cd7f0d64863f99974 (diff) | |
download | qpid-python-e19102eab2df74b491000e7376b459bead70d173.tar.gz |
Revert "QPID-3208: Exchanges make best effort to route messages if there is an error."
This reverts commit r1092765 which introduced test failures in make check.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1092804 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 59 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 20 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 30 |
3 files changed, 10 insertions, 99 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 7cf3ec162f..2c7589e4b6 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -29,7 +29,6 @@ #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/DeliverableMessage.h" -#include "stdexcept" using namespace qpid::broker; using namespace qpid::framing; @@ -71,44 +70,6 @@ Exchange::PreRoute::~PreRoute(){ } } -namespace { -/** Store information about an exception to be thrown later. - * If multiple exceptions are stored, save the first of the "most severe" - * exceptions, SESSION is les sever than CONNECTION etc. - */ -class ExInfo { - public: - enum Type { NONE, SESSION, CONNECTION, OTHER }; - - ExInfo(string ex) : type(NONE), code(0), exchange(ex) {} - - void store(Type type_, const char* what_, int code_, const boost::shared_ptr<Queue>& queue) { - QPID_LOG(error, "Exchange " << exchange << " cannot deliver to queue " - << queue->getName() << ": " << what_); - if (type < type_) { // Replace less severe error - type = type_; - what = what_; - code = code_; - } - } - - void raise() { - switch (type) { - case NONE: break; - case SESSION: throw qpid::SessionException(qpid::framing::execution::ErrorCode(code), what); - case CONNECTION: throw qpid::ConnectionException(qpid::framing::connection::CloseCode(code), what); - case OTHER: throw std::runtime_error(what); - } - } - - private: - Type type; - string what; - int code; - string exchange; -}; -} - void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { int count = 0; @@ -119,25 +80,11 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) msg.getMessage().blockContentRelease(); } - - ExInfo error(getName()); // Save errors to throw at the end. for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { - try { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); - } - catch (const SessionException& e) { - error.store(ExInfo::SESSION, e.what(), int(e.code), (*i)->queue); - } - catch (const ConnectionException& e) { - error.store(ExInfo::CONNECTION, e.what(), int(e.code), (*i)->queue); - } - catch (const std::exception& e) { - error.store(ExInfo::OTHER, e.what(), 0, (*i)->queue); - } + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); } - error.raise(); } if (mgmtExchange != 0) diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index a19dd305e5..4abe4c2cbe 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -484,24 +484,18 @@ class BrokerTest(TestCase): cluster = Cluster(self, count, args, expect=expect, wait=wait) return cluster - def browse(self, session, queue, timeout=0): + def assert_browse(self, session, queue, expect_contents, timeout=0): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" + r = session.receiver("%s;{mode:browse}"%(queue)) + actual_contents = [] try: - contents = [] - try: - while True: contents.append(r.fetch(timeout=timeout).content) - except messaging.Empty: pass - finally: pass #FIXME aconway 2011-04-14: r.close() - return contents - - def assert_browse(self, session, queue, expect_contents, timeout=0): - """Assert that the contents of messages on queue (as retrieved - using session and timeout) exactly match the strings in - expect_contents""" - actual_contents = self.browse(session, queue, timeout) + for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content) + while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages. + except messaging.Empty: pass + r.close() self.assertEqual(expect_contents, actual_contents) def join(thread, timeout=10): diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 73c20d451d..42f3ae3d25 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -449,36 +449,6 @@ acl allow all all cluster.start() verify(cluster[1]) - def test_binding_order(self): - """Regression test for binding order inconsistency in cluster""" - cluster = self.cluster(1) - c0 = cluster[0].connect() - s0 = c0.session() - # Declare multiple queues bound to same key on amq.topic - def declare(q,max=0): - if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d}}'%max - else: declare = 'x-declare:{}' - bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) - s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) - declare('d',max=4) # Only one with a limit - for q in ['c', 'b','a']: declare(q) - # Add a cluster member, send enough messages to exceed the max count - cluster.start() - try: - s = s0.sender('amq.topic/key') - for m in xrange(1,6): s.send(Message(str(m))) - self.fail("Expected capacity exceeded exception") - except messaging.exceptions.TargetCapacityExceeded: pass - c1 = cluster[1].connect() - s1 = c1.session() - s0 = c0.session() # Old session s0 is broken by exception. - # Verify queue contents are consistent. - for q in ['a','b','c','d']: - self.assertEqual(self.browse(s0, q), self.browse(s1, q)) - # Verify queue contents are "best effort" - for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) - self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) - class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |