diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 0b1e744e25..8310980800 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -26,6 +26,7 @@ #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/framing/all_method_bodies.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -59,24 +60,32 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (!ignoring) { - if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { - return; - } else if (session.get()) { - session->handle(f); - } else { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); - } + if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { + return; + } else if (session.get()) { + session->handle(f); + } else if (!ignoring) { + throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached")); } }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); + }catch(const SessionException& e){ + //execution.exception will have been sent already + ignoring = true; + //peerSession.requestTimeout(0); + session->setTimeout(0); + peerSession.detach(name); + localSuspend(); }catch(const std::exception& e){ connection.close( framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m)); } } +bool SessionHandler::isValid(AMQMethodBody* m) { + return session.get() || m->isA<SessionAttachBody>(); +} + void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) @@ -112,12 +121,14 @@ ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } //new methods: -void SessionHandler::attach(const std::string& name, bool /*force*/) +void SessionHandler::attach(const std::string& _name, bool /*force*/) { //TODO: need to revise session manager to support resume as well assertClosed("attach"); std::auto_ptr<SessionState> state( connection.broker.getSessionManager().open(*this, 0)); + name = _name;//TODO: this should be used in conjunction with + //userid for connection as sessions identity session.reset(state.release()); peerSession.attached(name); peerSession.commandPoint(session->nextOut, 0); |