diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 92 |
1 files changed, 62 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index ed092d6a05..9b065be8af 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -26,6 +26,8 @@ #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" +#include <boost/bind.hpp> + namespace qpid { namespace broker { using namespace framing; @@ -33,7 +35,9 @@ using namespace std; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : InOutHandler(0, &c.getOutput()), - connection(c), channel(ch), proxy(out), + connection(c), channel(ch, &c.getOutput()), + proxy(out), // Via my own handleOut() for L2 data. + peerSession(channel), // Direct to channel for L2 commands. ignoring(false) {} SessionHandler::~SessionHandler() {} @@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) { try { if (m && invoke(*this, *m)) return; - else if (session.get()) - session->in(f); + else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->in.handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (!ignoring) throw ChannelErrorException( - QPID_MSG("Channel " << channel << " is not open")); + QPID_MSG("Channel " << channel.get() << " is not open")); } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. session.reset(); - getProxy().getSession().closed(e.code, e.toString()); + peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) { } void SessionHandler::handleOut(AMQFrame& f) { - f.setChannel(getChannel()); - out.next->handle(f); + channel.handle(f); // Send it. + if (session->sent(f)) + peerSession.solicitAck(); } -void SessionHandler::assertOpen(const char* method) { - if (!session.get()) +void SessionHandler::assertAttached(const char* method) const { + if (!session.get()) throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); } -void SessionHandler::assertClosed(const char* method) { +void SessionHandler::assertClosed(const char* method) const { if (session.get()) throw ChannelBusyException( - QPID_MSG(method << " failed: channel " << channel + QPID_MSG(method << " failed: channel " << channel.get() << " is already open.")); } @@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) { std::auto_ptr<SessionState> state( connection.broker.getSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + peerSession.attached(session->getId(), session->getTimeout()); } void SessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(*this, id); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + session = connection.broker.getSessionManager().resume(id); + session->attach(*this); + SequenceNumber seq = session->resuming(); + peerSession.attached(session->getId(), session->getTimeout()); + proxy.getSession().ack(seq, SequenceNumberSet()); } void SessionHandler::flow(bool /*active*/) { + assertAttached("flow"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flow"); } void SessionHandler::flowOk(bool /*active*/) { + assertAttached("flowOk"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flowOk"); } void SessionHandler::close() { + assertAttached("close"); QPID_LOG(info, "Received session.close"); ignoring=false; session.reset(); - getProxy().getSession().closed(REPLY_SUCCESS, "ok"); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + peerSession.closed(REPLY_SUCCESS, "ok"); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } void SessionHandler::closed(uint16_t replyCode, const string& replyText) { @@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) { session.reset(); } +void SessionHandler::localSuspend() { + if (session.get() && session->getState() == SessionState::ATTACHED) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + } +} + void SessionHandler::suspend() { - assertOpen("suspend"); - connection.broker.getSessionManager().suspend(session); - assert(!session.get()); - getProxy().getSession().detached(); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + assertAttached("suspend"); + localSuspend(); + peerSession.detached(); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } -void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/, - const SequenceNumberSet& /*seenFrameSet*/) { - assert(0); throw NotImplementedException(); +void SessionHandler::ack(uint32_t cumulativeSeenMark, + const SequenceNumberSet& /*seenFrameSet*/) +{ + assertAttached("ack"); + if (session->getState() == SessionState::RESUMING) { + session->receivedAck(cumulativeSeenMark); + framing::SessionState::Replay replay=session->replay(); + std::for_each(replay.begin(), replay.end(), + boost::bind(&SessionHandler::handleOut, this, _1)); + } + else + session->receivedAck(cumulativeSeenMark); } void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - assert(0); throw NotImplementedException(); + // FIXME aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.high-water-mark"); } void SessionHandler::solicitAck() { - assert(0); throw NotImplementedException(); + assertAttached("solicit-ack"); + peerSession.ack(session->sendingAck(), SequenceNumberSet()); } }} // namespace qpid::broker |