summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/Exception.h25
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp29
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h23
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp7
4 files changed, 62 insertions, 22 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 00365018ba..ff62817719 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -63,16 +63,29 @@ class Exception : public std::exception
const std::string whatStr;
};
-struct ChannelException : public Exception {
+/**
+ * I have made SessionException a common base for Channel- and
+ * Connection- Exceptions. This is not strictly correct but allows all
+ * model layer exceptions to be handled as SessionExceptions which is
+ * how they are classified in the final 0-10 specification. I.e. this
+ * is a temporary hack to allow the preview and final codepaths to
+ * co-exist with minimal changes.
+ */
+
+struct SessionException : public Exception {
const framing::ReplyCode code;
- ChannelException(framing::ReplyCode code_, const std::string& message)
+ SessionException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
};
-struct ConnectionException : public Exception {
- const framing::ReplyCode code;
- ConnectionException(framing::ReplyCode code_, const std::string& message)
- : Exception(message), code(code_) {}
+struct ChannelException : public SessionException {
+ ChannelException(framing::ReplyCode code, const std::string& message)
+ : SessionException(code, message) {}
+};
+
+struct ConnectionException : public SessionException {
+ ConnectionException(framing::ReplyCode code, const std::string& message)
+ : SessionException(code, message) {}
};
struct ClosedException : public Exception {
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 663565c26c..990727dda5 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -114,7 +114,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -135,7 +135,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
@@ -181,7 +181,7 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = state.getQueue(name);
+ Queue::shared_ptr queue = getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return Queue010QueryResult(queue->getName(),
@@ -204,7 +204,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = state.getQueue(name);
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -245,12 +245,12 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
- state.getQueue(queue)->purge();
+ getQueue(queue)->purge();
}
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = state.getQueue(queue);
+ Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -269,7 +269,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
- HandlerImpl(s),
+ HandlerHelper(s),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
@@ -301,7 +301,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
uint64_t /*resumeTtl*/,
const FieldTable& arguments)
{
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
if(!destination.empty() && state.exists(destination))
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
@@ -404,6 +404,19 @@ void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
//TODO
}
+
+Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ throw SessionException(531, QPID_MSG("No queue name specified."));
+ } else {
+ queue = session.getBroker().getQueues().find(name);
+ if (!queue)
+ throw NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
+ return queue;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index 23cc1beb93..e83d58bc2f 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -77,12 +77,21 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
private:
+ //common base for utility methods etc that are specific to this adapter
+ struct HandlerHelper : public HandlerImpl
+ {
+ HandlerHelper(SemanticState& s) : HandlerImpl(s) {}
+
+ Queue::shared_ptr getQueue(const string& name) const;
+ };
+
+
class ExchangeHandlerImpl :
public Exchange010Handler,
- public HandlerImpl
+ public HandlerHelper
{
public:
- ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+ ExchangeHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void declare(const std::string& exchange, const std::string& type,
const std::string& alternateExchange,
@@ -109,10 +118,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
class QueueHandlerImpl :
public Queue010Handler,
- public HandlerImpl
+ public HandlerHelper
{
public:
- QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+ QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void declare(const std::string& queue,
const std::string& alternateExchange,
@@ -127,7 +136,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
class MessageHandlerImpl :
public Message010Handler,
- public HandlerImpl
+ public HandlerHelper
{
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
RangedOperation releaseOp;
@@ -175,10 +184,10 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
};
- class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl
+ class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper
{
public:
- ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+ ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void sync();
void result(uint32_t commandId, const string& value);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 7d0d0dacfa..e2f18bc927 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -223,8 +223,13 @@ void SessionState::handle(AMQFrame& frame)
} else {
handleContent(frame, commandId);
}
- } catch(const ChannelException& e) {
+ } catch(const SessionException& e) {
//TODO: better implementation of new exception handling mechanism
+
+ //0-10 final changes the types of exceptions, 'model layer'
+ //exceptions will all be session exceptions regardless of
+ //current channel/connection classification
+
AMQMethodBody* m = frame.getMethod();
if (m) {
getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());