summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpidity/transport/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpidity/transport/Session.java')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java18
1 files changed, 13 insertions, 5 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 331e1517d1..988ac4788f 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -128,6 +128,10 @@ public class Session extends Invoker
int id = nextCommandId();
cmd.setId(id);
log.debug("ID: [%s] %s", this.channel, id);
+ if ((id % 65536) == 0)
+ {
+ flushProcessed(true);
+ }
}
public void processed(Method command)
@@ -164,12 +168,17 @@ public class Session extends Invoker
public void flushProcessed()
{
+ flushProcessed(false);
+ }
+
+ private void flushProcessed(boolean timely_reply)
+ {
RangeSet copy;
synchronized (processedLock)
{
copy = processed.copy();
}
- sessionCompleted(copy);
+ sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
}
void knownComplete(RangeSet kc)
@@ -177,16 +186,15 @@ public class Session extends Invoker
synchronized (processedLock)
{
RangeSet newProcessed = new RangeSet();
- OUTER: for (Range r : processed)
+ for (Range pr : processed)
{
for (Range kr : kc)
{
- if (kr.includes(r))
+ for (Range r : pr.subtract(kr))
{
- continue OUTER;
+ newProcessed.add(r);
}
}
- newProcessed.add(r);
}
this.processed = newProcessed;
}