diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpidity/transport/Session.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/Session.java | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 331e1517d1..988ac4788f 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -128,6 +128,10 @@ public class Session extends Invoker int id = nextCommandId(); cmd.setId(id); log.debug("ID: [%s] %s", this.channel, id); + if ((id % 65536) == 0) + { + flushProcessed(true); + } } public void processed(Method command) @@ -164,12 +168,17 @@ public class Session extends Invoker public void flushProcessed() { + flushProcessed(false); + } + + private void flushProcessed(boolean timely_reply) + { RangeSet copy; synchronized (processedLock) { copy = processed.copy(); } - sessionCompleted(copy); + sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION); } void knownComplete(RangeSet kc) @@ -177,16 +186,15 @@ public class Session extends Invoker synchronized (processedLock) { RangeSet newProcessed = new RangeSet(); - OUTER: for (Range r : processed) + for (Range pr : processed) { for (Range kr : kc) { - if (kr.includes(r)) + for (Range r : pr.subtract(kr)) { - continue OUTER; + newProcessed.add(r); } } - newProcessed.add(r); } this.processed = newProcessed; } |