summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java57
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java4
7 files changed, 70 insertions, 19 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ebeb29af78..f3865bd5af 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -250,7 +250,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
protected AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
- private long _maxPrefetch;
+ protected int _maxPrefetch;
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
@@ -337,13 +337,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
{
- _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
}
else
{
// use the defaul value set for all connections
- _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
@@ -653,7 +653,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
- return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+ return createSession(transacted, acknowledgeMode, _maxPrefetch);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 39f2ad5048..cbdefd0548 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -181,13 +181,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
-
- /** The default maximum number of prefetched message at which to suspend the channel. */
- public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
-
- /** The default minimum number of prefetched messages at which to resume the channel. */
- public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
-
/**
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
@@ -233,10 +226,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
- private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+ private int _defaultPrefetchHighMark;
/** Holds the low mark for prefetched messages, below which the session is resumed. */
- private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ private int _defaultPrefetchLowMark;
/** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index 476536e42b..20fa68605a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ
public synchronized XASession createXASession() throws JMSException
{
checkNotClosed();
- return _delegate.createXASession(AMQSession.DEFAULT_PREFETCH_HIGH_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
+ return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
}
//-- Interface XAQueueConnection
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index edda18c715..49ac89d9b3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -39,7 +39,7 @@ public class ClientProperties
* type: long
*/
public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
- public static final String MAX_PREFETCH_DEFAULT = "1000";
+ public static final String MAX_PREFETCH_DEFAULT = "5000";
/**
* When true a sync command is sent after every persistent messages.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index e14efe03a7..74d3c5f1cb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -95,7 +96,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
- consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+ consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft);
//force synch to ensure the consumer has resulted in a bound queue
//((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index c91c27e894..55750dcafb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -21,13 +21,19 @@
package org.apache.qpid.test.unit.client;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.TopicSession;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -187,6 +193,57 @@ public class AMQConnectionTest extends QpidTestCase
}
}
+ public void testPrefetchSystemProperty() throws Exception
+ {
+ String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME);
+ try
+ {
+ _connection.close();
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
+ _connection = (AMQConnection) getConnection();
+ _connection.start();
+ // Create two consumers on different sessions
+ Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerA = consSessA.createConsumer(_queue);
+
+ Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ // Send 3 messages
+ for (int i = 0; i < 3; i++)
+ {
+ producer.send(producerSession.createTextMessage(new Integer(i).toString()));
+ }
+ Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerB = consSessB.createConsumer(_queue);
+
+ Message msg;
+ // Check that one consumer has 2 messages
+ for (int i = 0; i < 2; i++)
+ {
+ msg = consumerA.receive(1500);
+ assertNotNull(msg);
+ assertEquals(new Integer(i).toString(), ((TextMessage) msg).getText());
+ }
+
+ msg = consumerA.receive(1500);
+ assertNull(msg);
+
+ // Check that other consumer has last message
+ msg = consumerB.receive(1500);
+ assertNotNull(msg);
+ assertEquals(new Integer(2).toString(), ((TextMessage) msg).getText());
+ }
+ finally
+ {
+ if (oldPrefetch == null)
+ {
+ oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT;
+ }
+ System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch);
+ }
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(AMQConnectionTest.class);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 6fa0172ae3..7978e2c818 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -24,6 +24,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQHeadersExchange;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -76,8 +77,7 @@ public class StreamMessageTest extends QpidTestCase
FieldTable ft = new FieldTable();
ft.setString("F1000", "1");
MessageConsumer consumer =
- consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK,
- AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
+ consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft);
// force synch to ensure the consumer has resulted in a bound queue
// ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);