diff options
-rw-r--r-- | cpp/src/qpid/Exception.h | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 29 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 7 | ||||
-rw-r--r-- | python/cpp_failing_0-10.txt | 5 | ||||
-rw-r--r-- | python/tests_0-10/queue.py | 71 |
6 files changed, 99 insertions, 61 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 00365018ba..ff62817719 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -63,16 +63,29 @@ class Exception : public std::exception const std::string whatStr; }; -struct ChannelException : public Exception { +/** + * I have made SessionException a common base for Channel- and + * Connection- Exceptions. This is not strictly correct but allows all + * model layer exceptions to be handled as SessionExceptions which is + * how they are classified in the final 0-10 specification. I.e. this + * is a temporary hack to allow the preview and final codepaths to + * co-exist with minimal changes. + */ + +struct SessionException : public Exception { const framing::ReplyCode code; - ChannelException(framing::ReplyCode code_, const std::string& message) + SessionException(framing::ReplyCode code_, const std::string& message) : Exception(message), code(code_) {} }; -struct ConnectionException : public Exception { - const framing::ReplyCode code; - ConnectionException(framing::ReplyCode code_, const std::string& message) - : Exception(message), code(code_) {} +struct ChannelException : public SessionException { + ChannelException(framing::ReplyCode code, const std::string& message) + : SessionException(code, message) {} +}; + +struct ConnectionException : public SessionException { + ConnectionException(framing::ReplyCode code, const std::string& message) + : SessionException(code, message) {} }; struct ClosedException : public Exception { diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 663565c26c..990727dda5 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -114,7 +114,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -135,7 +135,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); @@ -181,7 +181,7 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = state.getQueue(name); + Queue::shared_ptr queue = getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return Queue010QueryResult(queue->getName(), @@ -204,7 +204,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = state.getQueue(name); + queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = @@ -245,12 +245,12 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ - state.getQueue(queue)->purge(); + getQueue(queue)->purge(); } void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = state.getQueue(queue); + Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -269,7 +269,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : - HandlerImpl(s), + HandlerHelper(s), releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)), acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2)) @@ -301,7 +301,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, uint64_t /*resumeTtl*/, const FieldTable& arguments) { - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); @@ -404,6 +404,19 @@ void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, //TODO } + +Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { + Queue::shared_ptr queue; + if (name.empty()) { + throw SessionException(531, QPID_MSG("No queue name specified.")); + } else { + queue = session.getBroker().getQueues().find(name); + if (!queue) + throw NotFoundException(QPID_MSG("Queue not found: "<<name)); + } + return queue; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index 23cc1beb93..e83d58bc2f 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -77,12 +77,21 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); } private: + //common base for utility methods etc that are specific to this adapter + struct HandlerHelper : public HandlerImpl + { + HandlerHelper(SemanticState& s) : HandlerImpl(s) {} + + Queue::shared_ptr getQueue(const string& name) const; + }; + + class ExchangeHandlerImpl : public Exchange010Handler, - public HandlerImpl + public HandlerHelper { public: - ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + ExchangeHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void declare(const std::string& exchange, const std::string& type, const std::string& alternateExchange, @@ -109,10 +118,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations class QueueHandlerImpl : public Queue010Handler, - public HandlerImpl + public HandlerHelper { public: - QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void declare(const std::string& queue, const std::string& alternateExchange, @@ -127,7 +136,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations class MessageHandlerImpl : public Message010Handler, - public HandlerImpl + public HandlerHelper { typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; RangedOperation releaseOp; @@ -175,10 +184,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations }; - class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl + class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper { public: - ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void sync(); void result(uint32_t commandId, const string& value); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 7d0d0dacfa..e2f18bc927 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -223,8 +223,13 @@ void SessionState::handle(AMQFrame& frame) } else { handleContent(frame, commandId); } - } catch(const ChannelException& e) { + } catch(const SessionException& e) { //TODO: better implementation of new exception handling mechanism + + //0-10 final changes the types of exceptions, 'model layer' + //exceptions will all be session exceptions regardless of + //current channel/connection classification + AMQMethodBody* m = frame.getMethod(); if (m) { getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 3b2c560d8a..8930f6e05b 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -54,10 +54,5 @@ tests_0-10.testlib.TestBaseTest.testAssertEmptyPass tests_0-10.testlib.TestBaseTest.testMessageProperties tests_0-10.queue.QueueTests.test_autodelete_shared tests_0-10.queue.QueueTests.test_declare_exclusive -tests_0-10.queue.QueueTests.test_declare_passive -tests_0-10.queue.QueueTests.test_delete_ifempty -tests_0-10.queue.QueueTests.test_delete_ifunused -tests_0-10.queue.QueueTests.test_delete_simple tests_0-10.queue.QueueTests.test_bind tests_0-10.queue.QueueTests.test_unbind_headers -tests_0-10.queue.QueueTests.test_purge_empty_name
\ No newline at end of file diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index 97e17048d3..dba732d415 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -99,7 +99,7 @@ class QueueTests(TestBase010): c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) self.fail("Expected second exclusive queue_declare to raise a channel exception") except Closed, e: - self.assertChannelException(405, e.args[0]) + self.assertEquals(405, e.args[0].error_code) def test_declare_passive(self): @@ -114,8 +114,8 @@ class QueueTests(TestBase010): #other connection should not be allowed to declare this: session.queue_declare(queue="passive-queue-2", passive=True) self.fail("Expected passive declaration of non-existant queue to raise a channel exception") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) #not-found def test_bind(self): @@ -136,7 +136,7 @@ class QueueTests(TestBase010): session.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") self.fail("Expected bind to non-existant exchange to fail") except Closed, e: - self.assertChannelException(404, e.args[0]) + self.assertEquals(404, e.args[0].error_code) #need to reopen a session: session = self.client.session(2) @@ -147,7 +147,7 @@ class QueueTests(TestBase010): session.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") self.fail("Expected bind of non-existant queue to fail") except Closed, e: - self.assertChannelException(404, e.args[0]) + self.assertEquals(404, e.args[0].error_code) def test_unbind_direct(self): self.unbind_test(exchange="amq.direct", routing_key="key") @@ -222,25 +222,28 @@ class QueueTests(TestBase010): #straight-forward case: session.queue_declare(queue="delete-me") - session.message_transfer(message=Message("a", session.delivery_properties(routing_key="delete-me"))) - session.message_transfer(message=Message("b", session.delivery_properties(routing_key="delete-me"))) - session.message_transfer(message=Message("c", session.delivery_properties(routing_key="delete-me"))) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c")) session.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: session.queue_declare(queue="delete-me", passive=True) self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + def test_delete_queue_exists(self): + """ + Test core queue deletion behaviour + """ #check attempted deletion of non-existant queue is handled correctly: - session = self.client.session(2) - session.session_open() + session = self.session try: session.queue_delete(queue="i-dont-exist", if_empty=True) self.fail("Expected delete of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) @@ -253,21 +256,22 @@ class QueueTests(TestBase010): #create a queue and add a message to it (use default binding): session.queue_declare(queue="delete-me-2") session.queue_declare(queue="delete-me-2", passive=True) - session.message_transfer(message=Message("message", session.delivery_properties(routing_key="delete-me-2"))) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message")) #try to delete, but only if empty: try: session.queue_delete(queue="delete-me-2", if_empty=True) self.fail("Expected delete if_empty to fail for non-empty queue") - except Closed, e: - self.assertChannelException(406, e.args[0]) + except SessionException, e: + self.assertEquals(406, e.args[0].error_code) - #need new channel now: - session = self.client.session(2) - session.session_open() + #need new session now: + session = self.conn.session("replacement", 2) #empty queue: - self.subscribe(session, destination="consumer_tag", queue="delete-me-2") + session.message_subscribe(session, destination="consumer_tag", queue="delete-me-2") + session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) queue = session.incoming("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.body) @@ -280,30 +284,29 @@ class QueueTests(TestBase010): try: session.queue_declare(queue="delete-me-2", passive=True) self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) def test_delete_ifunused(self): """ Test that if_unused field of queue_delete is honoured """ - session = self.channel + session = self.session #create a queue and register a consumer: session.queue_declare(queue="delete-me-3") session.queue_declare(queue="delete-me-3", passive=True) - self.subscribe(destination="consumer_tag", queue="delete-me-3") + session.message_subscribe(destination="consumer_tag", queue="delete-me-3") #need new session now: - session2 = self.client.session(2) - session2.session_open() + session2 = self.conn.session("replacement", 2) + #try to delete, but only if empty: try: session2.queue_delete(queue="delete-me-3", if_unused=True) self.fail("Expected delete if_unused to fail for queue with existing consumer") - except Closed, e: - self.assertChannelException(406, e.args[0]) - + except SessionException, e: + self.assertEquals(406, e.args[0].error_code) session.message_cancel(destination="consumer_tag") session.queue_delete(queue="delete-me-3", if_unused=True) @@ -311,8 +314,8 @@ class QueueTests(TestBase010): try: session.queue_declare(queue="delete-me-3", passive=True) self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) def test_autodelete_shared(self): @@ -345,7 +348,7 @@ class QueueTests(TestBase010): try: session.queue_declare(queue="auto-delete-me", passive=True) self.fail("Expected queue to have been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) |