From 28a7fd34ef9fc43a1a2d44c0cb383535eea17ab7 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 13 Jun 2008 18:38:25 +0000 Subject: QPID-901: request known-completed every 64K incoming commands, fixed handling of incoming known-completed to clear out processed set git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@667615 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/transport/Range.java | 39 ++++++++++++++++++++++ .../java/org/apache/qpidity/transport/Session.java | 18 +++++++--- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/java/common/src/main/java/org/apache/qpidity/transport/Range.java index 827eade9f8..780f9e5997 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Range.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Range.java @@ -20,6 +20,9 @@ */ package org.apache.qpidity.transport; +import java.util.ArrayList; +import java.util.List; + import static org.apache.qpid.util.Serial.*; @@ -77,6 +80,42 @@ public class Range return new Range(min(lower, range.lower), max(upper, range.upper)); } + public List subtract(Range range) + { + List result = new ArrayList(); + + if (includes(range.lower) && le(lower, range.lower - 1)) + { + result.add(new Range(lower, range.lower - 1)); + } + + if (includes(range.upper) && le(range.upper + 1, upper)) + { + result.add(new Range(range.upper + 1, upper)); + } + + if (result.isEmpty() && !range.includes(this)) + { + result.add(this); + } + + return result; + } + + public Range intersect(Range range) + { + int l = max(lower, range.lower); + int r = min(upper, range.upper); + if (gt(l, r)) + { + return null; + } + else + { + return new Range(l, r); + } + } + public String toString() { return "[" + lower + ", " + upper + "]"; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 331e1517d1..988ac4788f 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -128,6 +128,10 @@ public class Session extends Invoker int id = nextCommandId(); cmd.setId(id); log.debug("ID: [%s] %s", this.channel, id); + if ((id % 65536) == 0) + { + flushProcessed(true); + } } public void processed(Method command) @@ -163,13 +167,18 @@ public class Session extends Invoker } public void flushProcessed() + { + flushProcessed(false); + } + + private void flushProcessed(boolean timely_reply) { RangeSet copy; synchronized (processedLock) { copy = processed.copy(); } - sessionCompleted(copy); + sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION); } void knownComplete(RangeSet kc) @@ -177,16 +186,15 @@ public class Session extends Invoker synchronized (processedLock) { RangeSet newProcessed = new RangeSet(); - OUTER: for (Range r : processed) + for (Range pr : processed) { for (Range kr : kc) { - if (kr.includes(r)) + for (Range r : pr.subtract(kr)) { - continue OUTER; + newProcessed.add(r); } } - newProcessed.add(r); } this.processed = newProcessed; } -- cgit v1.2.1