summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-09-05 18:51:15 +0000
committerRobert Gemmell <robbie@apache.org>2010-09-05 18:51:15 +0000
commit2e7e6fed93a10f90fd38228ffb034d53450f2413 (patch)
tree385b1a8867980e095b179d18e3af623b06b9624a /java/client/src
parentba3c83a6b74d8145ece6930272b880260e613a3c (diff)
downloadqpid-python-2e7e6fed93a10f90fd38228ffb034d53450f2413.tar.gz
QPID-2418: updates to fix test failures when using the 0-10 client test profiles.
Use a transacted session when querying for queue counts following consumption, as the 0-10 client batches auto-acks asynchronously. Always send the selector filter argument even if empty, to allow querying the brokers via 0-10 to detect whether the selector is being added/removed/modified at subscribe time. Enable the Java broker to perform argument matching during the 0-10 isBound check. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@992856 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java7
4 files changed, 39 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b96b32d990..0f7e0b0812 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -90,6 +91,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -1066,10 +1068,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ Map<String,Object> args = new HashMap<String,Object>();
+
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+ // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+ // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+ // argument, as specifying null for the arguments when querying means they should not be checked at all
+ args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+
+ // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
+ boolean isQueueBoundForTopicAndSelector =
+ isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
+
+ if (isQueueBound && !isQueueBoundForTopicAndSelector)
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1089,6 +1102,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
unsubscribe(name, true);
}
+
}
_subscriberAccess.lock();
@@ -1957,10 +1971,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ft.addAll(rawSelector);
}
- if (messageSelector != null)
- {
- ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
- }
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+ // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+ // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+ // argument, as specifying null for the arguments when querying means they should not be checked at all
+ ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
@@ -2091,6 +2106,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throws JMSException;
public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
+
+ public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException;
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 75db5d5673..c1021e121c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -318,11 +318,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (destination.getDestSyntax() == DestSyntax.BURL)
{
Map args = FieldTableSupport.convertToMap(arguments);
- // this is there only because the broker may expect a value for x-match
- if( ! args.containsKey("x-match") )
- {
- args.put("x-match", "any");
- }
for (AMQShortString rk: destination.getBindingKeys())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 89fbd66e71..8cca92da1f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -566,4 +566,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
{
}
+
+ public boolean isQueueBound(String exchangeName, String queueName,
+ String bindingKey, Map<String, Object> args) throws JMSException
+ {
+ return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName),
+ queueName == null ? null : new AMQShortString(queueName),
+ bindingKey == null ? null : new AMQShortString(bindingKey));
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index f7a37e4894..47c0359b94 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import javax.jms.*;
+
import java.util.Map;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
@@ -188,4 +189,10 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected void flushAcknowledgments()
{
}
+
+ public boolean isQueueBound(String exchangeName, String queueName,
+ String bindingKey, Map<String, Object> args) throws JMSException
+ {
+ return false;
+ }
}