summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-09-05 18:51:15 +0000
committerRobert Gemmell <robbie@apache.org>2010-09-05 18:51:15 +0000
commit8b2e499dc048359c2bc37d1e9e36b2f8cd3cb3bc (patch)
tree214edd653b834ca5efcf0e1521937b063a85acab
parentba143bd7b07e39ce07f1fb7bbf3cd107a515b469 (diff)
downloadqpid-python-8b2e499dc048359c2bc37d1e9e36b2f8cd3cb3bc.tar.gz
QPID-2418: updates to fix test failures when using the 0-10 client test profiles.
Use a transacted session when querying for queue counts following consumption, as the 0-10 client batches auto-acks asynchronously. Always send the selector filter argument even if empty, to allow querying the brokers via 0-10 to detect whether the selector is being added/removed/modified at subscribe time. Enable the Java broker to perform argument matching during the 0-10 isBound check. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@992856 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java8
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java37
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes7
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes7
-rw-r--r--qpid/java/test-profiles/cpp.async.excludes3
11 files changed, 67 insertions, 48 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 44a677a76d..7b51b68e61 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -756,7 +756,9 @@ public class ServerSessionDelegate extends SessionDelegate
if(method.hasArguments())
{
- // TODO
+ FieldTable args = FieldTable.convertToFieldTable(method.getArguments());
+
+ result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue));
}
if(queueMatched)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b96b32d990..0f7e0b0812 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -90,6 +91,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -1066,10 +1068,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ Map<String,Object> args = new HashMap<String,Object>();
+
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+ // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+ // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+ // argument, as specifying null for the arguments when querying means they should not be checked at all
+ args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+
+ // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
+ boolean isQueueBoundForTopicAndSelector =
+ isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args);
+
+ if (isQueueBound && !isQueueBoundForTopicAndSelector)
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1089,6 +1102,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
unsubscribe(name, true);
}
+
}
_subscriberAccess.lock();
@@ -1957,10 +1971,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
ft.addAll(rawSelector);
}
- if (messageSelector != null)
- {
- ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
- }
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+ // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+ // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+ // argument, as specifying null for the arguments when querying means they should not be checked at all
+ ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
@@ -2091,6 +2106,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throws JMSException;
public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
+
+ public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException;
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 75db5d5673..c1021e121c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -318,11 +318,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (destination.getDestSyntax() == DestSyntax.BURL)
{
Map args = FieldTableSupport.convertToMap(arguments);
- // this is there only because the broker may expect a value for x-match
- if( ! args.containsKey("x-match") )
- {
- args.put("x-match", "any");
- }
for (AMQShortString rk: destination.getBindingKeys())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 89fbd66e71..8cca92da1f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -566,4 +566,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
{
}
+
+ public boolean isQueueBound(String exchangeName, String queueName,
+ String bindingKey, Map<String, Object> args) throws JMSException
+ {
+ return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName),
+ queueName == null ? null : new AMQShortString(queueName),
+ bindingKey == null ? null : new AMQShortString(bindingKey));
+ }
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index f7a37e4894..47c0359b94 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -29,6 +29,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import javax.jms.*;
+
import java.util.Map;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
@@ -188,4 +189,10 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected void flushAcknowledgments()
{
}
+
+ public boolean isQueueBound(String exchangeName, String queueName,
+ String bindingKey, Map<String, Object> args) throws JMSException
+ {
+ return false;
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
index afda7d4ba9..51815e2adc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
@@ -94,7 +94,7 @@ public class BindingLoggingTest extends AbstractTestLogging
* 2. New Client requests that a Queue is bound to a new exchange.
* Output:
*
- * <date> BND-1001 : Create
+ * <date> BND-1001 : Create : Arguments : {x-filter-jms-selector=}
*
* Validation Steps:
* 3. The BND ID is correct
@@ -117,6 +117,7 @@ public class BindingLoggingTest extends AbstractTestLogging
validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName);
exchange = "direct/amq.direct";
+ message = "Create : Arguments : {x-filter-jms-selector=}";
validateLogMessage(getLogMessage(results, 1), messageID, message, exchange, queueName, queueName);
}
@@ -129,7 +130,7 @@ public class BindingLoggingTest extends AbstractTestLogging
* 2. Java Client consumes from a topic with a JMS selector.
* Output:
*
- * <date> BND-1001 : Create : Arguments : <key=value>
+ * <date> BND-1001 : Create : Arguments : {x-filter-jms-selector=<value>}
*
* Validation Steps:
* 3. The BND ID is correct
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 98d59982e5..0f799073b4 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -75,6 +75,7 @@ public class StreamMessageTest extends QpidBrokerTestCase
ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME
+ "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
FieldTable ft = new FieldTable();
+ ft.setString("x-match", "any");
ft.setString("F1000", "1");
MessageConsumer consumer =
consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 3dd3c72024..d73761d12a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -544,23 +544,22 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
TopicSubscriber subB = session.createDurableSubscriber(topic,
"testResubscribeWithChangedSelector","Match = False", false);
- //verify no messages are now present as changing selector should have issued
- //an unsubscribe and thus deleted the previous backing queue for the subscription.
+ //verify no messages are now recieved.
rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertNull("Should not have received message as the queue underlying the " +
- "subscription should have been cleared/deleted when the selector was changed", rMsg);
+ assertNull("Should not have received message as the selector was changed", rMsg);
// Check that new messages are received properly
sendMatchingAndNonMatchingMessage(session, producer);
rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull("Message should not be received", rMsg);
+ assertNotNull("Message should have been received", rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector2",
((TextMessage) rMsg).getText());
+
rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertNull("Message should be received",rMsg);
+ assertNull("Message should not have been received",rMsg);
session.unsubscribe("testResubscribeWithChangedSelector");
}
@@ -613,7 +612,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
{
Connection conn = getConnection();
conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector");
//create and register a durable subscriber with a message selector and then close it
@@ -630,15 +629,17 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
message.setBooleanProperty("testprop", false);
producer.send(message);
}
+ session.commit();
producer.close();
+ // should be 5 or 10 messages on queue now
+ // (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only)
+ AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector");
+ assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
// now recreate the durable subscriber and check the received messages
TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
- // should be 5 messages on queue now
- AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector");
- assertEquals("Queue depth is wrong", 5, ((AMQSession<?, ?>) session).getQueueDepth(queue));
-
for (int i = 0; i < 5; i++)
{
Message message = subTwo.receive(1000);
@@ -653,6 +654,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
}
}
+ session.commit();
+
// Check queue has no messages
assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
@@ -710,10 +713,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg.setBooleanProperty("Match", false);
producer.send(msg);
- // should be 1 message on queue now
+ // should be 1 or 2 messages on queue now
+ // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");
- assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue));
-
+ assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
Message rMsg = subB.receive(1000);
assertNotNull(rMsg);
assertEquals("Content was wrong",
@@ -773,9 +777,10 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg.setBooleanProperty("testprop", false);
producer.send(msg);
- // should be 1 message on queue now
+ // should be 1 or 2 messages on queue now
+ // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName");
- assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue));
Message rMsg = subTwo.receive(1000);
assertNotNull(rMsg);
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index fe1ce82b7e..711a3954e4 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -125,13 +125,6 @@ org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
// Temporarily adding the following until the issues are sorted out.
org.apache.qpid.test.unit.client.AMQConnectionTest#testHeartBeat
-//QPID-2418 : Not yet implemented on 0-10
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelector
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubSameMessageSelector
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelectorNoClose
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubAddMessageSelectorNoClose
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubNoSelectorResubscribeNoClose
-
//Excluded due to QPID-1447 : CPP broker does not have SlowConsumer Disconnection
org.apache.qpid.systest.GlobalQueuesTest#*
org.apache.qpid.systest.GlobalTopicsTest#*
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index 47fca90500..6da362f33a 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -62,10 +62,3 @@ org.apache.qpid.transport.network.mina.MINANetworkDriverTest#*
org.apache.qpid.test.unit.basic.LargeMessageTest#*
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchange
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode
-
-//QPID-2418 : Not yet implemented on 0-10
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelector
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubSameMessageSelector
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelectorNoClose
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubAddMessageSelectorNoClose
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubNoSelectorResubscribeNoClose
diff --git a/qpid/java/test-profiles/cpp.async.excludes b/qpid/java/test-profiles/cpp.async.excludes
index 72d79fb754..b6479a00ba 100644
--- a/qpid/java/test-profiles/cpp.async.excludes
+++ b/qpid/java/test-profiles/cpp.async.excludes
@@ -1,5 +1,2 @@
-// the C++ broker doesn't implement selectors, so they are not persisted with the subscription
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#testDurSubRestoresMessageSelector
-
// the C++ broker doesn't guarantee the order of messages on recovery
org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash