summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java26
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Session.java11
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java5
4 files changed, 42 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 1741903bb8..c4245d4fc8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -38,7 +38,6 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.SortedSet;
import java.util.ArrayList;
import java.util.Collections;
@@ -116,7 +115,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
* consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
* implementation.
*/
- private final int _acknowledgeMode;
+ protected final int _acknowledgeMode;
/**
* Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index f050cbe455..c47aee0410 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -77,6 +77,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+ /**
+ * Used for no-ack mode so to send session completion command
+ */
+ private int _numberReceivedMessages = 0;
+ private int _firstMessageToComplete;
+
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -115,7 +121,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* message listener or to the sync consumer queue.
*
* @param jmsMessage this message has already been processed so can't redo preDeliver
- * @param channelId
*/
@Override public void notifyMessage(AbstractJMSMessage jmsMessage)
{
@@ -160,6 +165,25 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
public void onMessage(Message message)
{
+ /**
+ * For no-ack mode
+ */
+ if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE )
+ {
+ _numberReceivedMessages++;
+ if(_numberReceivedMessages == 1)
+ {
+ _firstMessageToComplete = message.getMessageTransferId();
+ }
+ if(_numberReceivedMessages >= getSession().getAMQConnection().getMaxPrefetch() )
+ {
+ RangeSet r = new RangeSet();
+ r.add(_firstMessageToComplete, message.getMessageTransferId());
+ _0_10session.getQpidSession().sessionCompleted(r, Option.TIMELY_REPLY);
+ _numberReceivedMessages = 0;
+ }
+ }
+
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
AMQShortString consumerTag = getConsumerTag();
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
index 65b3685f86..28218e01d6 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
@@ -65,6 +65,16 @@ public interface Session
public void sessionDetach(byte[] name);
+ /**
+ * This control is sent by the receiver of commands, and handled by the sender
+ * of commands. It informs the sender of all commands completed by the receiver.
+ * This excludes commands known by the receiver to be considered complete at the sender.
+ *
+ * @param commands completed commands.
+ * @param options {@link Option#TIMELY_REPLY} If set, the sender is no longer free to delay the known-completed reply.
+ */
+ public void sessionCompleted(RangeSet commands, Option... options);
+
public void sessionRequestTimeout(long expiry);
public byte[] getName();
@@ -103,6 +113,7 @@ public interface Session
public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
throws IOException;
+
/**
* <p>This transfer streams a complete message using a single method.
* It uses pull-semantics instead of doing a push.</p>
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index f1701a6b38..58ffffb12b 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -105,6 +105,11 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
_currentDataSizeNotSynced = 0;
}
+ public void sessionCompleted(RangeSet commands, Option ... options)
+ {
+ super.sessionCompleted(commands, options);
+ }
+
/* -------------------------
* Data methods
* ------------------------*/