diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/Exception.h | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 29 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 7 |
4 files changed, 62 insertions, 22 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 00365018ba..ff62817719 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -63,16 +63,29 @@ class Exception : public std::exception const std::string whatStr; }; -struct ChannelException : public Exception { +/** + * I have made SessionException a common base for Channel- and + * Connection- Exceptions. This is not strictly correct but allows all + * model layer exceptions to be handled as SessionExceptions which is + * how they are classified in the final 0-10 specification. I.e. this + * is a temporary hack to allow the preview and final codepaths to + * co-exist with minimal changes. + */ + +struct SessionException : public Exception { const framing::ReplyCode code; - ChannelException(framing::ReplyCode code_, const std::string& message) + SessionException(framing::ReplyCode code_, const std::string& message) : Exception(message), code(code_) {} }; -struct ConnectionException : public Exception { - const framing::ReplyCode code; - ConnectionException(framing::ReplyCode code_, const std::string& message) - : Exception(message), code(code_) {} +struct ChannelException : public SessionException { + ChannelException(framing::ReplyCode code, const std::string& message) + : SessionException(code, message) {} +}; + +struct ConnectionException : public SessionException { + ConnectionException(framing::ReplyCode code, const std::string& message) + : SessionException(code, message) {} }; struct ClosedException : public Exception { diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 663565c26c..990727dda5 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -114,7 +114,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -135,7 +135,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); @@ -181,7 +181,7 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = state.getQueue(name); + Queue::shared_ptr queue = getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return Queue010QueryResult(queue->getName(), @@ -204,7 +204,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = state.getQueue(name); + queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = @@ -245,12 +245,12 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ - state.getQueue(queue)->purge(); + getQueue(queue)->purge(); } void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = state.getQueue(queue); + Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -269,7 +269,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : - HandlerImpl(s), + HandlerHelper(s), releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)), acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2)) @@ -301,7 +301,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, uint64_t /*resumeTtl*/, const FieldTable& arguments) { - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); @@ -404,6 +404,19 @@ void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, //TODO } + +Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { + Queue::shared_ptr queue; + if (name.empty()) { + throw SessionException(531, QPID_MSG("No queue name specified.")); + } else { + queue = session.getBroker().getQueues().find(name); + if (!queue) + throw NotFoundException(QPID_MSG("Queue not found: "<<name)); + } + return queue; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index 23cc1beb93..e83d58bc2f 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -77,12 +77,21 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); } private: + //common base for utility methods etc that are specific to this adapter + struct HandlerHelper : public HandlerImpl + { + HandlerHelper(SemanticState& s) : HandlerImpl(s) {} + + Queue::shared_ptr getQueue(const string& name) const; + }; + + class ExchangeHandlerImpl : public Exchange010Handler, - public HandlerImpl + public HandlerHelper { public: - ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + ExchangeHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void declare(const std::string& exchange, const std::string& type, const std::string& alternateExchange, @@ -109,10 +118,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations class QueueHandlerImpl : public Queue010Handler, - public HandlerImpl + public HandlerHelper { public: - QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void declare(const std::string& queue, const std::string& alternateExchange, @@ -127,7 +136,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations class MessageHandlerImpl : public Message010Handler, - public HandlerImpl + public HandlerHelper { typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; RangedOperation releaseOp; @@ -175,10 +184,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations }; - class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl + class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper { public: - ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void sync(); void result(uint32_t commandId, const string& value); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 7d0d0dacfa..e2f18bc927 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -223,8 +223,13 @@ void SessionState::handle(AMQFrame& frame) } else { handleContent(frame, commandId); } - } catch(const ChannelException& e) { + } catch(const SessionException& e) { //TODO: better implementation of new exception handling mechanism + + //0-10 final changes the types of exceptions, 'model layer' + //exceptions will all be session exceptions regardless of + //current channel/connection classification + AMQMethodBody* m = frame.getMethod(); if (m) { getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); |