diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-10-17 15:57:18 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-10-17 15:57:18 +0000 |
commit | b8cb6602b076df6f9bacc91cc4396ab90e26b9ca (patch) | |
tree | 648c6c607bffbd7299339b10d1903c325ebefb9a | |
parent | f6ed9e1b6a770caa5889f670cba0c74ab1c82357 (diff) | |
download | qpid-python-b8cb6602b076df6f9bacc91cc4396ab90e26b9ca.tar.gz |
Implemented Client side high/low water mark prefetching for NO_ACK.
Use of single prefetch should be unaffected. Setting the high and low to be the same.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464950 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 198 insertions, 108 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQConnection.java b/java/client/src/org/apache/qpid/client/AMQConnection.java index f8bea185d2..1f72484993 100644 --- a/java/client/src/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/org/apache/qpid/client/AMQConnection.java @@ -17,34 +17,44 @@ */ package org.apache.qpid.client; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.failover.FailoverSupport; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.framing.*; -import org.apache.qpid.jms.*; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Connection; - -import org.apache.log4j.Logger; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.url.URLSyntaxException; import javax.jms.*; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Reference; import javax.naming.NamingException; -import javax.naming.StringRefAddr; +import javax.naming.Reference; import javax.naming.Referenceable; +import javax.naming.StringRefAddr; import java.io.IOException; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { @@ -129,8 +139,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + - virtualHost + "?brokerlist='" + broker + "'")); + username + ":" + password + "@" + clientName + + virtualHost + "?brokerlist='" + broker + "'")); } public AMQConnection(String host, int port, String username, String password, @@ -143,14 +153,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(useSSL ? - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='true'" : - ConnectionURL.AMQ_PROTOCOL + "://" + - username + ":" + password + "@" + clientName + - virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" - + "," + ConnectionURL.OPTIONS_SSL + "='false'" + ConnectionURL.AMQ_PROTOCOL + "://" + + username + ":" + password + "@" + clientName + + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + + "," + ConnectionURL.OPTIONS_SSL + "='true'" : + ConnectionURL.AMQ_PROTOCOL + "://" + + username + ":" + password + "@" + clientName + + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + + "," + ConnectionURL.OPTIONS_SSL + "='false'" )); } @@ -369,12 +379,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { - return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH); + return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) throws JMSException { + return createSession(transacted, acknowledgeMode, prefetch, prefetch); + } + + public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetchHigh, final int prefetchLow) throws JMSException + { checkNotClosed(); if (channelLimitReached()) { @@ -397,14 +413,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // open it, so that there is no window where we could receive data on the channel and not be set // up to handle it appropriately. AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, - prefetch); + prefetchHigh, prefetchLow); _protocolHandler.addSessionByChannel(channelId, session); registerSession(channelId, session); boolean success = false; try { - createChannelOverWire(channelId, prefetch, transacted); + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); success = true; } catch (AMQException e) @@ -432,13 +448,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private void createChannelOverWire(int channelId, int prefetch, boolean transacted) + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { _protocolHandler.syncWrite( ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class); + + //todo send low water mark when protocol allows. _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, 0, prefetch, false), + BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false), BasicQosOkBody.class); if (transacted) @@ -451,11 +469,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private void reopenChannel(int channelId, int prefetch, boolean transacted) throws AMQException + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { try { - createChannelOverWire(channelId, prefetch, transacted); + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); } catch (AMQException e) { @@ -559,7 +577,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - synchronized (getFailoverMutex()) + synchronized(getFailoverMutex()) { if (!_closed.getAndSet(true)) { @@ -897,7 +915,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { AMQSession s = (AMQSession) it.next(); _protocolHandler.addSessionByChannel(s.getChannelId(), s); - reopenChannel(s.getChannelId(), s.getDefaultPrefetch(), s.getTransacted()); + reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); } } diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 1f15c24cb2..54ffc979af 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -20,11 +20,11 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; +import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; @@ -46,7 +46,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { private static final Logger _logger = Logger.getLogger(AMQSession.class); - public static final int DEFAULT_PREFETCH = 5000; + public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; + public static final int DEFAULT_PREFETCH_LOW_MARK = 2500; private AMQConnection _connection; @@ -56,7 +57,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _channelId; - private int _defaultPrefetch = DEFAULT_PREFETCH; + private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; /** * Used in the consume method. We generate the consume tag on the client so that we can use the nowait @@ -98,7 +100,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * The counter of the next producer id. This id is generated by the session and used only to allow the * producer to identify itself to the session when deregistering itself. - * + * <p/> * Access to this id does not require to be synchronized since according to the JMS specification only one * thread of control is allowed to create producers for any given session instance. */ @@ -125,12 +127,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _stopped.set(false); try { - while (!_stopped.get() && (message = (UnprocessedMessage)_queue.take()) != null) + while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) { dispatchMessage(message); } } - catch(InterruptedException e) + catch (InterruptedException e) { ; } @@ -201,12 +203,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) { - this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH); + this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK); } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch) { + this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch); + } + + AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) + { _connection = con; _transacted = transacted; if (transacted) @@ -219,34 +227,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; - _defaultPrefetch = defaultPrefetch; + _defaultPrefetchHighMark = defaultPrefetchHighMark; + _defaultPrefetchLowMark = defaultPrefetchLowMark; + if (_acknowledgeMode == NO_ACKNOWLEDGE) { - _queue = new FlowControllingBlockingQueue(_defaultPrefetch, - new FlowControllingBlockingQueue.ThresholdListener() - { - public void aboveThreshold(int currentValue) - { - if(_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Above threshold so suspending channel. Current value is " + currentValue); - suspendChannel(); - } - } - - public void underThreshold(int currentValue) - { - if(_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Below threshold so unsuspending channel. Current value is " + currentValue); - unsuspendChannel(); - } - } - }); + _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue.ThresholdListener() + { + public void aboveThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); + suspendChannel(); + } + } + + public void underThreshold(int currentValue) + { + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { + _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); + unsuspendChannel(); + } + } + }); } else { - _queue = new FlowControllingBlockingQueue(_defaultPrefetch,null); + _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); } } @@ -260,6 +270,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch); } + AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) + { + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); + } + AMQConnection getAMQConnection() { return _connection; @@ -267,7 +282,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public BytesMessage createBytesMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -283,7 +298,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MapMessage createMapMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -299,7 +314,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public javax.jms.Message createMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -315,7 +330,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -331,7 +346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public ObjectMessage createObjectMessage(Serializable object) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -355,7 +370,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage() throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); @@ -372,7 +387,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TextMessage createTextMessage(String text) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { checkNotClosed(); try @@ -388,11 +403,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public boolean getTransacted() throws JMSException - { - checkNotClosed(); - return _transacted; - } + public boolean getTransacted() throws JMSException + { + checkNotClosed(); + return _transacted; + } public int getAcknowledgeMode() throws JMSException { @@ -407,7 +422,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // Acknowledge up to message last delivered (if any) for each consumer. //need to send ack for messages delivered to consumers so far - for(Iterator i = _consumers.values().iterator(); i.hasNext();) + for (Iterator i = _consumers.values().iterator(); i.hasNext();) { //Sends acknowledgement to server ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered(); @@ -434,7 +449,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (AMQException e) { - throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); + throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); } } @@ -442,7 +457,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { _closed.set(true); @@ -472,6 +487,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Close all producers or consumers. This is called either in the error case or when closing the session normally. + * * @param amqe the exception, may be null to indicate no error has occurred */ private void closeProducersAndConsumers(AMQException amqe) @@ -497,11 +513,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Called when the server initiates the closure of the session * unilaterally. + * * @param e the exception that caused this session to be closed. Null causes the */ public void closed(Throwable e) { - synchronized (_connection.getFailoverMutex()) + synchronized(_connection.getFailoverMutex()) { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request @@ -523,7 +540,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after * failover when the client has veoted resubscription. - * + * <p/> * The caller of this method must already hold the failover mutex. */ void markClosed() @@ -575,7 +592,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. - * @param error not null if this is a result of an error occurring at the connection level + * + * @param error not null if this is a result of an error occurring at the connection level */ private void closeConsumers(Throwable error) throws JMSException { @@ -624,6 +642,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Asks the broker to resend all unacknowledged messages for the session. + * * @throws JMSException */ public void recover() throws JMSException @@ -692,27 +711,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); - return new BasicMessageProducer(_connection, (AMQDestination)destination, _transacted, _channelId, - AMQSession.this, _connection.getProtocolHandler(), - getNextProducerId(), immediate, mandatory, waitUntilSent); + return new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, + AMQSession.this, _connection.getProtocolHandler(), + getNextProducerId(), immediate, mandatory, waitUntilSent); } }.execute(_connection); } public MessageConsumer createConsumer(Destination destination) throws JMSException { - return createConsumer(destination, _defaultPrefetch, false, false, null); + return createConsumer(destination, _defaultPrefetchHighMark, false, false, null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { - return createConsumer(destination, _defaultPrefetch, false, false, messageSelector); + return createConsumer(destination, _defaultPrefetchHighMark, false, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { - return createConsumer(destination, _defaultPrefetch, noLocal, false, messageSelector); + return createConsumer(destination, _defaultPrefetchHighMark, noLocal, false, messageSelector); } public MessageConsumer createConsumer(Destination destination, @@ -748,7 +767,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); - AMQDestination amqd = (AMQDestination)destination; + AMQDestination amqd = (AMQDestination) destination; final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler(); // TODO: construct the rawSelector from the selector string if rawSelector == null @@ -804,6 +823,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Declare the queue. + * * @param amqd * @param protocolHandler * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. @@ -814,7 +834,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // For queues (but not topics) we generate the name in the client rather than the // server. This allows the name to be reused on failover if required. In general, // the destination indicates whether it wants a name generated or not. - if(amqd.isNameRequired()) + if (amqd.isNameRequired()) { amqd.setQueueName(protocolHandler.generateQueueName()); } @@ -838,6 +858,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Register to consume from the queue. + * * @param queueName * @return the consumer tag generated by the broker */ @@ -864,9 +885,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - try{ + try + { return new AMQQueue(new AMQBindingURL(queueName)); - }catch(URLSyntaxException urlse) + } + catch (URLSyntaxException urlse) { JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -893,13 +916,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Topic createTopic(String topicName) throws JMSException { - if (topicName.indexOf('/') == -1) + if (topicName.indexOf('/') == -1) { - return new AMQTopic(topicName); + return new AMQTopic(topicName); } else { - try{ + try + { return new AMQTopic(new AMQBindingURL(topicName)); } catch (URLSyntaxException urlse) @@ -1015,9 +1039,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is * AUTO_ACK or similar. + * * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the - * delivery tag + * @param multiple if true will acknowledge all messages up to and including the one specified by the + * delivery tag */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { @@ -1031,7 +1056,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public int getDefaultPrefetch() { - return _defaultPrefetch; + return _defaultPrefetchHighMark; + } + + public int getDefaultPrefetchHigh() + { + return _defaultPrefetchHighMark; + } + + public int getDefaultPrefetchLow() + { + return _defaultPrefetchLowMark; } public int getChannelId() @@ -1041,7 +1076,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void start() { - if(_dispatcher != null) + if (_dispatcher != null) { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); @@ -1056,7 +1091,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //stop the server delivering messages to this session suspendChannel(); - //stop the dispatcher thread +//stop the dispatcher thread _stopped.set(true); } @@ -1067,6 +1102,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Callers must hold the failover mutex before calling this method. + * * @param consumer * @throws AMQException */ @@ -1083,7 +1119,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(), - consumer.isExclusive(), consumer.getAcknowledgeMode()); + consumer.isExclusive(), consumer.getAcknowledgeMode()); consumer.setConsumerTag(consumerTag); _consumers.put(consumerTag, consumer); @@ -1092,6 +1128,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Called by the MessageConsumer when closing, to deregister the consumer from the * map from consumerTag to consumer instance. + * * @param consumerTag the consumer tag, that was broker-generated */ void deregisterConsumer(String consumerTag) @@ -1116,6 +1153,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Resubscribes all producers and consumers. This is called when performing failover. + * * @throws AMQException */ void resubscribe() throws AMQException diff --git a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 89e6968e44..e9ca7cb30c 100644 --- a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -28,7 +28,6 @@ import java.util.concurrent.LinkedBlockingQueue; * <p/> * This implementation is <b>only</b> safe where we have a single thread adding * items and a single (different) thread removing items. - * */ public class FlowControllingBlockingQueue { @@ -37,7 +36,8 @@ public class FlowControllingBlockingQueue */ private final BlockingQueue _queue = new LinkedBlockingQueue(); - private final int _flowControlThreshold; + private final int _flowControlHighThreshold; + private final int _flowControlLowThreshold; private final ThresholdListener _listener; @@ -56,7 +56,13 @@ public class FlowControllingBlockingQueue public FlowControllingBlockingQueue(int threshold, ThresholdListener listener) { - _flowControlThreshold = threshold; + this(threshold, threshold, listener); + } + + public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener) + { + _flowControlHighThreshold = highThreshold; + _flowControlLowThreshold = lowThreshold; _listener = listener; } @@ -67,7 +73,7 @@ public class FlowControllingBlockingQueue { synchronized(_listener) { - if (--_count == (_flowControlThreshold - 1)) + if (_count-- == _flowControlLowThreshold) { _listener.underThreshold(_count); } @@ -83,7 +89,7 @@ public class FlowControllingBlockingQueue { synchronized(_listener) { - if (++_count == _flowControlThreshold) + if (++_count == _flowControlHighThreshold) { _listener.aboveThreshold(_count); } diff --git a/java/client/src/org/apache/qpid/jms/Connection.java b/java/client/src/org/apache/qpid/jms/Connection.java index 22e09d0f93..b0375c8493 100644 --- a/java/client/src/org/apache/qpid/jms/Connection.java +++ b/java/client/src/org/apache/qpid/jms/Connection.java @@ -30,19 +30,37 @@ public interface Connection extends javax.jms.Connection /** * Get the connection listener that has been registered with this connection, if any + * * @return the listener or null if none has been set */ ConnectionListener getConnectionListener(); /** * Create a session specifying the prefetch limit of messages. + * * @param transacted * @param acknowledgeMode - * @param prefetch the maximum number of messages to buffer in the client. This - * applies as a total across all consumers + * @param prefetch the maximum number of messages to buffer in the client. This + * applies as a total across all consumers * @return * @throws JMSException */ org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode, - int prefetch) throws JMSException; + int prefetch) throws JMSException; + + + /** + * Create a session specifying the prefetch limit of messages. + * + * @param transacted + * @param acknowledgeMode + * @param prefetchHigh the maximum number of messages to buffer in the client. + * This applies as a total across all consumers + * @param prefetchLow the number of messages that must be in the buffer in the client to renable message flow. + * This applies as a total across all consumers + * @return + * @throws JMSException + */ + org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode, + int prefetchHigh, int prefetchLow) throws JMSException; } diff --git a/java/client/src/org/apache/qpid/jms/Session.java b/java/client/src/org/apache/qpid/jms/Session.java index 82a2311498..d369c08aa1 100644 --- a/java/client/src/org/apache/qpid/jms/Session.java +++ b/java/client/src/org/apache/qpid/jms/Session.java @@ -48,6 +48,16 @@ public interface Session extends javax.jms.Session int getDefaultPrefetch(); /** + * @return the High water prefetch value used by default for consumers created on this session. + */ + int getDefaultPrefetchHigh(); + + /** + * @return the Low water prefetch value used by default for consumers created on this session. + */ + int getDefaultPrefetchLow(); + + /** * Create a producer * @param destination * @param mandatory the value of the mandatory flag used by default on the producer diff --git a/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java b/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java index 3445b37317..fad1849fed 100644 --- a/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java +++ b/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java @@ -43,7 +43,7 @@ public class ChannelFlowTest implements MessageListener ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception { - AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50); + AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50,25); //set up a slow consumer session.createConsumer(destination).setMessageListener(this); |