diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-02-19 15:09:26 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-02-19 15:09:26 +0000 |
commit | 3452ccc779803371e6ba04a1a3b9bec0d973d11d (patch) | |
tree | 2e386ff54b83830477a70e694e8f5c914da13316 | |
parent | f824b8b88b8065716006380306f0a6274ec85f2e (diff) | |
download | qpid-python-3452ccc779803371e6ba04a1a3b9bec0d973d11d.tar.gz |
QPID-1665: add a timer to ensure message acking is never delayed more than 1000 ms by default, this is configurable by qpid.session.max_ack_delay
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@745892 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 72 insertions, 16 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 45b74f317e..8ab8110dd4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -52,9 +52,12 @@ import static org.apache.qpid.transport.Option.*; import javax.jms.*; import javax.jms.IllegalStateException; +import java.util.Date; import java.util.HashMap; import java.util.UUID; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; /** * This is a 0.10 Session @@ -68,6 +71,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); + private static Timer timer = new Timer("ack-flusher", true); + /** * The underlying QpidSession @@ -83,6 +88,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // a ref on the qpid connection protected org.apache.qpid.transport.Connection _qpidConnection; + private TimerTask flushTask = new TimerTask() + { + public void run() + { + try + { + flushAcknowledgments(); + } + catch (Throwable t) + { + _logger.error("error flushing acks", t); + } + } + }; private RangeSet unacked = new RangeSet(); private int unackedCount = 0; @@ -119,6 +138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { _qpidSession.txSelect(); } + timer.schedule(flushTask, new Date(), Long.getLong("qpid.session.max_ack_delay", 1000)); } /** @@ -142,14 +162,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private void addUnacked(int id) { - unacked.add(id); - unackedCount++; + synchronized (unacked) + { + unacked.add(id); + unackedCount++; + } } private void clearUnacked() { - unacked.clear(); - unackedCount = 0; + synchronized (unacked) + { + unacked.clear(); + unackedCount = 0; + } } //------- overwritten methods of class AMQSession @@ -196,19 +222,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic long prefetch = getAMQConnection().getMaxPrefetch(); - if (unackedCount >= prefetch/2 || _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) + if (unackedCount >= prefetch/2) { flushAcknowledgments(); - } + } } void flushAcknowledgments() { - if (unackedCount > 0) + synchronized (unacked) { - messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - clearUnacked(); + if (unackedCount > 0) + { + messageAcknowledge + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + clearUnacked(); + } } } @@ -222,7 +251,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic ssn.flushProcessed(accept ? BATCH : NONE); if (accept) { - ssn.messageAccept(ranges); + ssn.messageAccept(ranges, UNRELIABLE); } } @@ -267,6 +296,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public void sendClose(long timeout) throws AMQException, FailoverException { + flushTask.cancel(); flushAcknowledgments(); getQpidSession().sync(); getQpidSession().close(); @@ -692,7 +722,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic String binddingKey = ""; for(AMQShortString key : amqd.getBindingKeys()) { - binddingKey = binddingKey + "_" + key.toString(); + binddingKey = binddingKey + "_" + key.toString(); } amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); @@ -722,7 +752,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException @@ -800,14 +830,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Store non committed messages for this session * With 0.10 messages are consumed with window mode, we must send a completion - * before the window size is reached so credits don't dry up. + * before the window size is reached so credits don't dry up. * @param id */ @Override protected void addDeliveredMessage(long id) { _txRangeSet.add((int) id); _txSize++; - // this is a heuristic, we may want to have that configurable + // this is a heuristic, we may want to have that configurable if (_connection.getMaxPrefetch() == 1 || _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) { diff --git a/qpid/java/common/Composite.tpl b/qpid/java/common/Composite.tpl index 17cf846d8c..c46d0a12cc 100644 --- a/qpid/java/common/Composite.tpl +++ b/qpid/java/common/Composite.tpl @@ -145,6 +145,7 @@ if options or base == "Method": if base == "Method": out(""" case SYNC: this.setSync(true); break; case BATCH: this.setBatch(true); break; + case UNRELIABLE: this.setUnreliable(true); break; """) out(""" case NONE: break; default: throw new IllegalArgumentException("invalid option: " + _options[i]); diff --git a/qpid/java/common/Option.tpl b/qpid/java/common/Option.tpl index 776b211ad5..c22b35b999 100644 --- a/qpid/java/common/Option.tpl +++ b/qpid/java/common/Option.tpl @@ -37,5 +37,6 @@ for c in composites: options[option] = None out(" $option,\n")} BATCH, + UNRELIABLE, NONE } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index 09cfd119be..611c742fb1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -48,6 +48,7 @@ public abstract class Method extends Struct implements ProtocolEvent private boolean idSet = false; private boolean sync = false; private boolean batch = false; + private boolean unreliable = false; public final int getId() { @@ -90,6 +91,16 @@ public abstract class Method extends Struct implements ProtocolEvent this.batch = value; } + public final boolean isUnreliable() + { + return unreliable; + } + + final void setUnreliable(boolean value) + { + this.unreliable = value; + } + public abstract boolean hasPayload(); public Header getHeader() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index f94edcc655..4079097f96 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -384,7 +384,15 @@ public class Session extends SessionInvoker { copy = processed.copy(); } - sessionCompleted(copy, options); + + synchronized (commands) + { + if (state == DETACHED) + { + return; + } + sessionCompleted(copy, options); + } } void knownComplete(RangeSet kc) @@ -484,6 +492,11 @@ public class Session extends SessionInvoker synchronized (commands) { + if (state == DETACHED && m.isUnreliable()) + { + return; + } + if (state != OPEN && state != CLOSED) { Waiter w = new Waiter(commands, timeout); |