diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 8 |
4 files changed, 30 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 0b1e744e25..8310980800 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -26,6 +26,7 @@ #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/framing/all_method_bodies.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -59,24 +60,32 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (!ignoring) { - if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { - return; - } else if (session.get()) { - session->handle(f); - } else { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); - } + if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { + return; + } else if (session.get()) { + session->handle(f); + } else if (!ignoring) { + throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached")); } }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); + }catch(const SessionException& e){ + //execution.exception will have been sent already + ignoring = true; + //peerSession.requestTimeout(0); + session->setTimeout(0); + peerSession.detach(name); + localSuspend(); }catch(const std::exception& e){ connection.close( framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m)); } } +bool SessionHandler::isValid(AMQMethodBody* m) { + return session.get() || m->isA<SessionAttachBody>(); +} + void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) @@ -112,12 +121,14 @@ ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } //new methods: -void SessionHandler::attach(const std::string& name, bool /*force*/) +void SessionHandler::attach(const std::string& _name, bool /*force*/) { //TODO: need to revise session manager to support resume as well assertClosed("attach"); std::auto_ptr<SessionState> state( connection.broker.getSessionManager().open(*this, 0)); + name = _name;//TODO: this should be used in conjunction with + //userid for connection as sessions identity session.reset(state.release()); peerSession.attached(name); peerSession.commandPoint(session->nextOut, 0); diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 4b031f2951..4010ce15d2 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -100,12 +100,15 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, void assertActive(const char* method) const; void assertClosed(const char* method) const; + bool isValid(framing::AMQMethodBody*); + Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; framing::AMQP_ClientProxy::Session010 peerSession; bool ignoring; std::auto_ptr<SessionState> session; + std::string name;//TODO: this should be part of the session state and replace the id }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 3c6bed4344..b96d7b5e3f 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -275,11 +275,7 @@ void SessionState::handle(AMQFrame& frame) } else { getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - timeout = 0; - //The python client doesn't currently detach on receiving an exception - //so the session state isn't destroyed. This is a temporary workaround - //until that is addressed - adapter.destroyExclusiveQueues(); + throw e; } } diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 4e3f6cdd98..190141c411 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -426,9 +426,12 @@ void SessionImpl::attached(const std::string& _name) setState(ATTACHED); } -void SessionImpl::detach(const std::string& /*name*/) +void SessionImpl::detach(const std::string& _name) { - throw NotImplementedException("Client does not support detach"); + Lock l(state); + if (name != _name) throw InternalErrorException("Incorrect session name"); + setState(DETACHED); + QPID_LOG(info, "Session detached by peer: " << name); } void SessionImpl::detached(const std::string& _name, uint8_t _code) @@ -561,7 +564,6 @@ void SessionImpl::exception(uint16_t errorCode, //should we wait for the timeout response? detachedLifetime = 0; } - detach(); } |