summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/org/apache/qpid/client/AMQSession.java47
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageConsumer.java53
-rw-r--r--java/client/src/org/apache/qpid/jms/Session.java7
3 files changed, 86 insertions, 21 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java
index 54ffc979af..8b2c2ec04d 100644
--- a/java/client/src/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/org/apache/qpid/client/AMQSession.java
@@ -35,12 +35,12 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.text.MessageFormat;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -720,18 +720,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- return createConsumer(destination, _defaultPrefetchHighMark, false, false, null);
+ return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- return createConsumer(destination, _defaultPrefetchHighMark, false, false, messageSelector);
+ return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
- return createConsumer(destination, _defaultPrefetchHighMark, noLocal, false, messageSelector);
+ return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination,
@@ -740,7 +740,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean exclusive,
String selector) throws JMSException
{
- return createConsumer(destination, prefetch, noLocal, exclusive, selector, null);
+ return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ }
+
+
+ public MessageConsumer createConsumer(Destination destination,
+ int prefetchHigh,
+ int prefetchLow,
+ boolean noLocal,
+ boolean exclusive,
+ String selector) throws JMSException
+ {
+ return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -750,12 +761,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector,
FieldTable rawSelector) throws JMSException
{
- return createConsumerImpl(destination, prefetch, noLocal, exclusive,
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
+ selector, rawSelector);
+ }
+
+ public MessageConsumer createConsumer(Destination destination,
+ int prefetchHigh,
+ int prefetchLow,
+ boolean noLocal,
+ boolean exclusive,
+ String selector,
+ FieldTable rawSelector) throws JMSException
+ {
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
selector, rawSelector);
}
protected MessageConsumer createConsumerImpl(final Destination destination,
- final int prefetch,
+ final int prefetchHigh,
+ final int prefetchLow,
final boolean noLocal,
final boolean exclusive,
final String selector,
@@ -780,7 +804,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
- protocolHandler, ft, prefetch, exclusive,
+ protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
_acknowledgeMode);
try
@@ -862,9 +886,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param queueName
* @return the consumer tag generated by the broker
*/
- private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetch,
+ private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
{
+ //fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
@@ -1118,8 +1143,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(),
- consumer.isExclusive(), consumer.getAcknowledgeMode());
+ String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
+ consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
consumer.setConsumerTag(consumerTag);
_consumers.put(consumerTag, consumer);
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
index b46c5f111d..a6f89fd221 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
@@ -91,9 +91,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private FieldTable _rawSelectorFieldTable;
/**
- * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover
*/
- private int _prefetch;
+ private int _prefetchHigh;
+
+ /**
+ * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ private int _prefetchLow;
/**
* We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
@@ -118,10 +123,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private long _lastDeliveryTag;
+ /**
+ * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
+ * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ */
+ private boolean _dups_ok_acknowledge_send;
+
BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetch,
- boolean exclusive, int acknowledgeMode)
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
{
_channelId = channelId;
_connection = connection;
@@ -132,7 +143,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session = session;
_protocolHandler = protocolHandler;
_rawSelectorFieldTable = rawSelectorFieldTable;
- _prefetch = prefetch;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
}
@@ -232,7 +244,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public int getPrefetch()
{
- return _prefetch;
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchHigh()
+ {
+ return _prefetchHigh;
+ }
+
+ public int getPrefetchLow()
+ {
+ return _prefetchLow;
}
public boolean isNoLocal()
@@ -309,10 +331,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* We can get back either a Message or an exception from the queue. This method examines the argument and deals
* with it by throwing it (if an exception) or returning it (in any other case).
+ *
* @param o
* @return a message only if o is a Message
* @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not
- * a JMSException is created with the linked exception set appropriately
+ * a JMSException is created with the linked exception set appropriately
*/
private AbstractJMSMessage returnMessageOrThrow(Object o)
throws JMSException
@@ -335,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
@@ -370,8 +393,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
* of a message listener or a synchronous receive() caller.
+ *
* @param messageFrame the raw unprocessed mesage
- * @param channelId channel on which this message was sent
+ * @param channelId channel on which this message was sent
*/
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
{
@@ -435,7 +459,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetch)
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
{
_session.acknowledgeMessage(msg.getDeliveryTag(), true);
}
diff --git a/java/client/src/org/apache/qpid/jms/Session.java b/java/client/src/org/apache/qpid/jms/Session.java
index d369c08aa1..1440ace2b6 100644
--- a/java/client/src/org/apache/qpid/jms/Session.java
+++ b/java/client/src/org/apache/qpid/jms/Session.java
@@ -42,6 +42,13 @@ public interface Session extends javax.jms.Session
boolean exclusive,
String selector) throws JMSException;
+ MessageConsumer createConsumer(Destination destination,
+ int prefetchHigh,
+ int prefetchLow,
+ boolean noLocal,
+ boolean exclusive,
+ String selector) throws JMSException;
+
/**
* @return the prefetch value used by default for consumers created on this session.
*/