summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/Exception.h25
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp29
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h23
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp7
-rw-r--r--python/cpp_failing_0-10.txt5
-rw-r--r--python/tests_0-10/queue.py71
6 files changed, 99 insertions, 61 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 00365018ba..ff62817719 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -63,16 +63,29 @@ class Exception : public std::exception
const std::string whatStr;
};
-struct ChannelException : public Exception {
+/**
+ * I have made SessionException a common base for Channel- and
+ * Connection- Exceptions. This is not strictly correct but allows all
+ * model layer exceptions to be handled as SessionExceptions which is
+ * how they are classified in the final 0-10 specification. I.e. this
+ * is a temporary hack to allow the preview and final codepaths to
+ * co-exist with minimal changes.
+ */
+
+struct SessionException : public Exception {
const framing::ReplyCode code;
- ChannelException(framing::ReplyCode code_, const std::string& message)
+ SessionException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
};
-struct ConnectionException : public Exception {
- const framing::ReplyCode code;
- ConnectionException(framing::ReplyCode code_, const std::string& message)
- : Exception(message), code(code_) {}
+struct ChannelException : public SessionException {
+ ChannelException(framing::ReplyCode code, const std::string& message)
+ : SessionException(code, message) {}
+};
+
+struct ConnectionException : public SessionException {
+ ConnectionException(framing::ReplyCode code, const std::string& message)
+ : SessionException(code, message) {}
};
struct ClosedException : public Exception {
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 663565c26c..990727dda5 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -114,7 +114,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -135,7 +135,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
@@ -181,7 +181,7 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = state.getQueue(name);
+ Queue::shared_ptr queue = getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return Queue010QueryResult(queue->getName(),
@@ -204,7 +204,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = state.getQueue(name);
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -245,12 +245,12 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
- state.getQueue(queue)->purge();
+ getQueue(queue)->purge();
}
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = state.getQueue(queue);
+ Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -269,7 +269,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
- HandlerImpl(s),
+ HandlerHelper(s),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
@@ -301,7 +301,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
uint64_t /*resumeTtl*/,
const FieldTable& arguments)
{
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
if(!destination.empty() && state.exists(destination))
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
@@ -404,6 +404,19 @@ void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
//TODO
}
+
+Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ throw SessionException(531, QPID_MSG("No queue name specified."));
+ } else {
+ queue = session.getBroker().getQueues().find(name);
+ if (!queue)
+ throw NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
+ return queue;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index 23cc1beb93..e83d58bc2f 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -77,12 +77,21 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
private:
+ //common base for utility methods etc that are specific to this adapter
+ struct HandlerHelper : public HandlerImpl
+ {
+ HandlerHelper(SemanticState& s) : HandlerImpl(s) {}
+
+ Queue::shared_ptr getQueue(const string& name) const;
+ };
+
+
class ExchangeHandlerImpl :
public Exchange010Handler,
- public HandlerImpl
+ public HandlerHelper
{
public:
- ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+ ExchangeHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void declare(const std::string& exchange, const std::string& type,
const std::string& alternateExchange,
@@ -109,10 +118,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
class QueueHandlerImpl :
public Queue010Handler,
- public HandlerImpl
+ public HandlerHelper
{
public:
- QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+ QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void declare(const std::string& queue,
const std::string& alternateExchange,
@@ -127,7 +136,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
class MessageHandlerImpl :
public Message010Handler,
- public HandlerImpl
+ public HandlerHelper
{
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
RangedOperation releaseOp;
@@ -175,10 +184,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
};
- class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl
+ class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper
{
public:
- ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+ ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void sync();
void result(uint32_t commandId, const string& value);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 7d0d0dacfa..e2f18bc927 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -223,8 +223,13 @@ void SessionState::handle(AMQFrame& frame)
} else {
handleContent(frame, commandId);
}
- } catch(const ChannelException& e) {
+ } catch(const SessionException& e) {
//TODO: better implementation of new exception handling mechanism
+
+ //0-10 final changes the types of exceptions, 'model layer'
+ //exceptions will all be session exceptions regardless of
+ //current channel/connection classification
+
AMQMethodBody* m = frame.getMethod();
if (m) {
getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 3b2c560d8a..8930f6e05b 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -54,10 +54,5 @@ tests_0-10.testlib.TestBaseTest.testAssertEmptyPass
tests_0-10.testlib.TestBaseTest.testMessageProperties
tests_0-10.queue.QueueTests.test_autodelete_shared
tests_0-10.queue.QueueTests.test_declare_exclusive
-tests_0-10.queue.QueueTests.test_declare_passive
-tests_0-10.queue.QueueTests.test_delete_ifempty
-tests_0-10.queue.QueueTests.test_delete_ifunused
-tests_0-10.queue.QueueTests.test_delete_simple
tests_0-10.queue.QueueTests.test_bind
tests_0-10.queue.QueueTests.test_unbind_headers
-tests_0-10.queue.QueueTests.test_purge_empty_name \ No newline at end of file
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index 97e17048d3..dba732d415 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -99,7 +99,7 @@ class QueueTests(TestBase010):
c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
self.fail("Expected second exclusive queue_declare to raise a channel exception")
except Closed, e:
- self.assertChannelException(405, e.args[0])
+ self.assertEquals(405, e.args[0].error_code)
def test_declare_passive(self):
@@ -114,8 +114,8 @@ class QueueTests(TestBase010):
#other connection should not be allowed to declare this:
session.queue_declare(queue="passive-queue-2", passive=True)
self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code) #not-found
def test_bind(self):
@@ -136,7 +136,7 @@ class QueueTests(TestBase010):
session.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
self.fail("Expected bind to non-existant exchange to fail")
except Closed, e:
- self.assertChannelException(404, e.args[0])
+ self.assertEquals(404, e.args[0].error_code)
#need to reopen a session:
session = self.client.session(2)
@@ -147,7 +147,7 @@ class QueueTests(TestBase010):
session.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
self.fail("Expected bind of non-existant queue to fail")
except Closed, e:
- self.assertChannelException(404, e.args[0])
+ self.assertEquals(404, e.args[0].error_code)
def test_unbind_direct(self):
self.unbind_test(exchange="amq.direct", routing_key="key")
@@ -222,25 +222,28 @@ class QueueTests(TestBase010):
#straight-forward case:
session.queue_declare(queue="delete-me")
- session.message_transfer(message=Message("a", session.delivery_properties(routing_key="delete-me")))
- session.message_transfer(message=Message("b", session.delivery_properties(routing_key="delete-me")))
- session.message_transfer(message=Message("c", session.delivery_properties(routing_key="delete-me")))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c"))
session.queue_delete(queue="delete-me")
#check that it has gone be declaring passively
try:
session.queue_declare(queue="delete-me", passive=True)
self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+ def test_delete_queue_exists(self):
+ """
+ Test core queue deletion behaviour
+ """
#check attempted deletion of non-existant queue is handled correctly:
- session = self.client.session(2)
- session.session_open()
+ session = self.session
try:
session.queue_delete(queue="i-dont-exist", if_empty=True)
self.fail("Expected delete of non-existant queue to fail")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
@@ -253,21 +256,22 @@ class QueueTests(TestBase010):
#create a queue and add a message to it (use default binding):
session.queue_declare(queue="delete-me-2")
session.queue_declare(queue="delete-me-2", passive=True)
- session.message_transfer(message=Message("message", session.delivery_properties(routing_key="delete-me-2")))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message"))
#try to delete, but only if empty:
try:
session.queue_delete(queue="delete-me-2", if_empty=True)
self.fail("Expected delete if_empty to fail for non-empty queue")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
+ except SessionException, e:
+ self.assertEquals(406, e.args[0].error_code)
- #need new channel now:
- session = self.client.session(2)
- session.session_open()
+ #need new session now:
+ session = self.conn.session("replacement", 2)
#empty queue:
- self.subscribe(session, destination="consumer_tag", queue="delete-me-2")
+ session.message_subscribe(session, destination="consumer_tag", queue="delete-me-2")
+ session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
queue = session.incoming("consumer_tag")
msg = queue.get(timeout=1)
self.assertEqual("message", msg.body)
@@ -280,30 +284,29 @@ class QueueTests(TestBase010):
try:
session.queue_declare(queue="delete-me-2", passive=True)
self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
def test_delete_ifunused(self):
"""
Test that if_unused field of queue_delete is honoured
"""
- session = self.channel
+ session = self.session
#create a queue and register a consumer:
session.queue_declare(queue="delete-me-3")
session.queue_declare(queue="delete-me-3", passive=True)
- self.subscribe(destination="consumer_tag", queue="delete-me-3")
+ session.message_subscribe(destination="consumer_tag", queue="delete-me-3")
#need new session now:
- session2 = self.client.session(2)
- session2.session_open()
+ session2 = self.conn.session("replacement", 2)
+
#try to delete, but only if empty:
try:
session2.queue_delete(queue="delete-me-3", if_unused=True)
self.fail("Expected delete if_unused to fail for queue with existing consumer")
- except Closed, e:
- self.assertChannelException(406, e.args[0])
-
+ except SessionException, e:
+ self.assertEquals(406, e.args[0].error_code)
session.message_cancel(destination="consumer_tag")
session.queue_delete(queue="delete-me-3", if_unused=True)
@@ -311,8 +314,8 @@ class QueueTests(TestBase010):
try:
session.queue_declare(queue="delete-me-3", passive=True)
self.fail("Queue has not been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
def test_autodelete_shared(self):
@@ -345,7 +348,7 @@ class QueueTests(TestBase010):
try:
session.queue_declare(queue="auto-delete-me", passive=True)
self.fail("Expected queue to have been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)