summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-10-08 12:40:23 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-10-08 12:40:23 +0000
commit6516117db5080ac5808db4de3439b48e7c42ccaf (patch)
tree1a1c900b9a15cc530907dc8e659069648fc96bb4
parentdc58d0f43925ccba273e62714133060db9eda2be (diff)
downloadqpid-python-6516117db5080ac5808db4de3439b48e7c42ccaf.tar.gz
QPID-1440 : Code review changes from QPID-1289. All actioned except adding new createConsumer() method.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@823149 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java2
7 files changed, 53 insertions, 24 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5659ce0474..ed122a772e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -313,7 +313,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
protected AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
- protected int _maxPrefetch;
+ private int _maxPrefetch;
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index e5980d8b7d..e6c3473cb1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -39,6 +39,15 @@ public interface AMQConnectionDelegate
Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException;
+ /**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException thrown if there is a problem creating the session.
+ */
+ XASession createXASession() throws JMSException;
+
XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
void failoverPrep();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 927929c94a..211f72e9ba 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -100,6 +100,18 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
/**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
+ /**
* create an XA Session and start it if required.
*/
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index a0b69b5493..275d555bdb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -191,6 +191,18 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}, _conn).execute();
}
+ /**
+ * Create an XASession with default prefetch values of:
+ * High = MaxPrefetch
+ * Low = MaxPrefetch / 2
+ * @return XASession
+ * @throws JMSException thrown if there is a problem creating the session.
+ */
+ public XASession createXASession() throws JMSException
+ {
+ return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+ }
+
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fc81e32e4d..dd9a00ce10 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import java.io.Serializable;
-import java.io.IOException;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -92,7 +91,6 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,8 +113,6 @@ import org.slf4j.LoggerFactory;
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
-
-
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -263,10 +259,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
- private int _defaultPrefetchHighMark;
+ private int _prefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
- private int _defaultPrefetchLowMark;
+ private int _prefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
@@ -447,13 +443,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_channelId = channelId;
_messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetchHighMark = defaultPrefetchHighMark;
- _defaultPrefetchLowMark = defaultPrefetchLowMark;
+ _prefetchHighMark = defaultPrefetchHighMark;
+ _prefetchLowMark = defaultPrefetchLowMark;
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -461,7 +457,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void aboveThreshold(int currentValue)
{
_logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
+ "Above threshold(" + _prefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
_suspendState.set(true);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -471,7 +467,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void underThreshold(int currentValue)
{
_logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
+ "Below threshold(" + _prefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
_suspendState.set(false);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -481,7 +477,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
}
@@ -897,7 +893,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false,
messageSelector, null, true, true);
}
@@ -905,7 +901,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
false, false);
}
@@ -913,7 +909,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
false, false);
}
@@ -921,7 +917,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -930,7 +926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
messageSelector, null, false, false);
}
@@ -939,7 +935,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
messageSelector, null, false, false);
}
@@ -1363,17 +1359,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public int getDefaultPrefetch()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchHigh()
{
- return _defaultPrefetchHighMark;
+ return _prefetchHighMark;
}
public int getDefaultPrefetchLow()
{
- return _defaultPrefetchLowMark;
+ return _prefetchLowMark;
}
public AMQShortString getDefaultQueueExchangeName()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index 20fa68605a..43025bd724 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ
public synchronized XASession createXASession() throws JMSException
{
checkNotClosed();
- return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
+ return _delegate.createXASession();
}
//-- Interface XAQueueConnection
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index 3627618e68..be0d283470 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -39,7 +39,7 @@ public class ClientProperties
* type: long
*/
public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
- public static final String MAX_PREFETCH_DEFAULT = "5000";
+ public static final String MAX_PREFETCH_DEFAULT = "500";
/**
* When true a sync command is sent after every persistent messages.