summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp59
-rw-r--r--qpid/cpp/src/tests/brokertest.py20
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py30
3 files changed, 99 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 2c7589e4b6..7cf3ec162f 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -29,6 +29,7 @@
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "stdexcept"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -70,6 +71,44 @@ Exchange::PreRoute::~PreRoute(){
}
}
+namespace {
+/** Store information about an exception to be thrown later.
+ * If multiple exceptions are stored, save the first of the "most severe"
+ * exceptions, SESSION is les sever than CONNECTION etc.
+ */
+class ExInfo {
+ public:
+ enum Type { NONE, SESSION, CONNECTION, OTHER };
+
+ ExInfo(string ex) : type(NONE), code(0), exchange(ex) {}
+
+ void store(Type type_, const char* what_, int code_, const boost::shared_ptr<Queue>& queue) {
+ QPID_LOG(error, "Exchange " << exchange << " cannot deliver to queue "
+ << queue->getName() << ": " << what_);
+ if (type < type_) { // Replace less severe error
+ type = type_;
+ what = what_;
+ code = code_;
+ }
+ }
+
+ void raise() {
+ switch (type) {
+ case NONE: break;
+ case SESSION: throw qpid::SessionException(qpid::framing::execution::ErrorCode(code), what);
+ case CONNECTION: throw qpid::ConnectionException(qpid::framing::connection::CloseCode(code), what);
+ case OTHER: throw std::runtime_error(what);
+ }
+ }
+
+ private:
+ Type type;
+ string what;
+ int code;
+ string exchange;
+};
+}
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
int count = 0;
@@ -80,11 +119,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
msg.getMessage().blockContentRelease();
}
+
+ ExInfo error(getName()); // Save errors to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ try {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ catch (const SessionException& e) {
+ error.store(ExInfo::SESSION, e.what(), int(e.code), (*i)->queue);
+ }
+ catch (const ConnectionException& e) {
+ error.store(ExInfo::CONNECTION, e.what(), int(e.code), (*i)->queue);
+ }
+ catch (const std::exception& e) {
+ error.store(ExInfo::OTHER, e.what(), 0, (*i)->queue);
+ }
}
+ error.raise();
}
if (mgmtExchange != 0)
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 4abe4c2cbe..a19dd305e5 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -484,18 +484,24 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
- def assert_browse(self, session, queue, expect_contents, timeout=0):
+ def browse(self, session, queue, timeout=0):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
-
r = session.receiver("%s;{mode:browse}"%(queue))
- actual_contents = []
try:
- for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
- while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
- except messaging.Empty: pass
- r.close()
+ contents = []
+ try:
+ while True: contents.append(r.fetch(timeout=timeout).content)
+ except messaging.Empty: pass
+ finally: pass #FIXME aconway 2011-04-14: r.close()
+ return contents
+
+ def assert_browse(self, session, queue, expect_contents, timeout=0):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+ actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
def join(thread, timeout=10):
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 42f3ae3d25..73c20d451d 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -449,6 +449,36 @@ acl allow all all
cluster.start()
verify(cluster[1])
+ def test_binding_order(self):
+ """Regression test for binding order inconsistency in cluster"""
+ cluster = self.cluster(1)
+ c0 = cluster[0].connect()
+ s0 = c0.session()
+ # Declare multiple queues bound to same key on amq.topic
+ def declare(q,max=0):
+ if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d}}'%max
+ else: declare = 'x-declare:{}'
+ bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
+ s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
+ declare('d',max=4) # Only one with a limit
+ for q in ['c', 'b','a']: declare(q)
+ # Add a cluster member, send enough messages to exceed the max count
+ cluster.start()
+ try:
+ s = s0.sender('amq.topic/key')
+ for m in xrange(1,6): s.send(Message(str(m)))
+ self.fail("Expected capacity exceeded exception")
+ except messaging.exceptions.TargetCapacityExceeded: pass
+ c1 = cluster[1].connect()
+ s1 = c1.session()
+ s0 = c0.session() # Old session s0 is broken by exception.
+ # Verify queue contents are consistent.
+ for q in ['a','b','c','d']:
+ self.assertEqual(self.browse(s0, q), self.browse(s1, q))
+ # Verify queue contents are "best effort"
+ for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
+ self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):