diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2008-06-03 07:01:54 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-06-03 07:01:54 +0000 |
commit | cccdaa4e8af53d73a7303d07569ed86953441cce (patch) | |
tree | c4f54fea961abbbd8a7f4bdb0741216db4076a20 | |
parent | 0cfdeb1423ca9fa7b3c2e39ac9231ecc0bc9db77 (diff) | |
download | qpid-python-cccdaa4e8af53d73a7303d07569ed86953441cce.tar.gz |
QPID-1112: Added sessionCompleted support and changed onMessage for invoking sessionCompleted when all expected messages have been received.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@662665 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 42 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1741903bb8..c4245d4fc8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -38,7 +38,6 @@ import javax.jms.MessageListener; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.SortedSet; import java.util.ArrayList; import java.util.Collections; @@ -116,7 +115,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ - private final int _acknowledgeMode; + protected final int _acknowledgeMode; /** * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index f050cbe455..c47aee0410 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -77,6 +77,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); + /** + * Used for no-ack mode so to send session completion command + */ + private int _numberReceivedMessages = 0; + private int _firstMessageToComplete; + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -115,7 +121,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By * message listener or to the sync consumer queue. * * @param jmsMessage this message has already been processed so can't redo preDeliver - * @param channelId */ @Override public void notifyMessage(AbstractJMSMessage jmsMessage) { @@ -160,6 +165,25 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ public void onMessage(Message message) { + /** + * For no-ack mode + */ + if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE ) + { + _numberReceivedMessages++; + if(_numberReceivedMessages == 1) + { + _firstMessageToComplete = message.getMessageTransferId(); + } + if(_numberReceivedMessages >= getSession().getAMQConnection().getMaxPrefetch() ) + { + RangeSet r = new RangeSet(); + r.add(_firstMessageToComplete, message.getMessageTransferId()); + _0_10session.getQpidSession().sessionCompleted(r, Option.TIMELY_REPLY); + _numberReceivedMessages = 0; + } + } + int channelId = getSession().getChannelId(); long deliveryId = message.getMessageTransferId(); AMQShortString consumerTag = getConsumerTag(); diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index 65b3685f86..28218e01d6 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -65,6 +65,16 @@ public interface Session public void sessionDetach(byte[] name); + /** + * This control is sent by the receiver of commands, and handled by the sender + * of commands. It informs the sender of all commands completed by the receiver. + * This excludes commands known by the receiver to be considered complete at the sender. + * + * @param commands completed commands. + * @param options {@link Option#TIMELY_REPLY} If set, the sender is no longer free to delay the known-completed reply. + */ + public void sessionCompleted(RangeSet commands, Option... options); + public void sessionRequestTimeout(long expiry); public byte[] getName(); @@ -103,6 +113,7 @@ public interface Session public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException; + /** * <p>This transfer streams a complete message using a single method. * It uses pull-semantics instead of doing a push.</p> diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index f1701a6b38..58ffffb12b 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -105,6 +105,11 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen _currentDataSizeNotSynced = 0; } + public void sessionCompleted(RangeSet commands, Option ... options) + { + super.sessionCompleted(commands, options); + } + /* ------------------------- * Data methods * ------------------------*/ |