From 3452ccc779803371e6ba04a1a3b9bec0d973d11d Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Thu, 19 Feb 2009 15:09:26 +0000 Subject: QPID-1665: add a timer to ensure message acking is never delayed more than 1000 ms by default, this is configurable by qpid.session.max_ack_delay git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@745892 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 60 ++++++++++++++++------ qpid/java/common/Composite.tpl | 1 + qpid/java/common/Option.tpl | 1 + .../java/org/apache/qpid/transport/Method.java | 11 ++++ .../java/org/apache/qpid/transport/Session.java | 15 +++++- 5 files changed, 72 insertions(+), 16 deletions(-) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 45b74f317e..8ab8110dd4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -52,9 +52,12 @@ import static org.apache.qpid.transport.Option.*; import javax.jms.*; import javax.jms.IllegalStateException; +import java.util.Date; import java.util.HashMap; import java.util.UUID; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; /** * This is a 0.10 Session @@ -68,6 +71,8 @@ public class AMQSession_0_10 extends AMQSession= prefetch/2 || _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) + if (unackedCount >= prefetch/2) { flushAcknowledgments(); - } + } } void flushAcknowledgments() { - if (unackedCount > 0) + synchronized (unacked) { - messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - clearUnacked(); + if (unackedCount > 0) + { + messageAcknowledge + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + clearUnacked(); + } } } @@ -222,7 +251,7 @@ public class AMQSession_0_10 extends AMQSession