summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/amqp_0_10/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp121
1 files changed, 77 insertions, 44 deletions
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 35587940e5..5f97d292bc 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -19,10 +19,11 @@
*/
-#include "SessionHandler.h"
+#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/SessionState.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
@@ -33,30 +34,35 @@ namespace amqp_0_10 {
using namespace framing;
using namespace std;
+void SessionHandler::checkAttached() {
+ if (!getState())
+ throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));
+}
+
SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch)
- : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}
+ : channel(ch, out), peer(channel),
+ awaitingDetached(false),
+ sendReady(), receiveReady() {}
SessionHandler::~SessionHandler() {}
namespace {
bool isSessionControl(AMQMethodBody* m) {
- return m &&
- m->amqpClassId() == SESSION_CLASS_ID;
+ return m && m->amqpClassId() == SESSION_CLASS_ID;
}
-bool isSessionDetachedControl(AMQMethodBody* m) {
- return isSessionControl(m) &&
- m->amqpMethodId() == SESSION_DETACHED_METHOD_ID;
-}
-} // namespace
-void SessionHandler::checkAttached() {
- if (!getState())
- throw NotAttachedException(
- QPID_MSG("Channel " << channel.get() << " is not attached"));
- assert(getInHandler());
- assert(channel.next);
+session::DetachCode convert(uint8_t code) {
+ switch(code) {
+ case 0: return session::DETACH_CODE_NORMAL;
+ case 1: return session::DETACH_CODE_SESSION_BUSY;
+ case 2: return session::DETACH_CODE_TRANSPORT_BUSY;
+ case 3: return session::DETACH_CODE_NOT_ATTACHED;
+ case 4: default: return session::DETACH_CODE_UNKNOWN_IDS;
+ }
}
+} // namespace
+
void SessionHandler::invoke(const AMQMethodBody& m) {
framing::invoke(*this, m);
}
@@ -65,12 +71,19 @@ void SessionHandler::handleIn(AMQFrame& f) {
// Note on channel states: a channel is attached if session != 0
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (ignoring && !isSessionDetachedControl(m))
- return;
- else if (isSessionControl(m))
+ // Ignore all but detach controls while awaiting detach
+ if (awaitingDetached) {
+ if (!isSessionControl(m)) return;
+ if (m->amqpMethodId() != SESSION_DETACH_METHOD_ID &&
+ m->amqpMethodId() != SESSION_DETACHED_METHOD_ID)
+ return;
+ }
+ if (isSessionControl(m)) {
invoke(*m);
+ }
else {
- checkAttached();
+ // Drop frames if we are detached.
+ if (!getState()) return;
if (!receiveReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
if (!getState()->receiverRecord(f))
@@ -80,11 +93,21 @@ void SessionHandler::handleIn(AMQFrame& f) {
getInHandler()->handle(f);
}
}
+ catch(const SessionException& e) {
+ QPID_LOG(error, "Execution exception: " << e.what());
+ executionException(e.code, e.what()); // Let subclass handle this first.
+ framing::AMQP_AllProxy::Execution execution(channel);
+ AMQMethodBody* m = f.getMethod();
+ SequenceNumber commandId;
+ if (getState()) commandId = getState()->receiverGetCurrent();
+ execution.exception(e.code, commandId, m ? m->amqpClassId() : 0, m ? m->amqpMethodId() : 0, 0, e.what(), FieldTable());
+ detaching();
+ sendDetach();
+ }
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
- if (getState())
- peer.detached(getState()->getId().getName(), e.code);
- channelException(e.code, e.getMessage());
+ channelException(e.code, e.what()); // Let subclass handle this first.
+ peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
QPID_LOG(error, "Connection exception: " << e.what());
@@ -92,16 +115,16 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
catch(const std::exception& e) {
QPID_LOG(error, "Unexpected exception: " << e.what());
- connectionException(connection::FRAMING_ERROR, e.what());
+ connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
}
namespace {
bool isControl(const AMQFrame& f) {
- return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+ return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL;
}
bool isCommand(const AMQFrame& f) {
- return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
+ return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND;
}
} // namespace
@@ -117,19 +140,15 @@ void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f);
}
-void SessionHandler::checkName(const std::string& name) {
- checkAttached();
- if (name != getState()->getId().getName())
- throw InvalidArgumentException(
- QPID_MSG("Incorrect session name: " << name
- << ", expecting: " << getState()->getId().getName()));
-}
-
-void SessionHandler::attach(const std::string& name, bool force) {
+void SessionHandler::attach(const std::string& name_, bool force) {
+ // Save the name for possible session-busy exception. Session-busy
+ // can be thrown before we have attached the handler to a valid
+ // SessionState, and in that case we need the name to send peer.detached
+ name = name_;
if (getState() && name == getState()->getId().getName())
return; // Idempotent
if (getState())
- throw SessionBusyException(
+ throw TransportBusyException(
QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId()));
setState(name, force);
QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId());
@@ -140,21 +159,30 @@ void SessionHandler::attach(const std::string& name, bool force) {
sendCommandPoint(getState()->senderGetCommandPoint());
}
+#define CHECK_NAME(NAME, MSG) do { \
+ checkAttached(); \
+ if (NAME != getState()->getId().getName()) \
+ throw InvalidArgumentException( \
+ QPID_MSG(MSG << ": incorrect session name: " << NAME \
+ << ", expecting: " << getState()->getId().getName())); \
+ } while(0)
+
+
void SessionHandler::attached(const std::string& name) {
- checkName(name);
+ CHECK_NAME(name, "session.attached");
}
void SessionHandler::detach(const std::string& name) {
- checkName(name);
- peer.detached(name, session::NORMAL);
+ CHECK_NAME(name, "session.detach");
+ peer.detached(name, session::DETACH_CODE_NORMAL);
handleDetach();
}
void SessionHandler::detached(const std::string& name, uint8_t code) {
- checkName(name);
- ignoring = false;
- if (code != session::NORMAL)
- channelException(code, "session.detached from peer.");
+ CHECK_NAME(name, "session.detached");
+ awaitingDetached = false;
+ if (code != session::DETACH_CODE_NORMAL)
+ channelException(convert(code), "session.detached from peer.");
else {
handleDetach();
}
@@ -247,7 +275,7 @@ void SessionHandler::gap(const SequenceSet& /*commands*/) {
void SessionHandler::sendDetach()
{
checkAttached();
- ignoring = true;
+ awaitingDetached = true;
peer.detach(getState()->getId().getName());
}
@@ -258,7 +286,6 @@ void SessionHandler::sendCompletion() {
}
void SessionHandler::sendAttach(bool force) {
- checkAttached();
QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());
peer.attach(getState()->getId().getName(), force);
if (getState()->hasState())
@@ -275,6 +302,12 @@ void SessionHandler::sendCommandPoint(const SessionPoint& point) {
}
}
+void SessionHandler::markReadyToSend() {
+ if (!sendReady) {
+ sendReady = true;
+ }
+}
+
void SessionHandler::sendTimeout(uint32_t t) {
checkAttached();
peer.requestTimeout(t);