diff options
author | Alan Conway <aconway@apache.org> | 2008-05-22 15:51:15 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-22 15:51:15 +0000 |
commit | 4043a9f33164862a9a9de9f1243d77093e7918f2 (patch) | |
tree | a5c2922a6331384a33bd44037d4ba926bad671e5 | |
parent | 3ee4d6aeac14d4dcf7df7cd531b7a2778714bdcd (diff) | |
download | qpid-python-4043a9f33164862a9a9de9f1243d77093e7918f2.tar.gz |
Improved logging for session state - show incomplete commands on receive-completed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659139 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.h | 29 |
2 files changed, 29 insertions, 23 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index b57035b586..96f2cca726 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -32,6 +32,7 @@ using amqp_0_10::NotImplementedException; using amqp_0_10::InvalidArgumentException; using amqp_0_10::IllegalStateException; using amqp_0_10::ResourceLimitExceededException; +using amqp_0_10::InternalErrorException; namespace { bool isControl(const AMQFrame& f) { @@ -145,22 +146,25 @@ bool SessionState::receiverRecord(const AMQFrame& f) { stateful = true; receiver.expected.advance(f); bool firstTime = receiver.expected > receiver.received; - if (firstTime) + if (firstTime) { receiver.received = receiver.expected; + receiver.incomplete += receiverGetCurrent(); + } QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod()); QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f); return firstTime; } void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { - assert(command <= receiver.received.command); // Internal error to complete an unreceived command. - assert(receiver.firstIncomplete <= command); - if (cumulative) - receiver.unknownCompleted.add(receiver.firstIncomplete, command); - else - receiver.unknownCompleted += command; - receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end(); - QPID_LOG(debug, getId() << ": receiver marked completed: " << command << " unknown: " << receiver.unknownCompleted); + if (!receiver.incomplete.contains(command)) + throw InternalErrorException(QPID_MSG(getId() << "command is not received-incomplete: " << command )); + SequenceNumber first =cumulative ? receiver.incomplete.front() : command; + SequenceNumber last = command; + receiver.unknownCompleted.add(first, last); + receiver.incomplete.remove(first, last); + QPID_LOG(debug, getId() << ": receiver marked completed: " << command + << " incomplete: " << receiver.incomplete + << " unknown-completed: " << receiver.unknownCompleted); } void SessionState::receiverKnownCompleted(const SequenceSet& commands) { @@ -173,6 +177,7 @@ void SessionState::receiverKnownCompleted(const SequenceSet& commands) { const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; } const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; } const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; } +const SequenceSet& SessionState::receiverGetIncomplete() const { return receiver.incomplete; } SequenceNumber SessionState::receiverGetCurrent() const { SequenceNumber current = receiver.expected.command; diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index 47fabb04c8..f72b41ba55 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -108,7 +108,7 @@ class SessionState { /** Called when flush for confirmed and completed commands is sent to peer. */ virtual void senderRecordFlush(); - /** Called when the peer confirms up to comfirmed. */ + /** Called when the peer confirms up to comfirmed. */ virtual void senderConfirmed(const SessionPoint& confirmed); /** Called when the peer indicates commands completed */ @@ -117,15 +117,15 @@ class SessionState { /** Point from which the next new (not replayed) data will be sent. */ virtual SessionPoint senderGetCommandPoint(); - /** Set of outstanding incomplete commands */ + /** Set of outstanding incomplete commands */ virtual SequenceSet senderGetIncomplete() const; /** Point from which we can replay. */ virtual SessionPoint senderGetReplayPoint() const; - /** Peer expecting commands from this point. - virtual *@return Range of frames to be replayed. - */ + /** Peer expecting commands from this point. + virtual *@return Range of frames to be replayed. + */ virtual ReplayRange senderExpected(const SessionPoint& expected); @@ -143,23 +143,25 @@ class SessionState { /** Peer has indicated commands are known completed */ virtual void receiverKnownCompleted(const SequenceSet& commands); - /** Get the incoming command point */ + /** Get the incoming command point */ virtual const SessionPoint& receiverGetExpected() const; - /** Get the received high-water-mark, may be > getExpected() during replay */ + /** Get the received high-water-mark, may be > getExpected() during replay */ virtual const SessionPoint& receiverGetReceived() const; - /** Completed commands that the peer may not know about */ + /** Completed received commands that the peer may not know about. */ virtual const SequenceSet& receiverGetUnknownComplete() const; - /** ID of the command currently being handled. */ + /** Incomplete received commands. */ + virtual const SequenceSet& receiverGetIncomplete() const; + + /** ID of the command currently being handled. */ virtual SequenceNumber receiverGetCurrent() const; private: struct SendState { - SendState() : session(), unflushedSize(), replaySize() {} - SessionState* session; + SendState() : unflushedSize(), replaySize() {} // invariant: replayPoint <= flushPoint <= sendPoint SessionPoint replayPoint; // Can replay from this point SessionPoint flushPoint; // Point of last flush @@ -171,12 +173,11 @@ class SessionState { } sender; struct ReceiveState { - ReceiveState() : session() {} - SessionState* session; + ReceiveState() {} SessionPoint expected; // Expected from here SessionPoint received; // Received to here. Invariant: expected <= received. SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. - SequenceNumber firstIncomplete; // First incomplete command. + SequenceSet incomplete; // Incomplete received commands. } receiver; SessionId id; |