summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-30 18:43:01 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-30 18:43:01 +0000
commit4ffd5cd3936a5c31b7009acead178bc368170912 (patch)
tree487dd21d44904aace436088504f0b4ec3ff3c816
parent7e185ac8cc310718ac8f492df1d4c787cce95d3d (diff)
downloadqpid-python-4ffd5cd3936a5c31b7009acead178bc368170912.tar.gz
QPID-3562: move sending completions if necessary into postDeliver() so that prefetch=1 has the expected impact for asynchronous transacted consumers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1195213 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java25
2 files changed, 28 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 2869e96a87..826ca46cca 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -980,17 +980,23 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Store non committed messages for this session
- * With 0.10 messages are consumed with window mode, we must send a completion
- * before the window size is reached so credits don't dry up.
* @param id
*/
@Override protected void addDeliveredMessage(long id)
{
_txRangeSet.add((int) id);
_txSize++;
+ }
+
+ /**
+ * With 0.10 messages are consumed with window mode, we must send a completion
+ * before the window size is reached so credits don't dry up.
+ */
+ protected void sendTxCompletionsIfNecessary()
+ {
// this is a heuristic, we may want to have that configurable
- if (_connection.getMaxPrefetch() == 1 ||
- _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
+ if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
+ _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
{
// send completed so consumer credits don't dry up
messageAcknowledge(_txRangeSet, false);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 548e274571..7c8ccf4cf9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -31,6 +31,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.jms.Session;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -447,16 +448,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
void postDeliver(AbstractJMSMessage msg)
{
super.postDeliver(msg);
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+
+ switch (_acknowledgeMode)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ case Session.SESSION_TRANSACTED:
+ _0_10session.sendTxCompletionsIfNecessary();
+ break;
+ case Session.NO_ACKNOWLEDGE:
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ break;
+ case Session.AUTO_ACKNOWLEDGE:
+ if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ }
+ break;
}
- if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
- {
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
- }
}
Message receiveBrowse() throws JMSException