diff options
author | Martin Ritchie <ritchiem@apache.org> | 2009-10-08 12:40:23 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2009-10-08 12:40:23 +0000 |
commit | 6516117db5080ac5808db4de3439b48e7c42ccaf (patch) | |
tree | 1a1c900b9a15cc530907dc8e659069648fc96bb4 | |
parent | dc58d0f43925ccba273e62714133060db9eda2be (diff) | |
download | qpid-python-6516117db5080ac5808db4de3439b48e7c42ccaf.tar.gz |
QPID-1440 : Code review changes from QPID-1289. All actioned except adding new createConsumer() method.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@823149 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 53 insertions, 24 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 5659ce0474..ed122a772e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -313,7 +313,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages - protected int _maxPrefetch; + private int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index e5980d8b7d..e6c3473cb1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -39,6 +39,15 @@ public interface AMQConnectionDelegate Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException; + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + XASession createXASession() throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; void failoverPrep(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 927929c94a..211f72e9ba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -100,6 +100,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + + /** * create an XA Session and start it if required. */ public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index a0b69b5493..275d555bdb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -191,6 +191,18 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate }, _conn).execute(); } + /** + * Create an XASession with default prefetch values of: + * High = MaxPrefetch + * Low = MaxPrefetch / 2 + * @return XASession + * @throws JMSException thrown if there is a problem creating the session. + */ + public XASession createXASession() throws JMSException + { + return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); + } + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fc81e32e4d..dd9a00ce10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,7 +21,6 @@ package org.apache.qpid.client; import java.io.Serializable; -import java.io.IOException; import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; @@ -92,7 +91,6 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; -import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,8 +113,6 @@ import org.slf4j.LoggerFactory; public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { - - public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -263,10 +259,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ - private int _defaultPrefetchHighMark; + private int _prefetchHighMark; /** Holds the low mark for prefetched messages, below which the session is resumed. */ - private int _defaultPrefetchLowMark; + private int _prefetchLowMark; /** Holds the message listener, if any, which is attached to this session. */ private MessageListener _messageListener = null; @@ -447,13 +443,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; - _defaultPrefetchHighMark = defaultPrefetchHighMark; - _defaultPrefetchLowMark = defaultPrefetchLowMark; + _prefetchHighMark = defaultPrefetchHighMark; + _prefetchLowMark = defaultPrefetchLowMark; if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = - new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, + new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -461,7 +457,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void aboveThreshold(int currentValue) { _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark + "Above threshold(" + _prefetchHighMark + ") so suspending channel. Current value is " + currentValue); _suspendState.set(true); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -471,7 +467,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void underThreshold(int currentValue) { _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark + "Below threshold(" + _prefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); _suspendState.set(false); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -481,7 +477,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); + _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null); } } @@ -897,7 +893,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false, messageSelector, null, true, true); } @@ -905,7 +901,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, false, false); } @@ -913,7 +909,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false); } @@ -921,7 +917,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), messageSelector, null, false, false); } @@ -930,7 +926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic), + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), messageSelector, null, false, false); } @@ -939,7 +935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true, + return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true, messageSelector, null, false, false); } @@ -1363,17 +1359,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public int getDefaultPrefetch() { - return _defaultPrefetchHighMark; + return _prefetchHighMark; } public int getDefaultPrefetchHigh() { - return _defaultPrefetchHighMark; + return _prefetchHighMark; } public int getDefaultPrefetchLow() { - return _defaultPrefetchLowMark; + return _prefetchLowMark; } public AMQShortString getDefaultQueueExchangeName() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index 20fa68605a..43025bd724 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2); + return _delegate.createXASession(); } //-- Interface XAQueueConnection diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 3627618e68..be0d283470 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -39,7 +39,7 @@ public class ClientProperties * type: long */ public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch"; - public static final String MAX_PREFETCH_DEFAULT = "5000"; + public static final String MAX_PREFETCH_DEFAULT = "500"; /** * When true a sync command is sent after every persistent messages. |