/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.test.unit.topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.management.common.JMXConnnectionFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.InvalidDestinationException; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import javax.management.MBeanServerConnection; import javax.management.ObjectName; import javax.management.remote.JMXConnector; import java.io.IOException; import java.util.Set; /** * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as * a static on a base test helper class, e.g. TestUtils. * * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit, * driven by the test model. */ public class DurableSubscriptionTest extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); /** Timeout for receive() if we are expecting a message */ private static final long POSITIVE_RECEIVE_TIMEOUT = 2000; /** Timeout for receive() if we are not expecting a message */ private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000; private JMXConnector _jmxc; private MBeanServerConnection _mbsc; private static final String USER = "admin"; private static final String PASSWORD = "admin"; private boolean _jmxConnected; public void setUp() throws Exception { setConfigurationProperty("management.enabled", "true"); _jmxConnected=false; super.setUp(); } public void tearDown() throws Exception { if(_jmxConnected) { try { _jmxc.close(); } catch (IOException e) { e.printStackTrace(); } } super.tearDown(); } public void testUnsubscribe() throws Exception { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic"); _logger.info("Create Session 1"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); _logger.info("Create Consumer on Session 1"); MessageConsumer consumer1 = session1.createConsumer(topic); _logger.info("Create Producer on Session 1"); MessageProducer producer = session1.createProducer(topic); _logger.info("Create Session 2"); Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); _logger.info("Create Durable Subscriber on Session 2"); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); _logger.info("Starting connection"); con.start(); _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); //check the dur sub's underlying queue now has msg count 1 AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); assertEquals("Msg count should be 1", 1, ((AMQSession) session1).getQueueDepth(subQueue, true)); Message msg; _logger.info("Receive message on consumer 1:expecting A"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); _logger.info("Receive message on consumer 2:expecting A"); msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); //check the dur sub's underlying queue now has msg count 0 assertEquals("Msg count should be 0", 0, ((AMQSession) session2).getQueueDepth(subQueue, true)); consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); session2.unsubscribe("MySubscription"); ((AMQSession) session2).sync(); if(isJavaBroker()) { //Verify that the queue was deleted by querying for its JMX MBean _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", getManagementPort(getPort()), USER, PASSWORD); _jmxConnected = true; _mbsc = _jmxc.getMBeanServerConnection(); //must replace the occurrence of ':' in queue name with '-' String queueObjectNameText = "clientid" + "-" + "MySubscription"; ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=" + queueObjectNameText + ",*"); Set objectInstances = _mbsc.queryNames(objName, null); if(objectInstances.size() != 0) { fail("Queue MBean was found. Expected queue to have been deleted"); } else { _logger.info("Underlying dueue for the durable subscription was confirmed deleted."); } } //verify unsubscribing the durable subscriber did not affect the non-durable one _logger.info("Producer sending message B"); producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); _logger.info("Close connection"); con.close(); } public void testDurabilityNOACK() throws Exception { durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false); } public void testDurabilityAUTOACK() throws Exception { durabilityImpl(Session.AUTO_ACKNOWLEDGE, false); } public void testDurabilityAUTOACKwithRestartIfPersistent() throws Exception { if(!isBrokerStorePersistent()) { System.out.println("The broker store is not persistent, skipping this test."); return; } durabilityImpl(Session.AUTO_ACKNOWLEDGE, true); } public void testDurabilityNOACKSessionPerConnection() throws Exception { durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE); } public void testDurabilityAUTOACKSessionPerConnection() throws Exception { durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE); } private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "MyTopic"); Session session1 = con.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); Session sessionProd = con.createSession(false, ackMode); MessageProducer producer = sessionProd.createProducer(topic); Session session2 = con.createSession(false, ackMode); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); con.start(); //send message A and check both consumers receive producer.send(session1.createTextMessage("A")); Message msg; _logger.info("Receive message on consumer 1 :expecting A"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); _logger.info("Receive message on consumer 2 :expecting A"); msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK) producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 1 should get message 'B'.", msg); assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); consumer2.close(); session2.close(); //Send message C, then connect consumer 3 to durable subscription and get //message B if not using NO_ACK, then receive C with consumer 1 and 3 producer.send(session1.createTextMessage("C")); Session session3 = con.createSession(false, ackMode); MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); if(ackMode == AMQSession.NO_ACKNOWLEDGE) { //Do nothing if NO_ACK was used, as prefetch means the message was dropped //when we didn't call receive() to get it before closing consumer 2 } else { _logger.info("Receive message on consumer 3 :expecting B"); msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 3 should get message 'B'.", msg); assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText()); } _logger.info("Receive message on consumer 1 :expecting C"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 1 should get message 'C'.", msg); assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); _logger.info("Receive message on consumer 3 :expecting C"); msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 3 should get message 'C'.", msg); assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); msg = consumer3.receive(500); assertNull("There should be no more messages for consumption on consumer3.", msg); consumer1.close(); consumer3.close(); session3.unsubscribe("MySubscription"); con.close(); if(restartBroker) { try { restartBroker(); } catch (Exception e) { fail("Error restarting the broker"); } } } private void durabilityImplSessionPerConnection(int ackMode) throws Exception { Message msg; // Create producer. AMQConnection con0 = (AMQConnection) getConnection("guest", "guest"); con0.start(); Session session0 = con0.createSession(false, ackMode); AMQTopic topic = new AMQTopic(con0, "MyTopic"); Session sessionProd = con0.createSession(false, ackMode); MessageProducer producer = sessionProd.createProducer(topic); // Create consumer 1. AMQConnection con1 = (AMQConnection) getConnection("guest", "guest"); con1.start(); Session session1 = con1.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); // Create consumer 2. AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); con2.start(); Session session2 = con2.createSession(false, ackMode); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); // Send message and check that both consumers get it and only it. producer.send(session0.createTextMessage("A")); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should be available", msg); assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText()); msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("There should be no more messages for consumption on consumer2.", msg); // Send message and receive on consumer 1. producer.send(session0.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertEquals("B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); // Detach the durable subscriber. consumer2.close(); session2.close(); con2.close(); // Send message C and receive on consumer 1 producer.send(session0.createTextMessage("C")); _logger.info("Receive message on consumer 1 :expecting C"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertEquals("C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK) // and also gets message C sent after it was disconnected. AMQConnection con3 = (AMQConnection) getConnection("guest", "guest"); con3.start(); Session session3 = con3.createSession(false, ackMode); TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); if(ackMode == AMQSession.NO_ACKNOWLEDGE) { //Do nothing if NO_ACK was used, as prefetch means the message was dropped //when we didn't call receive() to get it before closing consumer 2 } else { _logger.info("Receive message on consumer 3 :expecting B"); msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull(msg); assertEquals("B", ((TextMessage) msg).getText()); } _logger.info("Receive message on consumer 3 :expecting C"); msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Consumer 3 should get message 'C'.", msg); assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("There should be no more messages for consumption on consumer3.", msg); consumer1.close(); consumer3.close(); session3.unsubscribe("MySubscription"); con0.close(); con1.close(); con3.close(); } /** * This tests the fix for QPID-1085 * Creates a durable subscriber with an invalid selector, checks that the * exception is thrown correctly and that the subscription is not created. * @throws Exception */ public void testDurableWithInvalidSelector() throws Exception { Connection conn = getConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQTopic topic = new AMQTopic((AMQConnection) conn, "MyTestDurableWithInvalidSelectorTopic"); MessageProducer producer = session.createProducer(topic); producer.send(session.createTextMessage("testDurableWithInvalidSelector1")); try { TopicSubscriber deadSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub", "=TEST 'test", true); assertNull("Subscriber should not have been created", deadSubscriber); } catch (JMSException e) { assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException); } TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub"); assertNotNull("Subscriber should have been created", liveSubscriber); producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull ("Message should have been received", msg); assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); liveSubscriber.close(); session.unsubscribe("testDurableWithInvalidSelectorSub"); } /** * This tests the fix for QPID-1085 * Creates a durable subscriber with an invalid destination, checks that the * exception is thrown correctly and that the subscription is not created. * @throws Exception */ public void testDurableWithInvalidDestination() throws Exception { Connection conn = getConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQTopic topic = new AMQTopic((AMQConnection) conn, "testDurableWithInvalidDestinationTopic"); try { TopicSubscriber deadSubscriber = session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub"); assertNull("Subscriber should not have been created", deadSubscriber); } catch (InvalidDestinationException e) { // This was expected } MessageProducer producer = session.createProducer(topic); producer.send(session.createTextMessage("testDurableWithInvalidSelector1")); TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidDestinationsub"); assertNotNull("Subscriber should have been created", liveSubscriber); producer.send(session.createTextMessage("testDurableWithInvalidSelector2")); Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull ("Message should have been received", msg); assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText()); assertNull("Should not receive subsequent message", liveSubscriber.receive(200)); session.unsubscribe("testDurableWithInvalidDestinationsub"); } /** * Creates a durable subscription with a selector, then changes that selector on resubscription *

* QPID-1202, QPID-2418 */ public void testResubscribeWithChangedSelector() throws Exception { Connection conn = getConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelector"); MessageProducer producer = session.createProducer(topic); // Create durable subscriber that matches A TopicSubscriber subA = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector", "Match = True", false); // Send 1 matching message and 1 non-matching message sendMatchingAndNonMatchingMessage(session, producer); Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector1", ((TextMessage) rMsg).getText()); rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull(rMsg); // Disconnect subscriber subA.close(); // Reconnect with new selector that matches B TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); //verify no messages are now recieved. rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Should not have received message as the selector was changed", rMsg); // Check that new messages are received properly sendMatchingAndNonMatchingMessage(session, producer); rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received", rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelector2", ((TextMessage) rMsg).getText()); rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Message should not have been received",rMsg); session.unsubscribe("testResubscribeWithChangedSelector"); } public void testDurableSubscribeWithTemporaryTopic() throws Exception { Connection conn = getConnection(); conn.start(); Session ssn = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = ssn.createTemporaryTopic(); try { ssn.createDurableSubscriber(topic, "test"); fail("expected InvalidDestinationException"); } catch (InvalidDestinationException ex) { // this is expected } try { ssn.createDurableSubscriber(topic, "test", null, false); fail("expected InvalidDestinationException"); } catch (InvalidDestinationException ex) { // this is expected } } private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException { TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelector1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelector2"); msg.setBooleanProperty("Match", false); producer.send(msg); } /** * create and register a durable subscriber with a message selector and then close it * create a publisher and send 5 right messages and 5 wrong messages * create another durable subscriber with the same selector and name * check messages are still there *

* QPID-2418 */ public void testDurSubSameMessageSelector() throws Exception { Connection conn = getConnection(); conn.start(); Session session = conn.createSession(true, Session.SESSION_TRANSACTED); AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector"); //create and register a durable subscriber with a message selector and then close it TopicSubscriber subOne = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); subOne.close(); MessageProducer producer = session.createProducer(topic); for (int i = 0; i < 5; i++) { Message message = session.createMessage(); message.setBooleanProperty("testprop", true); producer.send(message); message = session.createMessage(); message.setBooleanProperty("testprop", false); producer.send(message); } session.commit(); producer.close(); // should be 5 or 10 messages on queue now // (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession) session).getQueueDepth(queue, true)); // now recreate the durable subscriber and check the received messages TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); for (int i = 0; i < 5; i++) { Message message = subTwo.receive(1000); if (message == null) { fail("sameMessageSelector test failed. no message was returned"); } else { assertEquals("sameMessageSelector test failed. message selector not reset", "true", message.getStringProperty("testprop")); } } session.commit(); // Check queue has no messages if (isJavaBroker()) { assertEquals("Queue should be empty", 0, ((AMQSession) session).getQueueDepth(queue)); } else { assertTrue("At most the queue should have only 1 message", ((AMQSession) session).getQueueDepth(queue) <= 1); } // Unsubscribe session.unsubscribe("sameMessageSelector"); conn.close(); } /** *

*

* QPID-2418 */ public void testResubscribeWithChangedSelectorNoClose() throws Exception { Connection conn = getConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorNoClose"); // Create durable subscriber that matches A TopicSubscriber subA = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelectorNoClose", "Match = True", false); // Reconnect with new selector that matches B TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelectorNoClose", "Match = false", false); // First subscription has been closed try { subA.receive(1000); fail("First subscription was not closed"); } catch (Exception e) { e.printStackTrace(); } conn.stop(); // Send 1 matching message and 1 non-matching message MessageProducer producer = session.createProducer(topic); TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession) session).getQueueDepth(queue, true)); conn.start(); Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart2", ((TextMessage) rMsg).getText()); rMsg = subB.receive(1000); assertNull(rMsg); // Check queue has no messages assertEquals("Queue should be empty", 0, ((AMQSession) session).getQueueDepth(queue, true)); conn.close(); } /** *

*

* QPID-2418 */ public void testDurSubAddMessageSelectorNoClose() throws Exception { Connection conn = getConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); // create and register a durable subscriber with no message selector TopicSubscriber subOne = session.createDurableSubscriber(topic, "subscriptionName", null, false); // now create a durable subscriber with a selector TopicSubscriber subTwo = session.createDurableSubscriber(topic, "subscriptionName", "testprop = TRUE", false); // First subscription has been closed try { subOne.receive(1000); fail("First subscription was not closed"); } catch (Exception e) { e.printStackTrace(); } conn.stop(); // Send 1 matching message and 1 non-matching message MessageProducer producer = session.createProducer(topic); TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("testprop", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("testprop", false); producer.send(msg); // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession) session).getQueueDepth(queue, true)); conn.start(); Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart1", ((TextMessage) rMsg).getText()); rMsg = subTwo.receive(1000); assertNull(rMsg); // Check queue has no messages assertEquals("Queue should be empty", 0, ((AMQSession) session).getQueueDepth(queue, true)); conn.close(); } /** *

*

* QPID-2418 */ public void testDurSubNoSelectorResubscribeNoClose() throws Exception { Connection conn = getConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); // create and register a durable subscriber with no message selector session.createDurableSubscriber(topic, "subscriptionName", null, false); // try to recreate the durable subscriber try { session.createDurableSubscriber(topic, "subscriptionName", null, false); fail("Subscription should not have been created"); } catch (Exception e) { e.printStackTrace(); } } /** * Tests that a subscriber created on a same session as producer with * no local true does not receive messages. */ public void testNoLocalOnSameSession() throws Exception { Connection connection = getConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(getTestQueueName()); MessageProducer producer = session.createProducer(topic); TopicSubscriber subscriber = null; try { subscriber = session.createDurableSubscriber(topic, getTestName(), null, true); connection.start(); producer.send(createNextMessage(session, 1)); Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Unexpected message received", m); } finally { session.unsubscribe(getTestName()); } } /** * Tests that a subscriber created on a same connection but separate * sessionM as producer with no local true does not receive messages. */ public void testNoLocalOnSameConnection() throws Exception { Connection connection = getConnection(); Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = consumerSession.createTopic(getTestQueueName()); MessageProducer producer = producerSession.createProducer(topic); TopicSubscriber subscriber = null; try { subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); connection.start(); producer.send(createNextMessage(producerSession, 1)); Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Unexpected message received", m); } finally { consumerSession.unsubscribe(getTestName()); } } /** * Tests that if no-local is in use, that the messages are delivered when * the client reconnects. * * Currently fails on the Java Broker due to QPID-3605. */ public void testNoLocalMessagesNotDeliveredAfterReconnection() throws Exception { Connection connection = getConnection(); Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = consumerSession.createTopic(getTestQueueName()); MessageProducer producer = producerSession.createProducer(topic); TopicSubscriber subscriber = null; try { subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); connection.start(); producer.send(createNextMessage(producerSession, 1)); Message m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Unexpected message received", m); connection.close(); connection = getConnection(); consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); subscriber = consumerSession.createDurableSubscriber(topic, getTestName(), null, true); connection.start(); m = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Message should not be received on a new connection", m); } finally { consumerSession.unsubscribe(getTestName()); } } /** * Tests that messages are delivered normally to a subscriber on a separate connection despite * the use of durable subscriber with no-local on the first connection. */ public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception { Connection noLocalConnection = getConnection(); Connection connection = getConnection(); String noLocalSubId1 = getTestName() + "subId1"; String subId = getTestName() + "subId2"; Session noLocalSession = noLocalConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic noLocalTopic = noLocalSession.createTopic(getTestQueueName()); Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = consumerSession.createTopic(getTestQueueName()); TopicSubscriber noLocalSubscriber = null; TopicSubscriber subscriber = null; try { MessageProducer producer = noLocalSession.createProducer(noLocalTopic); noLocalSubscriber = noLocalSession.createDurableSubscriber(noLocalTopic, noLocalSubId1, null, true); subscriber = consumerSession.createDurableSubscriber(topic, subId, null, true); noLocalConnection.start(); connection.start(); producer.send(createNextMessage(noLocalSession, 1)); Message m1 = noLocalSubscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Subscriber on nolocal connection should not receive message", m1); Message m2 = subscriber.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull("Subscriber on non-nolocal connection should receive message", m2); } finally { noLocalSession.unsubscribe(noLocalSubId1); consumerSession.unsubscribe(subId); } } }