From 5871150cac2a3a93dbcd9f9721771d6128c6d3ae Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Wed, 4 Jun 2008 14:30:15 +0000 Subject: QPID-1120: Changed addDeliveredMessage and commit so session.completed is sent before credits dry up git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@663124 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 2 +- .../org/apache/qpid/client/AMQSession_0_10.java | 65 +++++++++++++++++----- 2 files changed, 52 insertions(+), 15 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 4ade83e013..7f95edf60f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1919,7 +1919,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - private void checkTransacted() throws JMSException + protected void checkTransacted() throws JMSException { if (!getTransacted()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 0753ee539a..d802cacf79 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; import javax.jms.*; import javax.jms.IllegalStateException; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.HashMap; import java.util.UUID; import java.util.Map; @@ -73,6 +72,12 @@ public class AMQSession_0_10 extends AMQSession // a ref on the qpidity connection protected org.apache.qpidity.nclient.Connection _qpidConnection; + + /** + * USed to store the range of in tx messages + */ + private RangeSet _txRangeSet = new RangeSet(); + private int _txSize = 0; //--- constructors /** @@ -276,19 +281,9 @@ public class AMQSession_0_10 extends AMQSession { _dispatcher.rollback(); } - - RangeSet ranges = new RangeSet(); - while (true) - { - Long tag = _deliveredMessageTags.poll(); - if (tag == null) - { - break; - } - - ranges.add((int) (long) tag); - } - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); + _txRangeSet.clear(); + _txSize = 0; } /** @@ -743,4 +738,46 @@ public class AMQSession_0_10 extends AMQSession return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); } + + /** + * Store non committed messages for this session + * With 0.10 messages are consumed with window mode, we must send a completion + * before the window size is reached so credits don't dry up. + * @param id + */ + @Override protected void addDeliveredMessage(long id) + { + _txRangeSet.add((int) id); + _txSize++; + // this is a heuristic, we may want to have that configurable + if( _txSize > _connection.getMaxPrefetch() / 2 ) + { + // send completed so consumer credits don't dry up + getQpidSession().messageAcknowledge(_txRangeSet, false); + _txSize = 0; + } + } + + @Override public void commit() throws JMSException + { + checkTransacted(); + try + { + if( _txSize > 0 ) + { + getQpidSession().messageAcknowledge(_txRangeSet, true); + _txRangeSet.clear(); + _txSize = 0; + } + sendCommit(); + } + catch (AMQException e) + { + throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + } + } } -- cgit v1.2.1