diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2008-04-04 12:02:52 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-04-04 12:02:52 +0000 |
commit | 78fa82c7b48711f06c03e176f6e24b70af65e692 (patch) | |
tree | 24655c9e1c70c0cea76a97ac6b0976b64612342d /java | |
parent | 83026d0f664b69cc8ed74b1ed52df212f0a66ab7 (diff) | |
download | qpid-python-78fa82c7b48711f06c03e176f6e24b70af65e692.tar.gz |
QPID-796: Added ability to enable/disable message prefetching. Prefetching is controlled through the property max_prefetch, it is turned off when max_prefetch =0. (this is 0.10 code path change)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@644688 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r-- | java/010ExcludeList | 5 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 49 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 84 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/ClientProperties.java | 36 | ||||
-rw-r--r-- | java/cpp.async.testprofile | 1 | ||||
-rw-r--r-- | java/cpp.sync.testprofile | 1 | ||||
-rw-r--r-- | java/module.xml | 1 |
7 files changed, 141 insertions, 36 deletions
diff --git a/java/010ExcludeList b/java/010ExcludeList index 996332afa8..709c846068 100644 --- a/java/010ExcludeList +++ b/java/010ExcludeList @@ -5,5 +5,6 @@ org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber -// this test needs durable subscribe states to be persisted -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
\ No newline at end of file +// those tests need durable subscribe states to be persisted +org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent +org.apache.qpid.test.unit.ct.DurableSubscriberTests#testDurSubRestoresMessageSelector
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 27feba694c..16d5a07141 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,7 +27,6 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; -import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; @@ -45,7 +44,6 @@ import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; -import java.util.Iterator; /** * This is a 0.10 Session @@ -58,10 +56,6 @@ public class AMQSession_0_10 extends AMQSession */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); - /** - * The maximum number of pre-fetched messages per destination - */ - public static long MAX_PREFETCH = 1000; /** * The underlying QpidSession @@ -101,8 +95,6 @@ public class AMQSession_0_10 extends AMQSession super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); - MAX_PREFETCH = Integer.parseInt(System.getProperty("max_prefetch","1000")); - // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = qpidConnection.createSession(0); // set the exception listnere for this session @@ -404,18 +396,23 @@ public class AMQSession_0_10 extends AMQSession new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + if (ClientProperties.MAX_PREFETCH == 0) + { + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + } + else + { + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + } getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(consumer.isStrated() || _immediatePrefetch) + if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - + ClientProperties.MAX_PREFETCH); } getQpidSession().sync(); getCurrentException(); @@ -517,17 +514,27 @@ public class AMQSession_0_10 extends AMQSession //only set if msg list is null try { - // if (consumer.getMessageListener() != null) - // { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, - MAX_PREFETCH); - // } + if (ClientProperties.MAX_PREFETCH == 0) + { + if (consumer.getMessageListener() != null) + { + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + } + } + else + { + getQpidSession() + .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, + ClientProperties.MAX_PREFETCH); + } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); } - catch(Exception e) + catch (Exception e) { - throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error while trying to get the listener",e); + throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 9d24fbf953..c40ec1e5cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -27,7 +27,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.Session; import org.apache.qpidity.transport.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.filter.MessageFilter; @@ -39,6 +38,7 @@ import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; /** * This is a 0.10 message consumer. @@ -72,6 +72,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ private boolean _isStarted = false; + /** + * Specify whether this consumer is performing a sync receive + */ + private final AtomicBoolean _syncReceive = new AtomicBoolean(false); + //--- constructor protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -136,6 +141,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } if (messageOk) { + if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage, channelId); } @@ -307,23 +317,33 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _logger.debug("messageOk " + messageOk); _logger.debug("_preAcquire " + _preAcquire); } - if (!messageOk && _preAcquire) + if (!messageOk) { - // this is the case for topics - // We need to ack this message - if (_logger.isDebugEnabled()) + if (_preAcquire) { - _logger.debug("filterMessage - trying to ack message"); + // this is the case for topics + // We need to ack this message + if (_logger.isDebugEnabled()) + { + _logger.debug("filterMessage - trying to ack message"); + } + acknowledgeMessage(message); } - acknowledgeMessage(message); - } - else if (!messageOk) - { - if (_logger.isDebugEnabled()) + else { - _logger.debug("Message not OK, releasing"); + if (_logger.isDebugEnabled()) + { + _logger.debug("Message not OK, releasing"); + } + releaseMessage(message); + } + // if we are syncrhonously waiting for a message + // and messages are not prefetched we then need to request another one + if(ClientProperties.MAX_PREFETCH == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); } - releaseMessage(message); } // now we need to acquire this message if needed // this is the case of queue with a message selector set @@ -429,6 +449,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); + if (messageListener != null && ClientProperties.MAX_PREFETCH == 0) + { + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + } if (messageListener != null && !_synchronousQueue.isEmpty()) { Iterator messages=_synchronousQueue.iterator(); @@ -449,6 +474,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public void start() { _isStarted = true; + if (_syncReceive.get()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + } } public void stop() @@ -456,4 +486,32 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _isStarted = false; } + /** + * When messages are not prefetched we need to request a message from the + * broker. + * Note that if the timeout is too short a message may be queued in _synchronousQueue until + * this consumer closes or request it. + * @param l + * @return + * @throws InterruptedException + */ + public Object getMessageFromQueue(long l) throws InterruptedException + { + if (isStrated() && ClientProperties.MAX_PREFETCH == 0 && _synchronousQueue.isEmpty()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + } + if (ClientProperties.MAX_PREFETCH == 0) + { + _syncReceive.set(true); + } + Object o = super.getMessageFromQueue(l); + if (ClientProperties.MAX_PREFETCH == 0) + { + _syncReceive.set(false); + } + return o; + } + }
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java new file mode 100644 index 0000000000..73c59dcf96 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java @@ -0,0 +1,36 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.client; + +/** + * This class centralized the Qpid client properties. + */ +public class ClientProperties +{ + + /** + * The maximum number of pre-fetched messages per destination + */ + public static long MAX_PREFETCH = Long.valueOf(System.getProperties().getProperty("max_prefetch", "1000")); + + /** + * When true a sync command is sent after every persistent messages. + */ + public static boolean FULLY_SYNC = Boolean.getBoolean("fully_sync"); +} diff --git a/java/cpp.async.testprofile b/java/cpp.async.testprofile index 88c0d61a0e..d4daf47632 100644 --- a/java/cpp.async.testprofile +++ b/java/cpp.async.testprofile @@ -2,6 +2,7 @@ broker.version=0-10 broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --log-output ${build.data}/broker.log --load-module ${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --store-async yes broker.clean=${build.data} java.naming.provider.url=${project.root}/test-provider.properties +max_prefetch=1000 test.excludes=true test.excludesfile=${project.root}/010ExcludeList log=info diff --git a/java/cpp.sync.testprofile b/java/cpp.sync.testprofile index beb01feefd..44a2d71ea0 100644 --- a/java/cpp.sync.testprofile +++ b/java/cpp.sync.testprofile @@ -3,6 +3,7 @@ broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --log-output broker.clean=${build.data} java.naming.provider.url=${project.root}/test-provider.properties test.excludes=true +max_prefetch=1000 test.excludesfile=${project.root}/010ExcludeList log=info amqj.logging.level=$log diff --git a/java/module.xml b/java/module.xml index 7b63a768af..3dbab15962 100644 --- a/java/module.xml +++ b/java/module.xml @@ -185,6 +185,7 @@ <sysproperty key="broker.version" value="${broker.version}"/> <sysproperty key="test.excludes" value="${test.excludes}"/> <sysproperty key="test.excludesfile" value="${test.excludesfile}"/> + <sysproperty key="max_prefetch" value ="${max_prefetch}"/> <formatter type="plain"/> <formatter type="xml"/> |