diff options
author | Aidan Skinner <aidan@apache.org> | 2008-10-10 10:22:21 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-10-10 10:22:21 +0000 |
commit | 9e422621ddc3bf6fb8af271663deb3ac62ff72b0 (patch) | |
tree | f3ee97de900faa0c1c84de25b23022662c2e4d42 | |
parent | 75c3d77f879c4e3175b249250f5cfbbff2480fbe (diff) | |
download | qpid-python-9e422621ddc3bf6fb8af271663deb3ac62ff72b0.tar.gz |
QPID-1289: Make 0-8/0-9 client honour the max_preftech system property.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703383 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 70 insertions, 19 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ebeb29af78..f3865bd5af 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -250,7 +250,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages - private long _maxPrefetch; + protected int _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; @@ -337,13 +337,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // set this connection maxPrefetch if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) { - _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); } else { // use the defaul value set for all connections - _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, + ClientProperties.MAX_PREFETCH_DEFAULT)); } if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) @@ -653,7 +653,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException { - return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); + return createSession(transacted, acknowledgeMode, _maxPrefetch); } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 39f2ad5048..cbdefd0548 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -181,13 +181,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); - - /** The default maximum number of prefetched message at which to suspend the channel. */ - public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; - - /** The default minimum number of prefetched messages at which to resume the channel. */ - public static final int DEFAULT_PREFETCH_LOW_MARK = 2500; - /** * The default value for immediate flag used by producers created by this session is false. That is, a consumer does * not need to be attached to a queue. @@ -233,10 +226,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private int _ticket; /** Holds the high mark for prefetched message, at which the session is suspended. */ - private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + private int _defaultPrefetchHighMark; /** Holds the low mark for prefetched messages, below which the session is resumed. */ - private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + private int _defaultPrefetchLowMark; /** Holds the message listener, if any, which is attached to this session. */ private MessageListener _messageListener = null; diff --git a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index 476536e42b..20fa68605a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -47,7 +47,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(AMQSession.DEFAULT_PREFETCH_HIGH_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK); + return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2); } //-- Interface XAQueueConnection diff --git a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index edda18c715..49ac89d9b3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -39,7 +39,7 @@ public class ClientProperties * type: long */ public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch"; - public static final String MAX_PREFETCH_DEFAULT = "1000"; + public static final String MAX_PREFETCH_DEFAULT = "5000"; /** * When true a sync command is sent after every persistent messages. diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index e14efe03a7..74d3c5f1cb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -26,6 +26,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.client.*; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; @@ -95,7 +96,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); FieldTable ft = new FieldTable(); ft.setString("F1000", "1"); - consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft); + consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft); //force synch to ensure the consumer has resulted in a bound queue //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index c91c27e894..55750dcafb 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -21,13 +21,19 @@ package org.apache.qpid.test.unit.client; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.TopicSession; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidTestCase; @@ -187,6 +193,57 @@ public class AMQConnectionTest extends QpidTestCase } } + public void testPrefetchSystemProperty() throws Exception + { + String oldPrefetch = System.getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME); + try + { + _connection.close(); + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); + _connection = (AMQConnection) getConnection(); + _connection.start(); + // Create two consumers on different sessions + Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerA = consSessA.createConsumer(_queue); + + Session producerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + // Send 3 messages + for (int i = 0; i < 3; i++) + { + producer.send(producerSession.createTextMessage(new Integer(i).toString())); + } + Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumerB = consSessB.createConsumer(_queue); + + Message msg; + // Check that one consumer has 2 messages + for (int i = 0; i < 2; i++) + { + msg = consumerA.receive(1500); + assertNotNull(msg); + assertEquals(new Integer(i).toString(), ((TextMessage) msg).getText()); + } + + msg = consumerA.receive(1500); + assertNull(msg); + + // Check that other consumer has last message + msg = consumerB.receive(1500); + assertNotNull(msg); + assertEquals(new Integer(2).toString(), ((TextMessage) msg).getText()); + } + finally + { + if (oldPrefetch == null) + { + oldPrefetch = ClientProperties.MAX_PREFETCH_DEFAULT; + } + System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, oldPrefetch); + } + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(AMQConnectionTest.class); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 6fa0172ae3..7978e2c818 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -24,6 +24,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQHeadersExchange; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -76,8 +77,7 @@ public class StreamMessageTest extends QpidTestCase FieldTable ft = new FieldTable(); ft.setString("F1000", "1"); MessageConsumer consumer = - consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, - AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft); + consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); // force synch to ensure the consumer has resulted in a bound queue // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); |