summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-06-04 17:24:10 +0000
committerAlan Conway <aconway@apache.org>2008-06-04 17:24:10 +0000
commit8654fb2db74eacff8ec943d192f80ef1fd512f6e (patch)
treeeb07d031edc4a1672a2adee60a0885f665bd9d16 /cpp/src
parent656a4f3d5fcea275f954e3d1c9eaa5087a2e7005 (diff)
downloadqpid-python-8654fb2db74eacff8ec943d192f80ef1fd512f6e.tar.gz
Request a timely reqply to session.completed based on configured flush interval.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@663318 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/SessionState.cpp10
-rw-r--r--cpp/src/qpid/SessionState.h6
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp2
3 files changed, 13 insertions, 5 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index ce8f45d085..d85a14c64c 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -135,10 +135,6 @@ void SessionState::senderRecordFlush() {
}
bool SessionState::senderNeedKnownCompleted() const {
- // FIXME aconway 2008-06-04: this is unpleasant - replayFlushLimit == 0
- // means never send spontaneous flush, but sends a knownCompleted for
- // every completed. Need separate configuration?
- //
return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
}
@@ -187,6 +183,7 @@ bool SessionState::receiverRecord(const AMQFrame& f) {
if (isControl(f)) return true; // Ignore control frames.
stateful = true;
receiver.expected.advance(f);
+ receiver.bytesSinceKnownCompleted += f.size();
bool firstTime = receiver.expected > receiver.received;
if (firstTime) {
receiver.received = receiver.expected;
@@ -211,10 +208,15 @@ void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
if (!commands.empty() && commands.back() > receiver.received.command)
throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands."));
+ receiver.bytesSinceKnownCompleted=0;
receiver.unknownCompleted -= commands;
QPID_LOG(debug, getId() << ": receiver known completed: " << commands << " unknown: " << receiver.unknownCompleted);
}
+bool SessionState::receiverNeedKnownCompleted() const {
+ return receiver.bytesSinceKnownCompleted >= config.replayFlushLimit;
+}
+
const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; }
const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; }
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index 80d218b8bb..10937b7a1e 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -148,6 +148,11 @@ class SessionState {
/** Peer has indicated commands are known completed */
virtual void receiverKnownCompleted(const SequenceSet& commands);
+ /** True if the next completed control should set the timely-reply argument
+ * to request a knonw-completed response.
+ */
+ virtual bool receiverNeedKnownCompleted() const;
+
/** Get the incoming command point */
virtual const SessionPoint& receiverGetExpected() const;
@@ -184,6 +189,7 @@ class SessionState {
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
SequenceSet incomplete; // Incomplete received commands.
+ size_t bytesSinceKnownCompleted; // Bytes sent since we last issued a knownCompleted.
} receiver;
SessionId id;
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 53f847a3cf..fa05cee1b3 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -243,7 +243,7 @@ void SessionHandler::sendDetach()
void SessionHandler::sendCompletion() {
checkAttached();
const SequenceSet& c = getState()->receiverGetUnknownComplete();
- peer.completed(c, c.span() > 1000);
+ peer.completed(c, getState()->receiverNeedKnownCompleted());
}
void SessionHandler::sendAttach(bool force) {