diff options
Diffstat (limited to 'cpp/src/qpid/framing/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/framing/SessionState.cpp | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp index 045a0ae115..8056f4a523 100644 --- a/cpp/src/qpid/framing/SessionState.cpp +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -40,11 +40,22 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) : ackInterval(ack), sendAckAt(lastReceived+ackInterval), solicitAckAt(lastSent+ackInterval), - ackSolicited(false) + ackSolicited(false), + resumable(true) +{} + +SessionState::SessionState(const Uuid& uuid) : + state(ATTACHED), + id(uuid), + lastReceived(-1), + lastSent(-1), + ackInterval(0), + sendAckAt(0), + solicitAckAt(0), + ackSolicited(false), + resumable(false) { - assert(ackInterval > 0); } - namespace { bool isSessionCommand(const AMQFrame& f) { return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID; @@ -58,10 +69,9 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { throw CommandInvalidException( QPID_MSG("Invalid frame: Resuming session, expected session-ack")); assert(state = ATTACHED); - assert(lastReceived<sendAckAt); ++lastReceived; QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); - if (lastReceived == sendAckAt) + if (ackInterval && lastReceived == sendAckAt) return sendingAck(); else return boost::none; @@ -70,10 +80,12 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { bool SessionState::sent(const AMQFrame& f) { if (isSessionCommand(f)) return false; - unackedOut.push_back(f); + if (resumable) + unackedOut.push_back(f); ++lastSent; QPID_LOG(trace, "Sent # "<< lastSent << " " << id); - return (state!=RESUMING) && + return ackInterval && + (state!=RESUMING) && (lastSent == solicitAckAt) && sendingSolicit(); } @@ -105,10 +117,12 @@ bool SessionState::sendingSolicit() { if (ackSolicited) return false; solicitAckAt = lastSent + ackInterval; - return true; + return ackInterval != 0; } SequenceNumber SessionState::resuming() { + if (!resumable) + throw InternalErrorException("Session is not resumable"); state = RESUMING; return sendingAck(); } |