diff options
author | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:56:23 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-02-13 00:56:23 +0000 |
commit | 99c38d6c25eb381ba856bf4f08899b931fb39af0 (patch) | |
tree | f4bc28c5dec61778117d878bdc15d6f5a9462e83 | |
parent | 7fc447475dd3d186e139ff708c512b18e36f1bc4 (diff) | |
download | qpid-python-99c38d6c25eb381ba856bf4f08899b931fb39af0.tar.gz |
QPID-3831: use AcquireMode=PRE_ACQUIRED when using server-side selectors and consuming from Queues. Remove unused method parameter for selector filter.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1243384 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 12 insertions, 16 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index f579dcbf45..49447189b6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -47,7 +47,6 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -2581,7 +2580,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param queueName */ private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException + AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2598,7 +2597,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); + sendConsume(consumer, queueName, protocolHandler, nowait, tagId); } catch (AMQException e) { @@ -2609,7 +2608,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException; + AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException; private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate) throws JMSException @@ -2954,7 +2953,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter()); + consumeFromQueue(consumer, queueName, protocolHandler, nowait); } catch (FailoverException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e1138c5356..a27c52c686 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -599,7 +599,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Registers the consumer with the broker */ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, MessageFilter messageSelector, int tag) + boolean nowait, int tag) throws AMQException, FailoverException { boolean preAcquire = consumer.isPreAcquire(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index c86fb6529a..a49fb256a7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -364,7 +364,6 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, - MessageFilter messageSelector, int tag) throws AMQException, FailoverException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ccde720673..26bb51b821 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -91,11 +91,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; - _preAcquire = evaluatePreAcquire(browseOnly, destination); - - _capacity = evaluateCapacity(destination); _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + _preAcquire = evaluatePreAcquire(browseOnly, destination, _serverJmsSelectorSupport); + _capacity = evaluateCapacity(destination); if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { @@ -222,7 +221,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM boolean messageOk = true; try { - if (getMessageSelectorFilter() != null && !_serverJmsSelectorSupport) + if (!_serverJmsSelectorSupport && getMessageSelectorFilter() != null) { messageOk = getMessageSelectorFilter().matches(message); } @@ -525,7 +524,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _preAcquire; } - private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination) + private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport) { boolean preAcquire; if (browseOnly) @@ -535,7 +534,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM else { boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE); - if (isQueue && getMessageSelectorFilter() != null) + if (!serverJmsSelectorSupport && isQueue && getMessageSelectorFilter() != null) { preAcquire = false; } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 4bfbd3c726..028e2d5cc3 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -276,7 +276,7 @@ public class AMQSession_0_10Test extends QpidTestCase { BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, null, null, false, true); - session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1); + session.sendConsume(consumer, new AMQShortString("test"), null, true, 1); } catch (Exception e) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index ecd8eb99d1..84d91ee57e 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,7 +29,6 @@ import org.apache.qpid.client.BasicMessageProducer_0_8; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -125,7 +124,7 @@ public class TestAMQSession extends AMQSession_0_8 return false; } - public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException { } |