diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 87 |
1 files changed, 45 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index d5caf789c0..f5fa22060f 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -42,7 +42,8 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. - ignoring(false) {} + ignoring(false) +{} SessionHandler::~SessionHandler() {} @@ -52,33 +53,30 @@ MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace void SessionHandler::handleIn(AMQFrame& f) { - // Note on channel states: a channel is open if session != 0. A - // channel that is closed (session == 0) can be in the "ignoring" - // state. This is a temporary state after we have sent a channel - // exception, where extra frames might arrive that should be - // ignored. - // + // Note on channel states: a channel is attached if session != 0 AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { + if (ignoring && !(m && m->isA<SessionDetachedBody>())) { + return; + } + if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { + //frame was a valid session control and has been handled return; } else if (session.get()) { + //we are attached and frame was not a session control so it is for upper layers session->handle(f); - } else if (!ignoring) { - throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached")); + } else { + throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); } + }catch(const ChannelException& e){ + QPID_LOG(error, "Session detached due to: " << e.what()); + peerSession.detached(name, e.code); + handleDetach(); + connection.closeChannel(channel.get()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); - }catch(const SessionException& e){ - //execution.exception will have been sent already - ignoring = true; - //peerSession.requestTimeout(0); - session->setTimeout(0); - peerSession.detach(name); - localSuspend(); }catch(const std::exception& e){ - connection.close( - framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m)); + connection.close(501, e.what(), classId(m), methodId(m)); } } @@ -95,7 +93,7 @@ void SessionHandler::handleOut(AMQFrame& f) { void SessionHandler::assertAttached(const char* method) const { if (!session.get()) { std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; - throw ChannelErrorException( + throw NotAttachedException( QPID_MSG(method << " failed: No session for channel " << getChannel())); } @@ -103,33 +101,23 @@ void SessionHandler::assertAttached(const char* method) const { void SessionHandler::assertClosed(const char* method) const { if (session.get()) - throw ChannelBusyException( + throw SessionBusyException( QPID_MSG(method << " failed: channel " << channel.get() << " is already open.")); } -void SessionHandler::localSuspend() { - if (session.get() && session->isAttached()) { - session->detach(); - connection.broker.getSessionManager().suspend(session); - session.reset(); - } -} - - ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } //new methods: void SessionHandler::attach(const std::string& _name, bool /*force*/) { - //TODO: need to revise session manager to support resume as well - assertClosed("attach"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, 0)); name = _name;//TODO: this should be used in conjunction with //userid for connection as sessions identity - session.reset(state.release()); + + //TODO: need to revise session manager to support resume as well + assertClosed("attach"); + session.reset(new SessionState(0, this, 0, 0)); peerSession.attached(name); peerSession.commandPoint(session->nextOut, 0); } @@ -138,31 +126,46 @@ void SessionHandler::attached(const std::string& _name) { name = _name;//TODO: this should be used in conjunction with //userid for connection as sessions identity - std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0)); - session.reset(state.release()); + session.reset(new SessionState(0, this, 0, 0)); peerSession.commandPoint(session->nextOut, 0); } void SessionHandler::detach(const std::string& name) { assertAttached("detach"); - localSuspend(); - peerSession.detached(name, 0); + peerSession.detached(name, session::NORMAL); + handleDetach(); assert(&connection.getChannel(channel.get()) == this); connection.closeChannel(channel.get()); } void SessionHandler::detached(const std::string& name, uint8_t code) { - ignoring=false; - session->detach(); - session.reset(); + ignoring = false; + handleDetach(); if (code) { //no error } else { //error occured QPID_LOG(warning, "Received session.closed: "<< name << " " << code); } + connection.closeChannel(channel.get()); +} + +void SessionHandler::handleDetach() +{ + if (session.get()) { + session->detach(); + session.reset(); + } +} + +void SessionHandler::requestDetach() +{ + //TODO: request timeout when python can handle it + //peerSession.requestTimeout(0); + ignoring = true; + peerSession.detach(name); } void SessionHandler::requestTimeout(uint32_t t) |