diff options
author | Gordon Sim <gsim@apache.org> | 2008-02-25 16:56:29 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-02-25 16:56:29 +0000 |
commit | 773cc35a38cd34095f8800259ee7a2165a817053 (patch) | |
tree | 24c6b26d43ae2f95a7ad7be695bbe57ddbc4b641 /cpp/src/qpid/broker/SessionHandler.cpp | |
parent | f4e2d3de5df7bbaae41053191638734a991fd21c (diff) | |
download | qpid-python-773cc35a38cd34095f8800259ee7a2165a817053.tar.gz |
Some refactoring of the 0-10 codepath (being migrated to final spec) that primarily colocates the current session and execution layers to facilitate implementing the new session layer that will now encompass this behaviour.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@630934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 63 |
1 files changed, 49 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 1cb10d0c19..0e3c9928d1 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,6 +21,7 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" +#include "ConnectionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" @@ -36,7 +37,7 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - return; - } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); - session->in.handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; - } else if (!ignoring) { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); + if (!ignoring) { + if (m && + (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || + invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + return; + } else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else { + throw ChannelErrorException( + QPID_MSG("Channel " << channel.get() << " is not open")); + } } } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. @@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) { } void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) + if (!session.get()) { + std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); + } } void SessionHandler::assertClosed(const char* method) const { @@ -208,4 +215,32 @@ void SessionHandler::detached() ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } +void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + assertAttached("complete"); + session->complete(cumulative, range); +} + +void SessionHandler::flush() +{ + assertAttached("flush"); + session->flush(); +} +void SessionHandler::sync() +{ + assertAttached("sync"); + session->sync(); +} + +void SessionHandler::noop() +{ + assertAttached("noop"); + session->noop(); +} + +void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} + }} // namespace qpid::broker |