diff options
Diffstat (limited to 'cpp/src/qpid/amqp_0_10/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 121 |
1 files changed, 77 insertions, 44 deletions
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 35587940e5..5f97d292bc 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -19,10 +19,11 @@ */ -#include "SessionHandler.h" +#include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/SessionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/AllInvoker.h" +#include "qpid/framing/enum.h" #include "qpid/log/Statement.h" @@ -33,30 +34,35 @@ namespace amqp_0_10 { using namespace framing; using namespace std; +void SessionHandler::checkAttached() { + if (!getState()) + throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); +} + SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) - : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {} + : channel(ch, out), peer(channel), + awaitingDetached(false), + sendReady(), receiveReady() {} SessionHandler::~SessionHandler() {} namespace { bool isSessionControl(AMQMethodBody* m) { - return m && - m->amqpClassId() == SESSION_CLASS_ID; + return m && m->amqpClassId() == SESSION_CLASS_ID; } -bool isSessionDetachedControl(AMQMethodBody* m) { - return isSessionControl(m) && - m->amqpMethodId() == SESSION_DETACHED_METHOD_ID; -} -} // namespace -void SessionHandler::checkAttached() { - if (!getState()) - throw NotAttachedException( - QPID_MSG("Channel " << channel.get() << " is not attached")); - assert(getInHandler()); - assert(channel.next); +session::DetachCode convert(uint8_t code) { + switch(code) { + case 0: return session::DETACH_CODE_NORMAL; + case 1: return session::DETACH_CODE_SESSION_BUSY; + case 2: return session::DETACH_CODE_TRANSPORT_BUSY; + case 3: return session::DETACH_CODE_NOT_ATTACHED; + case 4: default: return session::DETACH_CODE_UNKNOWN_IDS; + } } +} // namespace + void SessionHandler::invoke(const AMQMethodBody& m) { framing::invoke(*this, m); } @@ -65,12 +71,19 @@ void SessionHandler::handleIn(AMQFrame& f) { // Note on channel states: a channel is attached if session != 0 AMQMethodBody* m = f.getBody()->getMethod(); try { - if (ignoring && !isSessionDetachedControl(m)) - return; - else if (isSessionControl(m)) + // Ignore all but detach controls while awaiting detach + if (awaitingDetached) { + if (!isSessionControl(m)) return; + if (m->amqpMethodId() != SESSION_DETACH_METHOD_ID && + m->amqpMethodId() != SESSION_DETACHED_METHOD_ID) + return; + } + if (isSessionControl(m)) { invoke(*m); + } else { - checkAttached(); + // Drop frames if we are detached. + if (!getState()) return; if (!receiveReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); if (!getState()->receiverRecord(f)) @@ -80,11 +93,21 @@ void SessionHandler::handleIn(AMQFrame& f) { getInHandler()->handle(f); } } + catch(const SessionException& e) { + QPID_LOG(error, "Execution exception: " << e.what()); + executionException(e.code, e.what()); // Let subclass handle this first. + framing::AMQP_AllProxy::Execution execution(channel); + AMQMethodBody* m = f.getMethod(); + SequenceNumber commandId; + if (getState()) commandId = getState()->receiverGetCurrent(); + execution.exception(e.code, commandId, m ? m->amqpClassId() : 0, m ? m->amqpMethodId() : 0, 0, e.what(), FieldTable()); + detaching(); + sendDetach(); + } catch(const ChannelException& e){ QPID_LOG(error, "Channel exception: " << e.what()); - if (getState()) - peer.detached(getState()->getId().getName(), e.code); - channelException(e.code, e.getMessage()); + channelException(e.code, e.what()); // Let subclass handle this first. + peer.detached(name, e.code); } catch(const ConnectionException& e) { QPID_LOG(error, "Connection exception: " << e.what()); @@ -92,16 +115,16 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const std::exception& e) { QPID_LOG(error, "Unexpected exception: " << e.what()); - connectionException(connection::FRAMING_ERROR, e.what()); + connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what()); } } namespace { bool isControl(const AMQFrame& f) { - return f.getMethod() && f.getMethod()->type() == framing::CONTROL; + return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL; } bool isCommand(const AMQFrame& f) { - return f.getMethod() && f.getMethod()->type() == framing::COMMAND; + return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND; } } // namespace @@ -117,19 +140,15 @@ void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); } -void SessionHandler::checkName(const std::string& name) { - checkAttached(); - if (name != getState()->getId().getName()) - throw InvalidArgumentException( - QPID_MSG("Incorrect session name: " << name - << ", expecting: " << getState()->getId().getName())); -} - -void SessionHandler::attach(const std::string& name, bool force) { +void SessionHandler::attach(const std::string& name_, bool force) { + // Save the name for possible session-busy exception. Session-busy + // can be thrown before we have attached the handler to a valid + // SessionState, and in that case we need the name to send peer.detached + name = name_; if (getState() && name == getState()->getId().getName()) return; // Idempotent if (getState()) - throw SessionBusyException( + throw TransportBusyException( QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId())); setState(name, force); QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId()); @@ -140,21 +159,30 @@ void SessionHandler::attach(const std::string& name, bool force) { sendCommandPoint(getState()->senderGetCommandPoint()); } +#define CHECK_NAME(NAME, MSG) do { \ + checkAttached(); \ + if (NAME != getState()->getId().getName()) \ + throw InvalidArgumentException( \ + QPID_MSG(MSG << ": incorrect session name: " << NAME \ + << ", expecting: " << getState()->getId().getName())); \ + } while(0) + + void SessionHandler::attached(const std::string& name) { - checkName(name); + CHECK_NAME(name, "session.attached"); } void SessionHandler::detach(const std::string& name) { - checkName(name); - peer.detached(name, session::NORMAL); + CHECK_NAME(name, "session.detach"); + peer.detached(name, session::DETACH_CODE_NORMAL); handleDetach(); } void SessionHandler::detached(const std::string& name, uint8_t code) { - checkName(name); - ignoring = false; - if (code != session::NORMAL) - channelException(code, "session.detached from peer."); + CHECK_NAME(name, "session.detached"); + awaitingDetached = false; + if (code != session::DETACH_CODE_NORMAL) + channelException(convert(code), "session.detached from peer."); else { handleDetach(); } @@ -247,7 +275,7 @@ void SessionHandler::gap(const SequenceSet& /*commands*/) { void SessionHandler::sendDetach() { checkAttached(); - ignoring = true; + awaitingDetached = true; peer.detach(getState()->getId().getName()); } @@ -258,7 +286,6 @@ void SessionHandler::sendCompletion() { } void SessionHandler::sendAttach(bool force) { - checkAttached(); QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId()); peer.attach(getState()->getId().getName(), force); if (getState()->hasState()) @@ -275,6 +302,12 @@ void SessionHandler::sendCommandPoint(const SessionPoint& point) { } } +void SessionHandler::markReadyToSend() { + if (!sendReady) { + sendReady = true; + } +} + void SessionHandler::sendTimeout(uint32_t t) { checkAttached(); peer.requestTimeout(t); |