diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 63 |
1 files changed, 49 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 1cb10d0c19..0e3c9928d1 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,6 +21,7 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" +#include "ConnectionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" @@ -36,7 +37,7 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - return; - } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); - session->in.handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; - } else if (!ignoring) { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); + if (!ignoring) { + if (m && + (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || + invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + return; + } else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else { + throw ChannelErrorException( + QPID_MSG("Channel " << channel.get() << " is not open")); + } } } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. @@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) { } void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) + if (!session.get()) { + std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); + } } void SessionHandler::assertClosed(const char* method) const { @@ -208,4 +215,32 @@ void SessionHandler::detached() ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } +void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + assertAttached("complete"); + session->complete(cumulative, range); +} + +void SessionHandler::flush() +{ + assertAttached("flush"); + session->flush(); +} +void SessionHandler::sync() +{ + assertAttached("sync"); + session->sync(); +} + +void SessionHandler::noop() +{ + assertAttached("noop"); + session->noop(); +} + +void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} + }} // namespace qpid::broker |