diff options
Diffstat (limited to 'java/client/src')
3 files changed, 86 insertions, 21 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 54ffc979af..8b2c2ec04d 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -35,12 +35,12 @@ import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; import javax.jms.IllegalStateException; import java.io.Serializable; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.text.MessageFormat; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -720,18 +720,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { - return createConsumer(destination, _defaultPrefetchHighMark, false, false, null); + return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - return createConsumer(destination, _defaultPrefetchHighMark, false, false, messageSelector); + return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { - return createConsumer(destination, _defaultPrefetchHighMark, noLocal, false, messageSelector); + return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, @@ -740,7 +740,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean exclusive, String selector) throws JMSException { - return createConsumer(destination, prefetch, noLocal, exclusive, selector, null); + return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + } + + + public MessageConsumer createConsumer(Destination destination, + int prefetchHigh, + int prefetchLow, + boolean noLocal, + boolean exclusive, + String selector) throws JMSException + { + return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } public MessageConsumer createConsumer(Destination destination, @@ -750,12 +761,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector, FieldTable rawSelector) throws JMSException { - return createConsumerImpl(destination, prefetch, noLocal, exclusive, + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, + selector, rawSelector); + } + + public MessageConsumer createConsumer(Destination destination, + int prefetchHigh, + int prefetchLow, + boolean noLocal, + boolean exclusive, + String selector, + FieldTable rawSelector) throws JMSException + { + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector); } protected MessageConsumer createConsumerImpl(final Destination destination, - final int prefetch, + final int prefetchHigh, + final int prefetchLow, final boolean noLocal, final boolean exclusive, final String selector, @@ -780,7 +804,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, _messageFactoryRegistry, AMQSession.this, - protocolHandler, ft, prefetch, exclusive, + protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode); try @@ -862,9 +886,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param queueName * @return the consumer tag generated by the broker */ - private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetch, + private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException { + //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); @@ -1118,8 +1143,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(), - consumer.isExclusive(), consumer.getAcknowledgeMode()); + String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), + consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode()); consumer.setConsumerTag(consumerTag); _consumers.put(consumerTag, consumer); diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java index b46c5f111d..a6f89fd221 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -91,9 +91,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private FieldTable _rawSelectorFieldTable; /** - * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover + * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover */ - private int _prefetch; + private int _prefetchHigh; + + /** + * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + */ + private int _prefetchLow; /** * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover @@ -118,10 +123,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private long _lastDeliveryTag; + /** + * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. + * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + */ + private boolean _dups_ok_acknowledge_send; + BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetch, - boolean exclusive, int acknowledgeMode) + AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, + int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode) { _channelId = channelId; _connection = connection; @@ -132,7 +143,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session = session; _protocolHandler = protocolHandler; _rawSelectorFieldTable = rawSelectorFieldTable; - _prefetch = prefetch; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; _exclusive = exclusive; _acknowledgeMode = acknowledgeMode; } @@ -232,7 +244,17 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public int getPrefetch() { - return _prefetch; + return _prefetchHigh; + } + + public int getPrefetchHigh() + { + return _prefetchHigh; + } + + public int getPrefetchLow() + { + return _prefetchLow; } public boolean isNoLocal() @@ -309,10 +331,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * We can get back either a Message or an exception from the queue. This method examines the argument and deals * with it by throwing it (if an exception) or returning it (in any other case). + * * @param o * @return a message only if o is a Message * @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not - * a JMSException is created with the linked exception set appropriately + * a JMSException is created with the linked exception set appropriately */ private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException @@ -335,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) { @@ -370,8 +393,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case * of a message listener or a synchronous receive() caller. + * * @param messageFrame the raw unprocessed mesage - * @param channelId channel on which this message was sent + * @param channelId channel on which this message was sent */ void notifyMessage(UnprocessedMessage messageFrame, int channelId) { @@ -435,7 +459,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { case Session.DUPS_OK_ACKNOWLEDGE: - if (++_outstanding >= _prefetch) + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + + if (_dups_ok_acknowledge_send) { _session.acknowledgeMessage(msg.getDeliveryTag(), true); } diff --git a/java/client/src/org/apache/qpid/jms/Session.java b/java/client/src/org/apache/qpid/jms/Session.java index d369c08aa1..1440ace2b6 100644 --- a/java/client/src/org/apache/qpid/jms/Session.java +++ b/java/client/src/org/apache/qpid/jms/Session.java @@ -42,6 +42,13 @@ public interface Session extends javax.jms.Session boolean exclusive, String selector) throws JMSException; + MessageConsumer createConsumer(Destination destination, + int prefetchHigh, + int prefetchLow, + boolean noLocal, + boolean exclusive, + String selector) throws JMSException; + /** * @return the prefetch value used by default for consumers created on this session. */ |