summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-04-17 12:44:35 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-04-17 12:44:35 +0000
commitd421eff48b17b37cb6c9ad226fde22dcc72cb6d4 (patch)
tree97c7190247a7eb4b4130f2927dab4b6ed9157c4d /java
parent3a2c546414d639b224d8c765a5fdc9dfd6f31732 (diff)
downloadqpid-python-d421eff48b17b37cb6c9ad226fde22dcc72cb6d4.tar.gz
QPID-796 Made connection URL property + use session level method
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649070 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java1
-rw-r--r--java/default.testprofile1
6 files changed, 48 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index adbe03e986..566582e666 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -152,6 +152,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
protected AMQConnectionDelegate _delegate;
+ // this connection maximum number of prefetched messages
+ private long _maxPrefetch;
+
/**
* @param broker brokerdetails
* @param username username
@@ -231,6 +234,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
+ // set this connection maxPrefetch
+ if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+ {
+ _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ }
+ else
+ {
+ // use the defaul value set for all connections
+ _maxPrefetch = ClientProperties.MAX_PREFETCH;
+ }
+
_failoverPolicy = new FailoverPolicy(connectionURL);
if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM))
{
@@ -1179,4 +1193,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions.get(channelId);
}
+
+ /**
+ * Get the maximum number of messages that this connection can pre-fetch.
+ *
+ * @return The maximum number of messages that this connection can pre-fetch.
+ */
+ public long getMaxPrefetch()
+ {
+ return _maxPrefetch;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index f79523e546..ee55743d0e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2339,6 +2339,17 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
+ /**
+ * Indicates whether this session consumers pre-fetche messages
+ *
+ * @return true if this session consumers pre-fetche messages false otherwise
+ */
+ public boolean prefetch()
+ {
+ return getAMQConnection().getMaxPrefetch() > 0;
+ }
+
+
public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException;
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 2d60877c5e..d72668bb53 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -406,7 +406,7 @@ public class AMQSession_0_10 extends AMQSession
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! prefetch())
{
getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT);
}
@@ -417,12 +417,12 @@ public class AMQSession_0_10 extends AMQSession
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch))
+ if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
MessageCreditUnit.MESSAGE,
- ClientProperties.MAX_PREFETCH);
+ getAMQConnection().getMaxPrefetch());
}
getQpidSession().sync();
getCurrentException();
@@ -531,7 +531,7 @@ public class AMQSession_0_10 extends AMQSession
//only set if msg list is null
try
{
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! prefetch())
{
if (consumer.getMessageListener() != null)
{
@@ -543,7 +543,7 @@ public class AMQSession_0_10 extends AMQSession
{
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE,
- ClientProperties.MAX_PREFETCH);
+ getAMQConnection().getMaxPrefetch());
}
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index d7cce986aa..4f2d5d8c34 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -141,7 +141,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
if (messageOk)
{
- if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0)
+ if (isMessageListenerSet() && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
@@ -330,7 +330,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
// if we are syncrhonously waiting for a message
// and messages are not prefetched we then need to request another one
- if(ClientProperties.MAX_PREFETCH == 0)
+ if(! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
@@ -422,7 +422,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener != null && ClientProperties.MAX_PREFETCH == 0)
+ if (messageListener != null && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
@@ -470,17 +470,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (isStrated() && ClientProperties.MAX_PREFETCH == 0 && _synchronousQueue.isEmpty())
+ if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
}
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! getSession().prefetch())
{
_syncReceive.set(true);
}
Object o = super.getMessageFromQueue(l);
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! getSession().prefetch())
{
_syncReceive.set(false);
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 8ce302564b..ea0fecb278 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -32,6 +32,7 @@ import java.util.List;
*/
public interface ConnectionURL
{
+ public static final String AMQ_MAXPREFETCH = "maxprefetch";
public static final String AMQ_PROTOCOL = "amqp";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
diff --git a/java/default.testprofile b/java/default.testprofile
index db99ab93c9..08c9f08dc6 100644
--- a/java/default.testprofile
+++ b/java/default.testprofile
@@ -5,6 +5,7 @@ java.naming.provider.url=${project.root}/test-provider.properties
test.excludes=true
test.excludesfile=${project.root}/08ExcludeList
log=info
+max_prefetch=1000
amqj.logging.level=$log
root.logging.level=$log
log4j.configuration=file://${project.root}/log4j-test.xml