diff options
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.h | 29 |
2 files changed, 29 insertions, 23 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index b57035b586..96f2cca726 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -32,6 +32,7 @@ using amqp_0_10::NotImplementedException; using amqp_0_10::InvalidArgumentException; using amqp_0_10::IllegalStateException; using amqp_0_10::ResourceLimitExceededException; +using amqp_0_10::InternalErrorException; namespace { bool isControl(const AMQFrame& f) { @@ -145,22 +146,25 @@ bool SessionState::receiverRecord(const AMQFrame& f) { stateful = true; receiver.expected.advance(f); bool firstTime = receiver.expected > receiver.received; - if (firstTime) + if (firstTime) { receiver.received = receiver.expected; + receiver.incomplete += receiverGetCurrent(); + } QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod()); QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f); return firstTime; } void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { - assert(command <= receiver.received.command); // Internal error to complete an unreceived command. - assert(receiver.firstIncomplete <= command); - if (cumulative) - receiver.unknownCompleted.add(receiver.firstIncomplete, command); - else - receiver.unknownCompleted += command; - receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end(); - QPID_LOG(debug, getId() << ": receiver marked completed: " << command << " unknown: " << receiver.unknownCompleted); + if (!receiver.incomplete.contains(command)) + throw InternalErrorException(QPID_MSG(getId() << "command is not received-incomplete: " << command )); + SequenceNumber first =cumulative ? receiver.incomplete.front() : command; + SequenceNumber last = command; + receiver.unknownCompleted.add(first, last); + receiver.incomplete.remove(first, last); + QPID_LOG(debug, getId() << ": receiver marked completed: " << command + << " incomplete: " << receiver.incomplete + << " unknown-completed: " << receiver.unknownCompleted); } void SessionState::receiverKnownCompleted(const SequenceSet& commands) { @@ -173,6 +177,7 @@ void SessionState::receiverKnownCompleted(const SequenceSet& commands) { const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; } const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; } const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; } +const SequenceSet& SessionState::receiverGetIncomplete() const { return receiver.incomplete; } SequenceNumber SessionState::receiverGetCurrent() const { SequenceNumber current = receiver.expected.command; diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index 47fabb04c8..f72b41ba55 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -108,7 +108,7 @@ class SessionState { /** Called when flush for confirmed and completed commands is sent to peer. */ virtual void senderRecordFlush(); - /** Called when the peer confirms up to comfirmed. */ + /** Called when the peer confirms up to comfirmed. */ virtual void senderConfirmed(const SessionPoint& confirmed); /** Called when the peer indicates commands completed */ @@ -117,15 +117,15 @@ class SessionState { /** Point from which the next new (not replayed) data will be sent. */ virtual SessionPoint senderGetCommandPoint(); - /** Set of outstanding incomplete commands */ + /** Set of outstanding incomplete commands */ virtual SequenceSet senderGetIncomplete() const; /** Point from which we can replay. */ virtual SessionPoint senderGetReplayPoint() const; - /** Peer expecting commands from this point. - virtual *@return Range of frames to be replayed. - */ + /** Peer expecting commands from this point. + virtual *@return Range of frames to be replayed. + */ virtual ReplayRange senderExpected(const SessionPoint& expected); @@ -143,23 +143,25 @@ class SessionState { /** Peer has indicated commands are known completed */ virtual void receiverKnownCompleted(const SequenceSet& commands); - /** Get the incoming command point */ + /** Get the incoming command point */ virtual const SessionPoint& receiverGetExpected() const; - /** Get the received high-water-mark, may be > getExpected() during replay */ + /** Get the received high-water-mark, may be > getExpected() during replay */ virtual const SessionPoint& receiverGetReceived() const; - /** Completed commands that the peer may not know about */ + /** Completed received commands that the peer may not know about. */ virtual const SequenceSet& receiverGetUnknownComplete() const; - /** ID of the command currently being handled. */ + /** Incomplete received commands. */ + virtual const SequenceSet& receiverGetIncomplete() const; + + /** ID of the command currently being handled. */ virtual SequenceNumber receiverGetCurrent() const; private: struct SendState { - SendState() : session(), unflushedSize(), replaySize() {} - SessionState* session; + SendState() : unflushedSize(), replaySize() {} // invariant: replayPoint <= flushPoint <= sendPoint SessionPoint replayPoint; // Can replay from this point SessionPoint flushPoint; // Point of last flush @@ -171,12 +173,11 @@ class SessionState { } sender; struct ReceiveState { - ReceiveState() : session() {} - SessionState* session; + ReceiveState() {} SessionPoint expected; // Expected from here SessionPoint received; // Received to here. Invariant: expected <= received. SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. - SequenceNumber firstIncomplete; // First incomplete command. + SequenceSet incomplete; // Incomplete received commands. } receiver; SessionId id; |