summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-22 15:51:15 +0000
committerAlan Conway <aconway@apache.org>2008-05-22 15:51:15 +0000
commit4043a9f33164862a9a9de9f1243d77093e7918f2 (patch)
treea5c2922a6331384a33bd44037d4ba926bad671e5
parent3ee4d6aeac14d4dcf7df7cd531b7a2778714bdcd (diff)
downloadqpid-python-4043a9f33164862a9a9de9f1243d77093e7918f2.tar.gz
Improved logging for session state - show incomplete commands on receive-completed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659139 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/SessionState.cpp23
-rw-r--r--cpp/src/qpid/SessionState.h29
2 files changed, 29 insertions, 23 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index b57035b586..96f2cca726 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -32,6 +32,7 @@ using amqp_0_10::NotImplementedException;
using amqp_0_10::InvalidArgumentException;
using amqp_0_10::IllegalStateException;
using amqp_0_10::ResourceLimitExceededException;
+using amqp_0_10::InternalErrorException;
namespace {
bool isControl(const AMQFrame& f) {
@@ -145,22 +146,25 @@ bool SessionState::receiverRecord(const AMQFrame& f) {
stateful = true;
receiver.expected.advance(f);
bool firstTime = receiver.expected > receiver.received;
- if (firstTime)
+ if (firstTime) {
receiver.received = receiver.expected;
+ receiver.incomplete += receiverGetCurrent();
+ }
QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << receiverGetCurrent() << ": " << *f.getMethod());
QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << receiverGetCurrent() << ": " << f);
return firstTime;
}
void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
- assert(command <= receiver.received.command); // Internal error to complete an unreceived command.
- assert(receiver.firstIncomplete <= command);
- if (cumulative)
- receiver.unknownCompleted.add(receiver.firstIncomplete, command);
- else
- receiver.unknownCompleted += command;
- receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end();
- QPID_LOG(debug, getId() << ": receiver marked completed: " << command << " unknown: " << receiver.unknownCompleted);
+ if (!receiver.incomplete.contains(command))
+ throw InternalErrorException(QPID_MSG(getId() << "command is not received-incomplete: " << command ));
+ SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
+ SequenceNumber last = command;
+ receiver.unknownCompleted.add(first, last);
+ receiver.incomplete.remove(first, last);
+ QPID_LOG(debug, getId() << ": receiver marked completed: " << command
+ << " incomplete: " << receiver.incomplete
+ << " unknown-completed: " << receiver.unknownCompleted);
}
void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
@@ -173,6 +177,7 @@ void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; }
const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; }
+const SequenceSet& SessionState::receiverGetIncomplete() const { return receiver.incomplete; }
SequenceNumber SessionState::receiverGetCurrent() const {
SequenceNumber current = receiver.expected.command;
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index 47fabb04c8..f72b41ba55 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -108,7 +108,7 @@ class SessionState {
/** Called when flush for confirmed and completed commands is sent to peer. */
virtual void senderRecordFlush();
- /** Called when the peer confirms up to comfirmed. */
+ /** Called when the peer confirms up to comfirmed. */
virtual void senderConfirmed(const SessionPoint& confirmed);
/** Called when the peer indicates commands completed */
@@ -117,15 +117,15 @@ class SessionState {
/** Point from which the next new (not replayed) data will be sent. */
virtual SessionPoint senderGetCommandPoint();
- /** Set of outstanding incomplete commands */
+ /** Set of outstanding incomplete commands */
virtual SequenceSet senderGetIncomplete() const;
/** Point from which we can replay. */
virtual SessionPoint senderGetReplayPoint() const;
- /** Peer expecting commands from this point.
- virtual *@return Range of frames to be replayed.
- */
+ /** Peer expecting commands from this point.
+ virtual *@return Range of frames to be replayed.
+ */
virtual ReplayRange senderExpected(const SessionPoint& expected);
@@ -143,23 +143,25 @@ class SessionState {
/** Peer has indicated commands are known completed */
virtual void receiverKnownCompleted(const SequenceSet& commands);
- /** Get the incoming command point */
+ /** Get the incoming command point */
virtual const SessionPoint& receiverGetExpected() const;
- /** Get the received high-water-mark, may be > getExpected() during replay */
+ /** Get the received high-water-mark, may be > getExpected() during replay */
virtual const SessionPoint& receiverGetReceived() const;
- /** Completed commands that the peer may not know about */
+ /** Completed received commands that the peer may not know about. */
virtual const SequenceSet& receiverGetUnknownComplete() const;
- /** ID of the command currently being handled. */
+ /** Incomplete received commands. */
+ virtual const SequenceSet& receiverGetIncomplete() const;
+
+ /** ID of the command currently being handled. */
virtual SequenceNumber receiverGetCurrent() const;
private:
struct SendState {
- SendState() : session(), unflushedSize(), replaySize() {}
- SessionState* session;
+ SendState() : unflushedSize(), replaySize() {}
// invariant: replayPoint <= flushPoint <= sendPoint
SessionPoint replayPoint; // Can replay from this point
SessionPoint flushPoint; // Point of last flush
@@ -171,12 +173,11 @@ class SessionState {
} sender;
struct ReceiveState {
- ReceiveState() : session() {}
- SessionState* session;
+ ReceiveState() {}
SessionPoint expected; // Expected from here
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
- SequenceNumber firstIncomplete; // First incomplete command.
+ SequenceSet incomplete; // Incomplete received commands.
} receiver;
SessionId id;