diff options
author | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:51:15 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:51:15 +0000 |
commit | 2e7e6fed93a10f90fd38228ffb034d53450f2413 (patch) | |
tree | 385b1a8867980e095b179d18e3af623b06b9624a /java/client/src | |
parent | ba3c83a6b74d8145ece6930272b880260e613a3c (diff) | |
download | qpid-python-2e7e6fed93a10f90fd38228ffb034d53450f2413.tar.gz |
QPID-2418: updates to fix test failures when using the 0-10 client test profiles.
Use a transacted session when querying for queue counts following consumption, as the 0-10 client batches auto-acks asynchronously.
Always send the selector filter argument even if empty, to allow querying the brokers via 0-10 to detect whether the selector is being added/removed/modified at subscribe time.
Enable the Java broker to perform argument matching during the 0-10 isBound check.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@992856 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
4 files changed, 39 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b96b32d990..0f7e0b0812 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -1066,10 +1068,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + Map<String,Object> args = new HashMap<String,Object>(); + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); + boolean isQueueBoundForTopicAndSelector = + isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); + + if (isQueueBound && !isQueueBoundForTopicAndSelector) { deleteQueue(dest.getAMQQueueName()); } @@ -1089,6 +1102,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { unsubscribe(name, true); } + } _subscriberAccess.lock(); @@ -1957,10 +1971,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ft.addAll(rawSelector); } - if (messageSelector != null) - { - ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); - } + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, noLocal, exclusive, messageSelector, ft, noConsume, autoClose); @@ -2091,6 +2106,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException; public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; + + public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException; /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 75db5d5673..c1021e121c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -318,11 +318,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - // this is there only because the broker may expect a value for x-match - if( ! args.containsKey("x-match") ) - { - args.put("x-match", "any"); - } for (AMQShortString rk: destination.getBindingKeys()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 89fbd66e71..8cca92da1f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -566,4 +566,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B { } + + public boolean isQueueBound(String exchangeName, String queueName, + String bindingKey, Map<String, Object> args) throws JMSException + { + return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName), + queueName == null ? null : new AMQShortString(queueName), + bindingKey == null ? null : new AMQShortString(bindingKey)); + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index f7a37e4894..47c0359b94 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import javax.jms.*; + import java.util.Map; public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> @@ -188,4 +189,10 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected void flushAcknowledgments() { } + + public boolean isQueueBound(String exchangeName, String queueName, + String bindingKey, Map<String, Object> args) throws JMSException + { + return false; + } } |