diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 46 |
1 files changed, 32 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7ce51c8918..0644bd88a8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -52,6 +52,8 @@ import static org.apache.qpid.transport.Option.*; import javax.jms.*; import javax.jms.IllegalStateException; +import java.lang.ref.WeakReference; + import java.util.Date; import java.util.HashMap; import java.util.UUID; @@ -72,6 +74,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); private static Timer timer = new Timer("ack-flusher", true); + private static class Flusher extends TimerTask + { + + private WeakReference<AMQSession_0_10> session; + public Flusher(AMQSession_0_10 session) + { + this.session = new WeakReference<AMQSession_0_10>(session); + } + + public void run() { + AMQSession_0_10 ssn = session.get(); + if (ssn == null) + { + cancel(); + } + else + { + try + { + ssn.flushAcknowledgments(true); + } + catch (Throwable t) + { + _logger.error("error flushing acks", t); + } + } + } + } /** @@ -129,20 +159,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (maxAckDelay > 0) { - flushTask = new TimerTask() - { - public void run() - { - try - { - flushAcknowledgments(true); - } - catch (Throwable t) - { - _logger.error("error flushing acks", t); - } - } - }; + flushTask = new Flusher(this); timer.schedule(flushTask, new Date(), maxAckDelay); } } @@ -319,6 +336,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (flushTask != null) { flushTask.cancel(); + flushTask = null; } flushAcknowledgments(); getQpidSession().sync(); |