diff options
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 4 | ||||
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/message.py | 16 |
4 files changed, 22 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index cfc379f47c..a1f206e25d 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -116,7 +116,8 @@ void SemanticState::consume(const string& tag, consumers[tag] = c; } -void SemanticState::cancel(const string& tag){ +bool SemanticState::cancel(const string& tag) +{ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); @@ -124,7 +125,9 @@ void SemanticState::cancel(const string& tag){ //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - + return true; + } else { + return false; } } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index b2e648410a..8c69d6b89b 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -205,7 +205,7 @@ class SemanticState : private boost::noncopyable { const std::string& resumeId=std::string(), uint64_t resumeTtl=0, const framing::FieldTable& = framing::FieldTable()); - void cancel(const std::string& tag); + bool cancel(const std::string& tag); void setWindowMode(const std::string& destination); void setCreditMode(const std::string& destination); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index b95bb196fd..68aa26b270 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -431,7 +431,9 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, void SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) { - state.cancel(destination); + if (!state.cancel(destination)) { + throw NotFoundException(QPID_MSG("No such subscription: " << destination)); + } ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py index e80333a1e6..b46c446833 100644 --- a/tests/src/py/qpid_tests/broker_0_10/message.py +++ b/tests/src/py/qpid_tests/broker_0_10/message.py @@ -245,9 +245,19 @@ class MessageTests(TestBase010): self.fail("Got message after cancellation: " + msg) except Empty: None - #cancellation of non-existant consumers should be handled without error - session.message_cancel(destination="my-consumer") - session.message_cancel(destination="this-never-existed") + #cancellation of non-existant consumers should be result in 404s + try: + session.message_cancel(destination="my-consumer") + self.fail("Expected 404 for recancellation of subscription.") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + session = self.conn.session("alternate-session", timeout=10) + try: + session.message_cancel(destination="this-never-existed") + self.fail("Expected 404 for cancellation of unknown subscription.") + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) def test_ack(self): |