diff options
author | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:51:15 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:51:15 +0000 |
commit | 8b2e499dc048359c2bc37d1e9e36b2f8cd3cb3bc (patch) | |
tree | 214edd653b834ca5efcf0e1521937b063a85acab | |
parent | ba143bd7b07e39ce07f1fb7bbf3cd107a515b469 (diff) | |
download | qpid-python-8b2e499dc048359c2bc37d1e9e36b2f8cd3cb3bc.tar.gz |
QPID-2418: updates to fix test failures when using the 0-10 client test profiles.
Use a transacted session when querying for queue counts following consumption, as the 0-10 client batches auto-acks asynchronously.
Always send the selector filter argument even if empty, to allow querying the brokers via 0-10 to detect whether the selector is being added/removed/modified at subscribe time.
Enable the Java broker to perform argument matching during the 0-10 isBound check.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@992856 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 67 insertions, 48 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 44a677a76d..7b51b68e61 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -756,7 +756,9 @@ public class ServerSessionDelegate extends SessionDelegate if(method.hasArguments()) { - // TODO + FieldTable args = FieldTable.convertToFieldTable(method.getArguments()); + + result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue)); } if(queueMatched) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b96b32d990..0f7e0b0812 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -1066,10 +1068,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + Map<String,Object> args = new HashMap<String,Object>(); + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); + boolean isQueueBoundForTopicAndSelector = + isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); + + if (isQueueBound && !isQueueBoundForTopicAndSelector) { deleteQueue(dest.getAMQQueueName()); } @@ -1089,6 +1102,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { unsubscribe(name, true); } + } _subscriberAccess.lock(); @@ -1957,10 +1971,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ft.addAll(rawSelector); } - if (messageSelector != null) - { - ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); - } + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, noLocal, exclusive, messageSelector, ft, noConsume, autoClose); @@ -2091,6 +2106,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException; public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; + + public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException; /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 75db5d5673..c1021e121c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -318,11 +318,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - // this is there only because the broker may expect a value for x-match - if( ! args.containsKey("x-match") ) - { - args.put("x-match", "any"); - } for (AMQShortString rk: destination.getBindingKeys()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 89fbd66e71..8cca92da1f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -566,4 +566,12 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B { } + + public boolean isQueueBound(String exchangeName, String queueName, + String bindingKey, Map<String, Object> args) throws JMSException + { + return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName), + queueName == null ? null : new AMQShortString(queueName), + bindingKey == null ? null : new AMQShortString(bindingKey)); + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index f7a37e4894..47c0359b94 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import javax.jms.*; + import java.util.Map; public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> @@ -188,4 +189,10 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected void flushAcknowledgments() { } + + public boolean isQueueBound(String exchangeName, String queueName, + String bindingKey, Map<String, Object> args) throws JMSException + { + return false; + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java index afda7d4ba9..51815e2adc 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java @@ -94,7 +94,7 @@ public class BindingLoggingTest extends AbstractTestLogging * 2. New Client requests that a Queue is bound to a new exchange. * Output: * - * <date> BND-1001 : Create + * <date> BND-1001 : Create : Arguments : {x-filter-jms-selector=} * * Validation Steps: * 3. The BND ID is correct @@ -117,6 +117,7 @@ public class BindingLoggingTest extends AbstractTestLogging validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName); exchange = "direct/amq.direct"; + message = "Create : Arguments : {x-filter-jms-selector=}"; validateLogMessage(getLogMessage(results, 1), messageID, message, exchange, queueName, queueName); } @@ -129,7 +130,7 @@ public class BindingLoggingTest extends AbstractTestLogging * 2. Java Client consumes from a topic with a JMS selector. * Output: * - * <date> BND-1001 : Create : Arguments : <key=value> + * <date> BND-1001 : Create : Arguments : {x-filter-jms-selector=<value>} * * Validation Steps: * 3. The BND ID is correct diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 98d59982e5..0f799073b4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -75,6 +75,7 @@ public class StreamMessageTest extends QpidBrokerTestCase ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); FieldTable ft = new FieldTable(); + ft.setString("x-match", "any"); ft.setString("F1000", "1"); MessageConsumer consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 3dd3c72024..d73761d12a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -544,23 +544,22 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); - //verify no messages are now present as changing selector should have issued - //an unsubscribe and thus deleted the previous backing queue for the subscription. + //verify no messages are now recieved. rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); - assertNull("Should not have received message as the queue underlying the " + - "subscription should have been cleared/deleted when the selector was changed", rMsg); + assertNull("Should not have received message as the selector was changed", rMsg); // Check that new messages are received properly sendMatchingAndNonMatchingMessage(session, producer); rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); - assertNotNull("Message should not be received", rMsg); + assertNotNull("Message should have been received", rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector2", ((TextMessage) rMsg).getText()); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); - assertNull("Message should be received",rMsg); + assertNull("Message should not have been received",rMsg); session.unsubscribe("testResubscribeWithChangedSelector"); } @@ -613,7 +612,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase { Connection conn = getConnection(); conn.start(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector"); //create and register a durable subscriber with a message selector and then close it @@ -630,15 +629,17 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase message.setBooleanProperty("testprop", false); producer.send(message); } + session.commit(); producer.close(); + // should be 5 or 10 messages on queue now + // (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only) + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); + assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + // now recreate the durable subscriber and check the received messages TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); - // should be 5 messages on queue now - AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); - assertEquals("Queue depth is wrong", 5, ((AMQSession<?, ?>) session).getQueueDepth(queue)); - for (int i = 0; i < 5; i++) { Message message = subTwo.receive(1000); @@ -653,6 +654,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase } } + session.commit(); + // Check queue has no messages assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); @@ -710,10 +713,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg.setBooleanProperty("Match", false); producer.send(msg); - // should be 1 message on queue now + // should be 1 or 2 messages on queue now + // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); - assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue)); - + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + Message rMsg = subB.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", @@ -773,9 +777,10 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg.setBooleanProperty("testprop", false); producer.send(msg); - // should be 1 message on queue now + // should be 1 or 2 messages on queue now + // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); - assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); Message rMsg = subTwo.receive(1000); assertNotNull(rMsg); diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index fe1ce82b7e..711a3954e4 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -125,13 +125,6 @@ org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage // Temporarily adding the following until the issues are sorted out. org.apache.qpid.test.unit.client.AMQConnectionTest#testHeartBeat -//QPID-2418 : Not yet implemented on 0-10 -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubSameMessageSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubAddMessageSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubNoSelectorResubscribeNoClose - //Excluded due to QPID-1447 : CPP broker does not have SlowConsumer Disconnection org.apache.qpid.systest.GlobalQueuesTest#* org.apache.qpid.systest.GlobalTopicsTest#* diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 47fca90500..6da362f33a 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -62,10 +62,3 @@ org.apache.qpid.transport.network.mina.MINANetworkDriverTest#* org.apache.qpid.test.unit.basic.LargeMessageTest#* org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchange org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode - -//QPID-2418 : Not yet implemented on 0-10 -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubSameMessageSelector -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testResubscribeWithChangedSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubAddMessageSelectorNoClose -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubNoSelectorResubscribeNoClose diff --git a/qpid/java/test-profiles/cpp.async.excludes b/qpid/java/test-profiles/cpp.async.excludes index 72d79fb754..b6479a00ba 100644 --- a/qpid/java/test-profiles/cpp.async.excludes +++ b/qpid/java/test-profiles/cpp.async.excludes @@ -1,5 +1,2 @@ -// the C++ broker doesn't implement selectors, so they are not persisted with the subscription -org.apache.qpid.test.unit.ct.DurableSubscriberTest#testDurSubRestoresMessageSelector - // the C++ broker doesn't guarantee the order of messages on recovery org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash |