diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-06-13 18:38:25 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-06-13 18:38:25 +0000 |
commit | 28a7fd34ef9fc43a1a2d44c0cb383535eea17ab7 (patch) | |
tree | c8dafa9ae6f70eb97406a976052812c55cc2ddbc | |
parent | 0a99f79e0d90f0d1c0836fbef124bfe269677840 (diff) | |
download | qpid-python-28a7fd34ef9fc43a1a2d44c0cb383535eea17ab7.tar.gz |
QPID-901: request known-completed every 64K incoming commands, fixed handling of incoming known-completed to clear out processed set
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@667615 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/Range.java | 39 | ||||
-rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/Session.java | 18 |
2 files changed, 52 insertions, 5 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/java/common/src/main/java/org/apache/qpidity/transport/Range.java index 827eade9f8..780f9e5997 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Range.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Range.java @@ -20,6 +20,9 @@ */ package org.apache.qpidity.transport; +import java.util.ArrayList; +import java.util.List; + import static org.apache.qpid.util.Serial.*; @@ -77,6 +80,42 @@ public class Range return new Range(min(lower, range.lower), max(upper, range.upper)); } + public List<Range> subtract(Range range) + { + List<Range> result = new ArrayList<Range>(); + + if (includes(range.lower) && le(lower, range.lower - 1)) + { + result.add(new Range(lower, range.lower - 1)); + } + + if (includes(range.upper) && le(range.upper + 1, upper)) + { + result.add(new Range(range.upper + 1, upper)); + } + + if (result.isEmpty() && !range.includes(this)) + { + result.add(this); + } + + return result; + } + + public Range intersect(Range range) + { + int l = max(lower, range.lower); + int r = min(upper, range.upper); + if (gt(l, r)) + { + return null; + } + else + { + return new Range(l, r); + } + } + public String toString() { return "[" + lower + ", " + upper + "]"; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 331e1517d1..988ac4788f 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -128,6 +128,10 @@ public class Session extends Invoker int id = nextCommandId(); cmd.setId(id); log.debug("ID: [%s] %s", this.channel, id); + if ((id % 65536) == 0) + { + flushProcessed(true); + } } public void processed(Method command) @@ -164,12 +168,17 @@ public class Session extends Invoker public void flushProcessed() { + flushProcessed(false); + } + + private void flushProcessed(boolean timely_reply) + { RangeSet copy; synchronized (processedLock) { copy = processed.copy(); } - sessionCompleted(copy); + sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION); } void knownComplete(RangeSet kc) @@ -177,16 +186,15 @@ public class Session extends Invoker synchronized (processedLock) { RangeSet newProcessed = new RangeSet(); - OUTER: for (Range r : processed) + for (Range pr : processed) { for (Range kr : kc) { - if (kr.includes(r)) + for (Range r : pr.subtract(kr)) { - continue OUTER; + newProcessed.add(r); } } - newProcessed.add(r); } this.processed = newProcessed; } |