diff options
author | Robert Gemmell <robbie@apache.org> | 2011-10-30 18:43:01 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-10-30 18:43:01 +0000 |
commit | 4ffd5cd3936a5c31b7009acead178bc368170912 (patch) | |
tree | 487dd21d44904aace436088504f0b4ec3ff3c816 | |
parent | 7e185ac8cc310718ac8f492df1d4c787cce95d3d (diff) | |
download | qpid-python-4ffd5cd3936a5c31b7009acead178bc368170912.tar.gz |
QPID-3562: move sending completions if necessary into postDeliver() so that prefetch=1 has the expected impact for asynchronous transacted consumers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1195213 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 14 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 25 |
2 files changed, 28 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 2869e96a87..826ca46cca 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -980,17 +980,23 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Store non committed messages for this session - * With 0.10 messages are consumed with window mode, we must send a completion - * before the window size is reached so credits don't dry up. * @param id */ @Override protected void addDeliveredMessage(long id) { _txRangeSet.add((int) id); _txSize++; + } + + /** + * With 0.10 messages are consumed with window mode, we must send a completion + * before the window size is reached so credits don't dry up. + */ + protected void sendTxCompletionsIfNecessary() + { // this is a heuristic, we may want to have that configurable - if (_connection.getMaxPrefetch() == 1 || - _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) + if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 || + _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)) { // send completed so consumer credits don't dry up messageAcknowledge(_txRangeSet, false); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 548e274571..7c8ccf4cf9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -31,6 +31,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.filter.JMSSelectorFilter; +import org.apache.qpid.jms.Session; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -447,16 +448,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM void postDeliver(AbstractJMSMessage msg) { super.postDeliver(msg); - if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) + + switch (_acknowledgeMode) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + case Session.SESSION_TRANSACTED: + _0_10session.sendTxCompletionsIfNecessary(); + break; + case Session.NO_ACKNOWLEDGE: + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + break; + case Session.AUTO_ACKNOWLEDGE: + if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck()) + { + ((AMQSession_0_10) getSession()).getQpidSession().sync(); + } + break; } - if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE && - !_session.isInRecovery() && _session.getAMQConnection().getSyncAck()) - { - ((AMQSession_0_10) getSession()).getQpidSession().sync(); - } } Message receiveBrowse() throws JMSException |