diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java | 67 |
1 files changed, 31 insertions, 36 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 0b1aeef8e9..a7a06a357a 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.test.unit.topic; +import javax.jms.Connection; import javax.jms.InvalidDestinationException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; @@ -30,7 +35,6 @@ import javax.jms.TopicSubscriber; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQTopicSessionAdaptor; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -306,51 +310,42 @@ public class TopicSessionTest extends QpidBrokerTestCase } /** - * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber - * due to a selector can be leaked. - * @throws Exception + * This tests was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to + * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker. */ - public void testNonMatchingMessagesDoNotFillQueue() throws Exception + public void testNonMatchingMessagesHandledCorrectly() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - // Setup Topic - AMQTopic topic = new AMQTopic(con, "testNoLocal"); - - TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE); + final String topicName = getName(); + final String clientId = "clientId" + topicName; + final Connection con1 = getConnection(); + final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Topic topic1 = session1.createTopic(topicName); // Setup subscriber with selector - TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false); - TopicPublisher publisher = session.createPublisher(topic); + final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false); + final MessageProducer publisher = session1.createProducer(topic1); - con.start(); - TextMessage m; - TextMessage message; + con1.start(); // Send non-matching message - message = session.createTextMessage("non-matching 1"); - publisher.publish(message); - session.commit(); - - // Send and consume matching message - message = session.createTextMessage("hello"); - message.setStringProperty("Selector", "select"); + final Message sentMessage = session1.createTextMessage("hello"); + sentMessage.setStringProperty("Selector", "nonMatch"); + publisher.send(sentMessage); - publisher.publish(message); - session.commit(); + // Try to consume non-message, expect this to fail. + final Message message1 = subscriberWithSelector.receive(1000); + assertNull("should not have received message", message1); + subscriberWithSelector.close(); - m = (TextMessage) selector.receive(1000); - assertNotNull("should have received message", m); - assertEquals("Message contents were wrong", "hello", m.getText()); - - // Send non-matching message - message = session.createTextMessage("non-matching 2"); - publisher.publish(message); - session.commit(); + session1.close(); - // Assert queue count is 0 - long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic); - assertEquals("Queue depth was wrong", 0, depth); + // Now recreate the session and subscriber (same clientid) but without selector and check that the message still + // is not received. This defect meant that such a message would be received. + final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Topic topic2 = session2.createTopic(topicName); + final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false); + final Message message2 = sameSubscriberWithoutSelector.receive(1000); + assertNull("still should not have received message", message2); } } |