summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-10-17 15:57:18 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-10-17 15:57:18 +0000
commitb8cb6602b076df6f9bacc91cc4396ab90e26b9ca (patch)
tree648c6c607bffbd7299339b10d1903c325ebefb9a
parentf6ed9e1b6a770caa5889f670cba0c74ab1c82357 (diff)
downloadqpid-python-b8cb6602b076df6f9bacc91cc4396ab90e26b9ca.tar.gz
Implemented Client side high/low water mark prefetching for NO_ACK.
Use of single prefetch should be unaffected. Setting the high and low to be the same. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464950 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/org/apache/qpid/client/AMQConnection.java82
-rw-r--r--java/client/src/org/apache/qpid/client/AMQSession.java172
-rw-r--r--java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java16
-rw-r--r--java/client/src/org/apache/qpid/jms/Connection.java24
-rw-r--r--java/client/src/org/apache/qpid/jms/Session.java10
-rw-r--r--java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java2
6 files changed, 198 insertions, 108 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQConnection.java b/java/client/src/org/apache/qpid/client/AMQConnection.java
index f8bea185d2..1f72484993 100644
--- a/java/client/src/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/org/apache/qpid/client/AMQConnection.java
@@ -17,34 +17,44 @@
*/
package org.apache.qpid.client;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.jms.*;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Connection;
-
-import org.apache.log4j.Logger;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.Reference;
import javax.naming.NamingException;
-import javax.naming.StringRefAddr;
+import javax.naming.Reference;
import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -129,8 +139,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
- virtualHost + "?brokerlist='" + broker + "'"));
+ username + ":" + password + "@" + clientName +
+ virtualHost + "?brokerlist='" + broker + "'"));
}
public AMQConnection(String host, int port, String username, String password,
@@ -143,14 +153,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(useSSL ?
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
- ConnectionURL.AMQ_PROTOCOL + "://" +
- username + ":" + password + "@" + clientName +
- virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
- + "," + ConnectionURL.OPTIONS_SSL + "='false'"
+ ConnectionURL.AMQ_PROTOCOL + "://" +
+ username + ":" + password + "@" + clientName +
+ virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
+ ConnectionURL.AMQ_PROTOCOL + "://" +
+ username + ":" + password + "@" + clientName +
+ virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ + "," + ConnectionURL.OPTIONS_SSL + "='false'"
));
}
@@ -369,12 +379,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
- return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH);
+ return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetch) throws JMSException
{
+ return createSession(transacted, acknowledgeMode, prefetch, prefetch);
+ }
+
+ public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+ final int prefetchHigh, final int prefetchLow) throws JMSException
+ {
checkNotClosed();
if (channelLimitReached())
{
@@ -397,14 +413,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// open it, so that there is no window where we could receive data on the channel and not be set
// up to handle it appropriately.
AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode,
- prefetch);
+ prefetchHigh, prefetchLow);
_protocolHandler.addSessionByChannel(channelId, session);
registerSession(channelId, session);
boolean success = false;
try
{
- createChannelOverWire(channelId, prefetch, transacted);
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
success = true;
}
catch (AMQException e)
@@ -432,13 +448,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- private void createChannelOverWire(int channelId, int prefetch, boolean transacted)
+ private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
_protocolHandler.syncWrite(
ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+
+ //todo send low water mark when protocol allows.
_protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId, 0, prefetch, false),
+ BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
BasicQosOkBody.class);
if (transacted)
@@ -451,11 +469,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- private void reopenChannel(int channelId, int prefetch, boolean transacted) throws AMQException
+ private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
{
try
{
- createChannelOverWire(channelId, prefetch, transacted);
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
}
catch (AMQException e)
{
@@ -559,7 +577,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close() throws JMSException
{
- synchronized (getFailoverMutex())
+ synchronized(getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
@@ -897,7 +915,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
AMQSession s = (AMQSession) it.next();
_protocolHandler.addSessionByChannel(s.getChannelId(), s);
- reopenChannel(s.getChannelId(), s.getDefaultPrefetch(), s.getTransacted());
+ reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
}
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java
index 1f15c24cb2..54ffc979af 100644
--- a/java/client/src/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/org/apache/qpid/client/AMQSession.java
@@ -20,11 +20,11 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
@@ -46,7 +46,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
private static final Logger _logger = Logger.getLogger(AMQSession.class);
- public static final int DEFAULT_PREFETCH = 5000;
+ public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+ public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
private AMQConnection _connection;
@@ -56,7 +57,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _channelId;
- private int _defaultPrefetch = DEFAULT_PREFETCH;
+ private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+ private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -98,7 +100,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* The counter of the next producer id. This id is generated by the session and used only to allow the
* producer to identify itself to the session when deregistering itself.
- *
+ * <p/>
* Access to this id does not require to be synchronized since according to the JMS specification only one
* thread of control is allowed to create producers for any given session instance.
*/
@@ -125,12 +127,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_stopped.set(false);
try
{
- while (!_stopped.get() && (message = (UnprocessedMessage)_queue.take()) != null)
+ while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
dispatchMessage(message);
}
}
- catch(InterruptedException e)
+ catch (InterruptedException e)
{
;
}
@@ -201,12 +203,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
{
- this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH);
+ this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
{
+ this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
+ }
+
+ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ {
_connection = con;
_transacted = transacted;
if (transacted)
@@ -219,34 +227,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetch = defaultPrefetch;
+ _defaultPrefetchHighMark = defaultPrefetchHighMark;
+ _defaultPrefetchLowMark = defaultPrefetchLowMark;
+
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetch,
- new FlowControllingBlockingQueue.ThresholdListener()
- {
- public void aboveThreshold(int currentValue)
- {
- if(_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Above threshold so suspending channel. Current value is " + currentValue);
- suspendChannel();
- }
- }
-
- public void underThreshold(int currentValue)
- {
- if(_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Below threshold so unsuspending channel. Current value is " + currentValue);
- unsuspendChannel();
- }
- }
- });
+ _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue.ThresholdListener()
+ {
+ public void aboveThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
+ suspendChannel();
+ }
+ }
+
+ public void underThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
+ unsuspendChannel();
+ }
+ }
+ });
}
else
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetch,null);
+ _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
}
}
@@ -260,6 +270,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
}
+ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ {
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+ }
+
AMQConnection getAMQConnection()
{
return _connection;
@@ -267,7 +282,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -283,7 +298,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MapMessage createMapMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -299,7 +314,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public javax.jms.Message createMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -315,7 +330,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -331,7 +346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -355,7 +370,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -372,7 +387,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -388,11 +403,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public boolean getTransacted() throws JMSException
- {
- checkNotClosed();
- return _transacted;
- }
+ public boolean getTransacted() throws JMSException
+ {
+ checkNotClosed();
+ return _transacted;
+ }
public int getAcknowledgeMode() throws JMSException
{
@@ -407,7 +422,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// Acknowledge up to message last delivered (if any) for each consumer.
//need to send ack for messages delivered to consumers so far
- for(Iterator i = _consumers.values().iterator(); i.hasNext();)
+ for (Iterator i = _consumers.values().iterator(); i.hasNext();)
{
//Sends acknowledgement to server
((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
@@ -434,7 +449,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (AMQException e)
{
- throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
}
}
@@ -442,7 +457,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
_closed.set(true);
@@ -472,6 +487,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Close all producers or consumers. This is called either in the error case or when closing the session normally.
+ *
* @param amqe the exception, may be null to indicate no error has occurred
*/
private void closeProducersAndConsumers(AMQException amqe)
@@ -497,11 +513,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Called when the server initiates the closure of the session
* unilaterally.
+ *
* @param e the exception that caused this session to be closed. Null causes the
*/
public void closed(Throwable e)
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -523,7 +540,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
* failover when the client has veoted resubscription.
- *
+ * <p/>
* The caller of this method must already hold the failover mutex.
*/
void markClosed()
@@ -575,7 +592,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
- * @param error not null if this is a result of an error occurring at the connection level
+ *
+ * @param error not null if this is a result of an error occurring at the connection level
*/
private void closeConsumers(Throwable error) throws JMSException
{
@@ -624,6 +642,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Asks the broker to resend all unacknowledged messages for the session.
+ *
* @throws JMSException
*/
public void recover() throws JMSException
@@ -692,27 +711,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
- return new BasicMessageProducer(_connection, (AMQDestination)destination, _transacted, _channelId,
- AMQSession.this, _connection.getProtocolHandler(),
- getNextProducerId(), immediate, mandatory, waitUntilSent);
+ return new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
+ AMQSession.this, _connection.getProtocolHandler(),
+ getNextProducerId(), immediate, mandatory, waitUntilSent);
}
}.execute(_connection);
}
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- return createConsumer(destination, _defaultPrefetch, false, false, null);
+ return createConsumer(destination, _defaultPrefetchHighMark, false, false, null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- return createConsumer(destination, _defaultPrefetch, false, false, messageSelector);
+ return createConsumer(destination, _defaultPrefetchHighMark, false, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
- return createConsumer(destination, _defaultPrefetch, noLocal, false, messageSelector);
+ return createConsumer(destination, _defaultPrefetchHighMark, noLocal, false, messageSelector);
}
public MessageConsumer createConsumer(Destination destination,
@@ -748,7 +767,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
- AMQDestination amqd = (AMQDestination)destination;
+ AMQDestination amqd = (AMQDestination) destination;
final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
// TODO: construct the rawSelector from the selector string if rawSelector == null
@@ -804,6 +823,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Declare the queue.
+ *
* @param amqd
* @param protocolHandler
* @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
@@ -814,7 +834,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// For queues (but not topics) we generate the name in the client rather than the
// server. This allows the name to be reused on failover if required. In general,
// the destination indicates whether it wants a name generated or not.
- if(amqd.isNameRequired())
+ if (amqd.isNameRequired())
{
amqd.setQueueName(protocolHandler.generateQueueName());
}
@@ -838,6 +858,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Register to consume from the queue.
+ *
* @param queueName
* @return the consumer tag generated by the broker
*/
@@ -864,9 +885,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- try{
+ try
+ {
return new AMQQueue(new AMQBindingURL(queueName));
- }catch(URLSyntaxException urlse)
+ }
+ catch (URLSyntaxException urlse)
{
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -893,13 +916,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public Topic createTopic(String topicName) throws JMSException
{
- if (topicName.indexOf('/') == -1)
+ if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(topicName);
+ return new AMQTopic(topicName);
}
else
{
- try{
+ try
+ {
return new AMQTopic(new AMQBindingURL(topicName));
}
catch (URLSyntaxException urlse)
@@ -1015,9 +1039,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
* a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
* AUTO_ACK or similar.
+ *
* @param deliveryTag the tag of the last message to be acknowledged
- * @param multiple if true will acknowledge all messages up to and including the one specified by the
- * delivery tag
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the
+ * delivery tag
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
@@ -1031,7 +1056,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public int getDefaultPrefetch()
{
- return _defaultPrefetch;
+ return _defaultPrefetchHighMark;
+ }
+
+ public int getDefaultPrefetchHigh()
+ {
+ return _defaultPrefetchHighMark;
+ }
+
+ public int getDefaultPrefetchLow()
+ {
+ return _defaultPrefetchLowMark;
}
public int getChannelId()
@@ -1041,7 +1076,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void start()
{
- if(_dispatcher != null)
+ if (_dispatcher != null)
{
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
@@ -1056,7 +1091,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//stop the server delivering messages to this session
suspendChannel();
- //stop the dispatcher thread
+//stop the dispatcher thread
_stopped.set(true);
}
@@ -1067,6 +1102,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Callers must hold the failover mutex before calling this method.
+ *
* @param consumer
* @throws AMQException
*/
@@ -1083,7 +1119,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(),
- consumer.isExclusive(), consumer.getAcknowledgeMode());
+ consumer.isExclusive(), consumer.getAcknowledgeMode());
consumer.setConsumerTag(consumerTag);
_consumers.put(consumerTag, consumer);
@@ -1092,6 +1128,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Called by the MessageConsumer when closing, to deregister the consumer from the
* map from consumerTag to consumer instance.
+ *
* @param consumerTag the consumer tag, that was broker-generated
*/
void deregisterConsumer(String consumerTag)
@@ -1116,6 +1153,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Resubscribes all producers and consumers. This is called when performing failover.
+ *
* @throws AMQException
*/
void resubscribe() throws AMQException
diff --git a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 89e6968e44..e9ca7cb30c 100644
--- a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -28,7 +28,6 @@ import java.util.concurrent.LinkedBlockingQueue;
* <p/>
* This implementation is <b>only</b> safe where we have a single thread adding
* items and a single (different) thread removing items.
- *
*/
public class FlowControllingBlockingQueue
{
@@ -37,7 +36,8 @@ public class FlowControllingBlockingQueue
*/
private final BlockingQueue _queue = new LinkedBlockingQueue();
- private final int _flowControlThreshold;
+ private final int _flowControlHighThreshold;
+ private final int _flowControlLowThreshold;
private final ThresholdListener _listener;
@@ -56,7 +56,13 @@ public class FlowControllingBlockingQueue
public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
{
- _flowControlThreshold = threshold;
+ this(threshold, threshold, listener);
+ }
+
+ public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
+ {
+ _flowControlHighThreshold = highThreshold;
+ _flowControlLowThreshold = lowThreshold;
_listener = listener;
}
@@ -67,7 +73,7 @@ public class FlowControllingBlockingQueue
{
synchronized(_listener)
{
- if (--_count == (_flowControlThreshold - 1))
+ if (_count-- == _flowControlLowThreshold)
{
_listener.underThreshold(_count);
}
@@ -83,7 +89,7 @@ public class FlowControllingBlockingQueue
{
synchronized(_listener)
{
- if (++_count == _flowControlThreshold)
+ if (++_count == _flowControlHighThreshold)
{
_listener.aboveThreshold(_count);
}
diff --git a/java/client/src/org/apache/qpid/jms/Connection.java b/java/client/src/org/apache/qpid/jms/Connection.java
index 22e09d0f93..b0375c8493 100644
--- a/java/client/src/org/apache/qpid/jms/Connection.java
+++ b/java/client/src/org/apache/qpid/jms/Connection.java
@@ -30,19 +30,37 @@ public interface Connection extends javax.jms.Connection
/**
* Get the connection listener that has been registered with this connection, if any
+ *
* @return the listener or null if none has been set
*/
ConnectionListener getConnectionListener();
/**
* Create a session specifying the prefetch limit of messages.
+ *
* @param transacted
* @param acknowledgeMode
- * @param prefetch the maximum number of messages to buffer in the client. This
- * applies as a total across all consumers
+ * @param prefetch the maximum number of messages to buffer in the client. This
+ * applies as a total across all consumers
* @return
* @throws JMSException
*/
org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode,
- int prefetch) throws JMSException;
+ int prefetch) throws JMSException;
+
+
+ /**
+ * Create a session specifying the prefetch limit of messages.
+ *
+ * @param transacted
+ * @param acknowledgeMode
+ * @param prefetchHigh the maximum number of messages to buffer in the client.
+ * This applies as a total across all consumers
+ * @param prefetchLow the number of messages that must be in the buffer in the client to renable message flow.
+ * This applies as a total across all consumers
+ * @return
+ * @throws JMSException
+ */
+ org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode,
+ int prefetchHigh, int prefetchLow) throws JMSException;
}
diff --git a/java/client/src/org/apache/qpid/jms/Session.java b/java/client/src/org/apache/qpid/jms/Session.java
index 82a2311498..d369c08aa1 100644
--- a/java/client/src/org/apache/qpid/jms/Session.java
+++ b/java/client/src/org/apache/qpid/jms/Session.java
@@ -48,6 +48,16 @@ public interface Session extends javax.jms.Session
int getDefaultPrefetch();
/**
+ * @return the High water prefetch value used by default for consumers created on this session.
+ */
+ int getDefaultPrefetchHigh();
+
+ /**
+ * @return the Low water prefetch value used by default for consumers created on this session.
+ */
+ int getDefaultPrefetchLow();
+
+ /**
* Create a producer
* @param destination
* @param mandatory the value of the mandatory flag used by default on the producer
diff --git a/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java b/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java
index 3445b37317..fad1849fed 100644
--- a/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java
+++ b/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java
@@ -43,7 +43,7 @@ public class ChannelFlowTest implements MessageListener
ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception
{
- AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50);
+ AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50,25);
//set up a slow consumer
session.createConsumer(destination).setMessageListener(this);