summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java67
1 files changed, 31 insertions, 36 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 0b1aeef8e9..a7a06a357a 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.test.unit.topic;
+import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
@@ -30,7 +35,6 @@ import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -306,51 +310,42 @@ public class TopicSessionTest extends QpidBrokerTestCase
}
/**
- * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
- * due to a selector can be leaked.
- * @throws Exception
+ * This tests was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to
+ * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker.
*/
- public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+ public void testNonMatchingMessagesHandledCorrectly() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- // Setup Topic
- AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
- TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+ final String topicName = getName();
+ final String clientId = "clientId" + topicName;
+ final Connection con1 = getConnection();
+ final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Topic topic1 = session1.createTopic(topicName);
// Setup subscriber with selector
- TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
- TopicPublisher publisher = session.createPublisher(topic);
+ final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+ final MessageProducer publisher = session1.createProducer(topic1);
- con.start();
- TextMessage m;
- TextMessage message;
+ con1.start();
// Send non-matching message
- message = session.createTextMessage("non-matching 1");
- publisher.publish(message);
- session.commit();
-
- // Send and consume matching message
- message = session.createTextMessage("hello");
- message.setStringProperty("Selector", "select");
+ final Message sentMessage = session1.createTextMessage("hello");
+ sentMessage.setStringProperty("Selector", "nonMatch");
+ publisher.send(sentMessage);
- publisher.publish(message);
- session.commit();
+ // Try to consume non-message, expect this to fail.
+ final Message message1 = subscriberWithSelector.receive(1000);
+ assertNull("should not have received message", message1);
+ subscriberWithSelector.close();
- m = (TextMessage) selector.receive(1000);
- assertNotNull("should have received message", m);
- assertEquals("Message contents were wrong", "hello", m.getText());
-
- // Send non-matching message
- message = session.createTextMessage("non-matching 2");
- publisher.publish(message);
- session.commit();
+ session1.close();
- // Assert queue count is 0
- long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
- assertEquals("Queue depth was wrong", 0, depth);
+ // Now recreate the session and subscriber (same clientid) but without selector and check that the message still
+ // is not received. This defect meant that such a message would be received.
+ final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Topic topic2 = session2.createTopic(topicName);
+ final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false);
+ final Message message2 = sameSubscriberWithoutSelector.receive(1000);
+ assertNull("still should not have received message", message2);
}
}