summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-06-13 18:38:25 +0000
committerRafael H. Schloming <rhs@apache.org>2008-06-13 18:38:25 +0000
commit28a7fd34ef9fc43a1a2d44c0cb383535eea17ab7 (patch)
treec8dafa9ae6f70eb97406a976052812c55cc2ddbc
parent0a99f79e0d90f0d1c0836fbef124bfe269677840 (diff)
downloadqpid-python-28a7fd34ef9fc43a1a2d44c0cb383535eea17ab7.tar.gz
QPID-901: request known-completed every 64K incoming commands, fixed handling of incoming known-completed to clear out processed set
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@667615 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Range.java39
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java18
2 files changed, 52 insertions, 5 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/java/common/src/main/java/org/apache/qpidity/transport/Range.java
index 827eade9f8..780f9e5997 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Range.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Range.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpidity.transport;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.apache.qpid.util.Serial.*;
@@ -77,6 +80,42 @@ public class Range
return new Range(min(lower, range.lower), max(upper, range.upper));
}
+ public List<Range> subtract(Range range)
+ {
+ List<Range> result = new ArrayList<Range>();
+
+ if (includes(range.lower) && le(lower, range.lower - 1))
+ {
+ result.add(new Range(lower, range.lower - 1));
+ }
+
+ if (includes(range.upper) && le(range.upper + 1, upper))
+ {
+ result.add(new Range(range.upper + 1, upper));
+ }
+
+ if (result.isEmpty() && !range.includes(this))
+ {
+ result.add(this);
+ }
+
+ return result;
+ }
+
+ public Range intersect(Range range)
+ {
+ int l = max(lower, range.lower);
+ int r = min(upper, range.upper);
+ if (gt(l, r))
+ {
+ return null;
+ }
+ else
+ {
+ return new Range(l, r);
+ }
+ }
+
public String toString()
{
return "[" + lower + ", " + upper + "]";
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 331e1517d1..988ac4788f 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -128,6 +128,10 @@ public class Session extends Invoker
int id = nextCommandId();
cmd.setId(id);
log.debug("ID: [%s] %s", this.channel, id);
+ if ((id % 65536) == 0)
+ {
+ flushProcessed(true);
+ }
}
public void processed(Method command)
@@ -164,12 +168,17 @@ public class Session extends Invoker
public void flushProcessed()
{
+ flushProcessed(false);
+ }
+
+ private void flushProcessed(boolean timely_reply)
+ {
RangeSet copy;
synchronized (processedLock)
{
copy = processed.copy();
}
- sessionCompleted(copy);
+ sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
}
void knownComplete(RangeSet kc)
@@ -177,16 +186,15 @@ public class Session extends Invoker
synchronized (processedLock)
{
RangeSet newProcessed = new RangeSet();
- OUTER: for (Range r : processed)
+ for (Range pr : processed)
{
for (Range kr : kc)
{
- if (kr.includes(r))
+ for (Range r : pr.subtract(kr))
{
- continue OUTER;
+ newProcessed.add(r);
}
}
- newProcessed.add(r);
}
this.processed = newProcessed;
}