summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/include/qpid/sys/ExceptionHolder.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp60
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp30
-rw-r--r--qpid/cpp/src/tests/brokertest.py20
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py30
5 files changed, 122 insertions, 31 deletions
diff --git a/qpid/cpp/include/qpid/sys/ExceptionHolder.h b/qpid/cpp/include/qpid/sys/ExceptionHolder.h
index 9eff1d64c7..4bc934cf75 100644
--- a/qpid/cpp/include/qpid/sys/ExceptionHolder.h
+++ b/qpid/cpp/include/qpid/sys/ExceptionHolder.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,14 +42,11 @@ class ExceptionHolder : public Raisable {
public:
ExceptionHolder() {}
// Use default copy & assign.
-
+
/** Take ownership of ex */
template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); }
- template <class Ex> ExceptionHolder(const boost::shared_ptr<Ex>& ex) { wrap(ex.release()); }
-
template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; }
- template <class Ex> ExceptionHolder& operator=(boost::shared_ptr<Ex> ex) { wrap(ex.release()); return *this; }
-
+
void raise() const { if (wrapper.get()) wrapper->raise() ; }
std::string what() const { return wrapper.get() ? wrapper->what() : std::string(); }
bool empty() const { return !wrapper.get(); }
@@ -66,7 +63,7 @@ class ExceptionHolder : public Raisable {
template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); }
boost::shared_ptr<Raisable> wrapper;
};
-
+
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 2c7589e4b6..3f14d61144 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -19,16 +19,18 @@
*
*/
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/ExceptionHolder.h"
+#include <stdexcept>
using namespace qpid::broker;
using namespace qpid::framing;
@@ -70,6 +72,36 @@ Exchange::PreRoute::~PreRoute(){
}
}
+namespace {
+/** Store information about an exception to be thrown later.
+ * If multiple exceptions are stored, save the first of the "most severe"
+ * exceptions, SESSION is les sever than CONNECTION etc.
+ */
+class ExInfo {
+ public:
+ enum Type { NONE, SESSION, CONNECTION, OTHER };
+
+ ExInfo(string exchange) : type(NONE), exchange(exchange) {}
+ void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) {
+ QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue "
+ << queue->getName() << ": " << exception_.what());
+ if (type < type_) { // Replace less severe exception
+ type = type_;
+ exception = exception_;
+ }
+ }
+
+ void raise() {
+ exception.raise();
+ }
+
+ private:
+ Type type;
+ string exchange;
+ qpid::sys::ExceptionHolder exception;
+};
+}
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
int count = 0;
@@ -80,11 +112,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
msg.getMessage().blockContentRelease();
}
+
+ ExInfo error(getName()); // Save exception to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ try {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ catch (const SessionException& e) {
+ error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue);
+ }
+ catch (const ConnectionException& e) {
+ error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue);
+ }
+ catch (const std::exception& e) {
+ error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue);
+ }
}
+ error.raise();
}
if (mgmtExchange != 0)
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 2059727e7b..34e4592a15 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -303,11 +303,11 @@ QPID_AUTO_TEST_CASE(testSeek){
QueuedMessage qm;
queue->dispatch(consumer);
-
+
BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
-
+
}
QPID_AUTO_TEST_CASE(testSearch){
@@ -325,15 +325,15 @@ QPID_AUTO_TEST_CASE(testSearch){
SequenceNumber seq(2);
QueuedMessage qm = queue->find(seq);
-
+
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
+
queue->acquire(qm);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
QueuedMessage qm1 = queue->find(seq1);
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
+
}
const std::string nullxid = "";
@@ -875,28 +875,40 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg02(msg02);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ }
msg02->tryReleaseContent();
BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg03(msg03);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ }
msg03->tryReleaseContent();
BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg04(msg04);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ }
msg04->tryReleaseContent();
BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg05(msg05);
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ {
+ ScopedSuppressLogging sl; // suppress expected error messages.
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ }
msg05->tryReleaseContent();
BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 4abe4c2cbe..a19dd305e5 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -484,18 +484,24 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
- def assert_browse(self, session, queue, expect_contents, timeout=0):
+ def browse(self, session, queue, timeout=0):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
-
r = session.receiver("%s;{mode:browse}"%(queue))
- actual_contents = []
try:
- for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
- while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
- except messaging.Empty: pass
- r.close()
+ contents = []
+ try:
+ while True: contents.append(r.fetch(timeout=timeout).content)
+ except messaging.Empty: pass
+ finally: pass #FIXME aconway 2011-04-14: r.close()
+ return contents
+
+ def assert_browse(self, session, queue, expect_contents, timeout=0):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+ actual_contents = self.browse(session, queue, timeout)
self.assertEqual(expect_contents, actual_contents)
def join(thread, timeout=10):
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 42f3ae3d25..73c20d451d 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -449,6 +449,36 @@ acl allow all all
cluster.start()
verify(cluster[1])
+ def test_binding_order(self):
+ """Regression test for binding order inconsistency in cluster"""
+ cluster = self.cluster(1)
+ c0 = cluster[0].connect()
+ s0 = c0.session()
+ # Declare multiple queues bound to same key on amq.topic
+ def declare(q,max=0):
+ if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d}}'%max
+ else: declare = 'x-declare:{}'
+ bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
+ s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
+ declare('d',max=4) # Only one with a limit
+ for q in ['c', 'b','a']: declare(q)
+ # Add a cluster member, send enough messages to exceed the max count
+ cluster.start()
+ try:
+ s = s0.sender('amq.topic/key')
+ for m in xrange(1,6): s.send(Message(str(m)))
+ self.fail("Expected capacity exceeded exception")
+ except messaging.exceptions.TargetCapacityExceeded: pass
+ c1 = cluster[1].connect()
+ s1 = c1.session()
+ s0 = c0.session() # Old session s0 is broken by exception.
+ # Verify queue contents are consistent.
+ for q in ['a','b','c','d']:
+ self.assertEqual(self.browse(s0, q), self.browse(s1, q))
+ # Verify queue contents are "best effort"
+ for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
+ self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):