summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-06-04 14:30:15 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-06-04 14:30:15 +0000
commit5871150cac2a3a93dbcd9f9721771d6128c6d3ae (patch)
tree72c072986addfa738291a629238d08ab83a9df5d /qpid/java
parentec51de87d3951474f7c7fc0399ac51b3c7310a2a (diff)
downloadqpid-python-5871150cac2a3a93dbcd9f9721771d6128c6d3ae.tar.gz
QPID-1120: Changed addDeliveredMessage and commit so session.completed is sent before credits dry up
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@663124 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java65
2 files changed, 52 insertions, 15 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 4ade83e013..7f95edf60f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1919,7 +1919,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- private void checkTransacted() throws JMSException
+ protected void checkTransacted() throws JMSException
{
if (!getTransacted())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 0753ee539a..d802cacf79 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.jms.IllegalStateException;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.HashMap;
import java.util.UUID;
import java.util.Map;
@@ -73,6 +72,12 @@ public class AMQSession_0_10 extends AMQSession
// a ref on the qpidity connection
protected org.apache.qpidity.nclient.Connection _qpidConnection;
+
+ /**
+ * USed to store the range of in tx messages
+ */
+ private RangeSet _txRangeSet = new RangeSet();
+ private int _txSize = 0;
//--- constructors
/**
@@ -276,19 +281,9 @@ public class AMQSession_0_10 extends AMQSession
{
_dispatcher.rollback();
}
-
- RangeSet ranges = new RangeSet();
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- ranges.add((int) (long) tag);
- }
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
+ _txRangeSet.clear();
+ _txSize = 0;
}
/**
@@ -743,4 +738,46 @@ public class AMQSession_0_10 extends AMQSession
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
+
+ /**
+ * Store non committed messages for this session
+ * With 0.10 messages are consumed with window mode, we must send a completion
+ * before the window size is reached so credits don't dry up.
+ * @param id
+ */
+ @Override protected void addDeliveredMessage(long id)
+ {
+ _txRangeSet.add((int) id);
+ _txSize++;
+ // this is a heuristic, we may want to have that configurable
+ if( _txSize > _connection.getMaxPrefetch() / 2 )
+ {
+ // send completed so consumer credits don't dry up
+ getQpidSession().messageAcknowledge(_txRangeSet, false);
+ _txSize = 0;
+ }
+ }
+
+ @Override public void commit() throws JMSException
+ {
+ checkTransacted();
+ try
+ {
+ if( _txSize > 0 )
+ {
+ getQpidSession().messageAcknowledge(_txRangeSet, true);
+ _txRangeSet.clear();
+ _txSize = 0;
+ }
+ sendCommit();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ }
+ }
}