diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2008-04-17 12:44:35 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-04-17 12:44:35 +0000 |
commit | d421eff48b17b37cb6c9ad226fde22dcc72cb6d4 (patch) | |
tree | 97c7190247a7eb4b4130f2927dab4b6ed9157c4d /java | |
parent | 3a2c546414d639b224d8c765a5fdc9dfd6f31732 (diff) | |
download | qpid-python-d421eff48b17b37cb6c9ad226fde22dcc72cb6d4.tar.gz |
QPID-796 Made connection URL property + use session level method
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649070 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 48 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index adbe03e986..566582e666 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -152,6 +152,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected AMQConnectionDelegate _delegate; + // this connection maximum number of prefetched messages + private long _maxPrefetch; + /** * @param broker brokerdetails * @param username username @@ -231,6 +234,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { + // set this connection maxPrefetch + if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) + { + _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + } + else + { + // use the defaul value set for all connections + _maxPrefetch = ClientProperties.MAX_PREFETCH; + } + _failoverPolicy = new FailoverPolicy(connectionURL); if (_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM)) { @@ -1179,4 +1193,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.get(channelId); } + + /** + * Get the maximum number of messages that this connection can pre-fetch. + * + * @return The maximum number of messages that this connection can pre-fetch. + */ + public long getMaxPrefetch() + { + return _maxPrefetch; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index f79523e546..ee55743d0e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2339,6 +2339,17 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + /** + * Indicates whether this session consumers pre-fetche messages + * + * @return true if this session consumers pre-fetche messages false otherwise + */ + public boolean prefetch() + { + return getAMQConnection().getMaxPrefetch() > 0; + } + + public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException; /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 2d60877c5e..d72668bb53 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -406,7 +406,7 @@ public class AMQSession_0_10 extends AMQSession new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - if (ClientProperties.MAX_PREFETCH == 0) + if (! prefetch()) { getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT); } @@ -417,12 +417,12 @@ public class AMQSession_0_10 extends AMQSession getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch)) + if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, - ClientProperties.MAX_PREFETCH); + getAMQConnection().getMaxPrefetch()); } getQpidSession().sync(); getCurrentException(); @@ -531,7 +531,7 @@ public class AMQSession_0_10 extends AMQSession //only set if msg list is null try { - if (ClientProperties.MAX_PREFETCH == 0) + if (! prefetch()) { if (consumer.getMessageListener() != null) { @@ -543,7 +543,7 @@ public class AMQSession_0_10 extends AMQSession { getQpidSession() .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE, - ClientProperties.MAX_PREFETCH); + getAMQConnection().getMaxPrefetch()); } getQpidSession() .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index d7cce986aa..4f2d5d8c34 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -141,7 +141,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } if (messageOk) { - if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0) + if (isMessageListenerSet() && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), MessageCreditUnit.MESSAGE, 1); @@ -330,7 +330,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } // if we are syncrhonously waiting for a message // and messages are not prefetched we then need to request another one - if(ClientProperties.MAX_PREFETCH == 0) + if(! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), MessageCreditUnit.MESSAGE, 1); @@ -422,7 +422,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public void setMessageListener(final MessageListener messageListener) throws JMSException { super.setMessageListener(messageListener); - if (messageListener != null && ClientProperties.MAX_PREFETCH == 0) + if (messageListener != null && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), MessageCreditUnit.MESSAGE, 1); @@ -470,17 +470,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (isStrated() && ClientProperties.MAX_PREFETCH == 0 && _synchronousQueue.isEmpty()) + if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) { _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), MessageCreditUnit.MESSAGE, 1); } - if (ClientProperties.MAX_PREFETCH == 0) + if (! getSession().prefetch()) { _syncReceive.set(true); } Object o = super.getMessageFromQueue(l); - if (ClientProperties.MAX_PREFETCH == 0) + if (! getSession().prefetch()) { _syncReceive.set(false); } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 8ce302564b..ea0fecb278 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -32,6 +32,7 @@ import java.util.List; */ public interface ConnectionURL { + public static final String AMQ_MAXPREFETCH = "maxprefetch"; public static final String AMQ_PROTOCOL = "amqp"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; diff --git a/java/default.testprofile b/java/default.testprofile index db99ab93c9..08c9f08dc6 100644 --- a/java/default.testprofile +++ b/java/default.testprofile @@ -5,6 +5,7 @@ java.naming.provider.url=${project.root}/test-provider.properties test.excludes=true test.excludesfile=${project.root}/08ExcludeList log=info +max_prefetch=1000 amqj.logging.level=$log root.logging.level=$log log4j.configuration=file://${project.root}/log4j-test.xml |