summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-04-04 12:02:52 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-04-04 12:02:52 +0000
commit78fa82c7b48711f06c03e176f6e24b70af65e692 (patch)
tree24655c9e1c70c0cea76a97ac6b0976b64612342d
parent83026d0f664b69cc8ed74b1ed52df212f0a66ab7 (diff)
downloadqpid-python-78fa82c7b48711f06c03e176f6e24b70af65e692.tar.gz
QPID-796: Added ability to enable/disable message prefetching. Prefetching is controlled through the property max_prefetch, it is turned off when max_prefetch =0. (this is 0.10 code path change)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@644688 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/010ExcludeList5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java49
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java84
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/ClientProperties.java36
-rw-r--r--java/cpp.async.testprofile1
-rw-r--r--java/cpp.sync.testprofile1
-rw-r--r--java/module.xml1
7 files changed, 141 insertions, 36 deletions
diff --git a/java/010ExcludeList b/java/010ExcludeList
index 996332afa8..709c846068 100644
--- a/java/010ExcludeList
+++ b/java/010ExcludeList
@@ -5,5 +5,6 @@ org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only
org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
-// this test needs durable subscribe states to be persisted
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent \ No newline at end of file
+// those tests need durable subscribe states to be persisted
+org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
+org.apache.qpid.test.unit.ct.DurableSubscriberTests#testDurSubRestoresMessageSelector \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 27feba694c..16d5a07141 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,7 +27,6 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -45,7 +44,6 @@ import javax.jms.IllegalStateException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
-import java.util.Iterator;
/**
* This is a 0.10 Session
@@ -58,10 +56,6 @@ public class AMQSession_0_10 extends AMQSession
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
- /**
- * The maximum number of pre-fetched messages per destination
- */
- public static long MAX_PREFETCH = 1000;
/**
* The underlying QpidSession
@@ -101,8 +95,6 @@ public class AMQSession_0_10 extends AMQSession
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
- MAX_PREFETCH = Integer.parseInt(System.getProperty("max_prefetch","1000"));
-
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = qpidConnection.createSession(0);
// set the exception listnere for this session
@@ -404,18 +396,23 @@ public class AMQSession_0_10 extends AMQSession
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
-
- getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ }
+ else
+ {
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+ }
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(consumer.isStrated() || _immediatePrefetch)
+ if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
-
+ ClientProperties.MAX_PREFETCH);
}
getQpidSession().sync();
getCurrentException();
@@ -517,17 +514,27 @@ public class AMQSession_0_10 extends AMQSession
//only set if msg list is null
try
{
- // if (consumer.getMessageListener() != null)
- // {
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
- MAX_PREFETCH);
- // }
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+ if (consumer.getMessageListener() != null)
+ {
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
+ }
+ else
+ {
+ getQpidSession()
+ .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ ClientProperties.MAX_PREFETCH);
+ }
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
}
- catch(Exception e)
+ catch (Exception e)
{
- throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error while trying to get the listener",e);
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 9d24fbf953..c40ec1e5cb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -27,7 +27,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.*;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.filter.MessageFilter;
@@ -39,6 +38,7 @@ import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a 0.10 message consumer.
@@ -72,6 +72,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
private boolean _isStarted = false;
+ /**
+ * Specify whether this consumer is performing a sync receive
+ */
+ private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -136,6 +141,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
if (messageOk)
{
+ if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0)
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage, channelId);
}
@@ -307,23 +317,33 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
- if (!messageOk && _preAcquire)
+ if (!messageOk)
{
- // this is the case for topics
- // We need to ack this message
- if (_logger.isDebugEnabled())
+ if (_preAcquire)
{
- _logger.debug("filterMessage - trying to ack message");
+ // this is the case for topics
+ // We need to ack this message
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to ack message");
+ }
+ acknowledgeMessage(message);
}
- acknowledgeMessage(message);
- }
- else if (!messageOk)
- {
- if (_logger.isDebugEnabled())
+ else
{
- _logger.debug("Message not OK, releasing");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message not OK, releasing");
+ }
+ releaseMessage(message);
+ }
+ // if we are syncrhonously waiting for a message
+ // and messages are not prefetched we then need to request another one
+ if(ClientProperties.MAX_PREFETCH == 0)
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
}
- releaseMessage(message);
}
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
@@ -429,6 +449,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
+ if (messageListener != null && ClientProperties.MAX_PREFETCH == 0)
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
if (messageListener != null && !_synchronousQueue.isEmpty())
{
Iterator messages=_synchronousQueue.iterator();
@@ -449,6 +474,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void start()
{
_isStarted = true;
+ if (_syncReceive.get())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
}
public void stop()
@@ -456,4 +486,32 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_isStarted = false;
}
+ /**
+ * When messages are not prefetched we need to request a message from the
+ * broker.
+ * Note that if the timeout is too short a message may be queued in _synchronousQueue until
+ * this consumer closes or request it.
+ * @param l
+ * @return
+ * @throws InterruptedException
+ */
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ if (isStrated() && ClientProperties.MAX_PREFETCH == 0 && _synchronousQueue.isEmpty())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+ _syncReceive.set(true);
+ }
+ Object o = super.getMessageFromQueue(l);
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+ _syncReceive.set(false);
+ }
+ return o;
+ }
+
} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
new file mode 100644
index 0000000000..73c59dcf96
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.client;
+
+/**
+ * This class centralized the Qpid client properties.
+ */
+public class ClientProperties
+{
+
+ /**
+ * The maximum number of pre-fetched messages per destination
+ */
+ public static long MAX_PREFETCH = Long.valueOf(System.getProperties().getProperty("max_prefetch", "1000"));
+
+ /**
+ * When true a sync command is sent after every persistent messages.
+ */
+ public static boolean FULLY_SYNC = Boolean.getBoolean("fully_sync");
+}
diff --git a/java/cpp.async.testprofile b/java/cpp.async.testprofile
index 88c0d61a0e..d4daf47632 100644
--- a/java/cpp.async.testprofile
+++ b/java/cpp.async.testprofile
@@ -2,6 +2,7 @@ broker.version=0-10
broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --log-output ${build.data}/broker.log --load-module ${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --store-async yes
broker.clean=${build.data}
java.naming.provider.url=${project.root}/test-provider.properties
+max_prefetch=1000
test.excludes=true
test.excludesfile=${project.root}/010ExcludeList
log=info
diff --git a/java/cpp.sync.testprofile b/java/cpp.sync.testprofile
index beb01feefd..44a2d71ea0 100644
--- a/java/cpp.sync.testprofile
+++ b/java/cpp.sync.testprofile
@@ -3,6 +3,7 @@ broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --log-output
broker.clean=${build.data}
java.naming.provider.url=${project.root}/test-provider.properties
test.excludes=true
+max_prefetch=1000
test.excludesfile=${project.root}/010ExcludeList
log=info
amqj.logging.level=$log
diff --git a/java/module.xml b/java/module.xml
index 7b63a768af..3dbab15962 100644
--- a/java/module.xml
+++ b/java/module.xml
@@ -185,6 +185,7 @@
<sysproperty key="broker.version" value="${broker.version}"/>
<sysproperty key="test.excludes" value="${test.excludes}"/>
<sysproperty key="test.excludesfile" value="${test.excludesfile}"/>
+ <sysproperty key="max_prefetch" value ="${max_prefetch}"/>
<formatter type="plain"/>
<formatter type="xml"/>