summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/SessionHandler.cpp121
1 files changed, 64 insertions, 57 deletions
diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp
index 93e628ab34..d3b04e5356 100644
--- a/cpp/src/qpid/client/SessionHandler.cpp
+++ b/cpp/src/qpid/client/SessionHandler.cpp
@@ -22,31 +22,44 @@
#include "SessionHandler.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/client/SessionCore.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
-SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {}
+namespace {
+// TODO aconway 2007-09-28: hack till we have multi-version support.
+ProtocolVersion version;
+}
+
+SessionHandler::SessionHandler(SessionCore& parent)
+ : StateManager(CLOSED), core(parent) {}
+
+SessionHandler::~SessionHandler() {}
-void SessionHandler::incoming(AMQFrame& frame)
+void SessionHandler::handle(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
if (getState() == OPEN) {
- SessionClosedBody* closeBody=
+ core.checkClosed();
+ SessionClosedBody* closedBody=
dynamic_cast<SessionClosedBody*>(body->getMethod());
- if (closeBody) {
- setState(CLOSED_BY_PEER);
- code = closeBody->getReplyCode();
- text = closeBody->getReplyText();
- if (onClose) {
- onClose(closeBody->getReplyCode(), closeBody->getReplyText());
- }
+ if (closedBody) {
+ closed();
+ core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
} else {
try {
- in(frame);
- }catch(ChannelException& e){
- closed(e.code, e.toString());
+ next->handle(frame);
+ }
+ catch(const ChannelException& e){
+ QPID_LOG(error, "Channel exception:" << e.what());
+ closed();
+ AMQFrame f(0, SessionClosedBody(version, e.code, e.toString()));
+ core.out(f);
+ core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
}
}
} else {
@@ -57,69 +70,63 @@ void SessionHandler::incoming(AMQFrame& frame)
}
}
-void SessionHandler::outgoing(AMQFrame& frame)
-{
- if (getState() == OPEN) {
- frame.setChannel(id);
- out(frame);
- } else if (getState() == CLOSED) {
- throw Exception(QPID_MSG("Channel not open, can't send " << frame));
- } else if (getState() == CLOSED_BY_PEER) {
- throw ChannelException(code, text);
- }
-}
-
-void SessionHandler::open(uint16_t _id)
+void SessionHandler::attach(const AMQMethodBody& command)
{
- id = _id;
-
setState(OPENING);
- // FIXME aconway 2007-09-19: Need to get this from API.
- AMQFrame f(id, SessionOpenBody(version, 0));
- out(f);
-
+ AMQFrame f(0, command);
+ core.out(f);
std::set<int> states;
states.insert(OPEN);
- states.insert(CLOSED_BY_PEER);
+ states.insert(CLOSED);
waitFor(states);
- if (getState() != OPEN) {
- throw Exception("Failed to open channel.");
- }
+ if (getState() != OPEN)
+ throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel()));
+}
+
+void SessionHandler::open(uint32_t detachedLifetime) {
+ attach(SessionOpenBody(version, detachedLifetime));
}
-void SessionHandler::close()
+void SessionHandler::resume() {
+ attach(SessionResumeBody(version, core.getId()));
+}
+
+void SessionHandler::detach(const AMQMethodBody& command)
{
setState(CLOSING);
- AMQFrame f(id, SessionCloseBody(version));
- out(f);
+ AMQFrame f(0, command);
+ core.out(f);
waitFor(CLOSED);
}
-void SessionHandler::closed(uint16_t code, const std::string& msg)
-{
- setState(CLOSED);
- AMQFrame f(id, SessionClosedBody(version, code, msg));
- out(f);
-}
+void SessionHandler::close() { detach(SessionCloseBody(version)); }
+void SessionHandler::suspend() { detach(SessionSuspendBody(version)); }
+void SessionHandler::closed() { setState(CLOSED); }
void SessionHandler::handleMethod(AMQMethodBody* method)
{
switch (getState()) {
- case OPENING:
- if (method->isA<SessionAttachedBody>()) {
- setState(OPEN);
- } else {
- throw ConnectionException(504, "Channel not opened.");
- }
- break;
+ case OPENING: {
+ SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method);
+ if (attached) {
+ core.setId(attached->getSessionId());
+ setState(OPEN);
+ } else
+ throw ChannelErrorException();
+ break;
+ }
case CLOSING:
- if (method->isA<SessionClosedBody>()) {
- setState(CLOSED);
- } //else just ignore it
+ if (method->isA<SessionClosedBody>() ||
+ method->isA<SessionDetachedBody>())
+ closed();
break;
+
case CLOSED:
- throw ConnectionException(504, "Channel is closed.");
+ throw ChannelErrorException();
+
default:
- throw Exception("Unexpected state encountered in SessionHandler!");
+ assert(0);
+ throw InternalErrorException(QPID_MSG("Internal Error."));
}
}
+