diff options
Diffstat (limited to 'cpp/src/qpid/client/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/SessionHandler.cpp | 121 |
1 files changed, 64 insertions, 57 deletions
diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp index 93e628ab34..d3b04e5356 100644 --- a/cpp/src/qpid/client/SessionHandler.cpp +++ b/cpp/src/qpid/client/SessionHandler.cpp @@ -22,31 +22,44 @@ #include "SessionHandler.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" +#include "qpid/client/SessionCore.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" using namespace qpid::client; using namespace qpid::framing; using namespace boost; -SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {} +namespace { +// TODO aconway 2007-09-28: hack till we have multi-version support. +ProtocolVersion version; +} + +SessionHandler::SessionHandler(SessionCore& parent) + : StateManager(CLOSED), core(parent) {} + +SessionHandler::~SessionHandler() {} -void SessionHandler::incoming(AMQFrame& frame) +void SessionHandler::handle(AMQFrame& frame) { AMQBody* body = frame.getBody(); if (getState() == OPEN) { - SessionClosedBody* closeBody= + core.checkClosed(); + SessionClosedBody* closedBody= dynamic_cast<SessionClosedBody*>(body->getMethod()); - if (closeBody) { - setState(CLOSED_BY_PEER); - code = closeBody->getReplyCode(); - text = closeBody->getReplyText(); - if (onClose) { - onClose(closeBody->getReplyCode(), closeBody->getReplyText()); - } + if (closedBody) { + closed(); + core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); } else { try { - in(frame); - }catch(ChannelException& e){ - closed(e.code, e.toString()); + next->handle(frame); + } + catch(const ChannelException& e){ + QPID_LOG(error, "Channel exception:" << e.what()); + closed(); + AMQFrame f(0, SessionClosedBody(version, e.code, e.toString())); + core.out(f); + core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); } } } else { @@ -57,69 +70,63 @@ void SessionHandler::incoming(AMQFrame& frame) } } -void SessionHandler::outgoing(AMQFrame& frame) -{ - if (getState() == OPEN) { - frame.setChannel(id); - out(frame); - } else if (getState() == CLOSED) { - throw Exception(QPID_MSG("Channel not open, can't send " << frame)); - } else if (getState() == CLOSED_BY_PEER) { - throw ChannelException(code, text); - } -} - -void SessionHandler::open(uint16_t _id) +void SessionHandler::attach(const AMQMethodBody& command) { - id = _id; - setState(OPENING); - // FIXME aconway 2007-09-19: Need to get this from API. - AMQFrame f(id, SessionOpenBody(version, 0)); - out(f); - + AMQFrame f(0, command); + core.out(f); std::set<int> states; states.insert(OPEN); - states.insert(CLOSED_BY_PEER); + states.insert(CLOSED); waitFor(states); - if (getState() != OPEN) { - throw Exception("Failed to open channel."); - } + if (getState() != OPEN) + throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel())); +} + +void SessionHandler::open(uint32_t detachedLifetime) { + attach(SessionOpenBody(version, detachedLifetime)); } -void SessionHandler::close() +void SessionHandler::resume() { + attach(SessionResumeBody(version, core.getId())); +} + +void SessionHandler::detach(const AMQMethodBody& command) { setState(CLOSING); - AMQFrame f(id, SessionCloseBody(version)); - out(f); + AMQFrame f(0, command); + core.out(f); waitFor(CLOSED); } -void SessionHandler::closed(uint16_t code, const std::string& msg) -{ - setState(CLOSED); - AMQFrame f(id, SessionClosedBody(version, code, msg)); - out(f); -} +void SessionHandler::close() { detach(SessionCloseBody(version)); } +void SessionHandler::suspend() { detach(SessionSuspendBody(version)); } +void SessionHandler::closed() { setState(CLOSED); } void SessionHandler::handleMethod(AMQMethodBody* method) { switch (getState()) { - case OPENING: - if (method->isA<SessionAttachedBody>()) { - setState(OPEN); - } else { - throw ConnectionException(504, "Channel not opened."); - } - break; + case OPENING: { + SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method); + if (attached) { + core.setId(attached->getSessionId()); + setState(OPEN); + } else + throw ChannelErrorException(); + break; + } case CLOSING: - if (method->isA<SessionClosedBody>()) { - setState(CLOSED); - } //else just ignore it + if (method->isA<SessionClosedBody>() || + method->isA<SessionDetachedBody>()) + closed(); break; + case CLOSED: - throw ConnectionException(504, "Channel is closed."); + throw ChannelErrorException(); + default: - throw Exception("Unexpected state encountered in SessionHandler!"); + assert(0); + throw InternalErrorException(QPID_MSG("Internal Error.")); } } + |