diff options
Diffstat (limited to 'M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java')
-rw-r--r-- | M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java | 419 |
1 files changed, 0 insertions, 419 deletions
diff --git a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java deleted file mode 100644 index 9f4c9e53aa..0000000000 --- a/M4-RCs/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.topic; - -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQTopicSessionAdaptor; -import org.apache.qpid.test.utils.QpidTestCase; - - -/** @author Apache Software Foundation */ -public class TopicSessionTest extends QpidTestCase -{ - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - - public void testTopicSubscriptionUnsubscription() throws Exception - { - - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic"); - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); - TopicPublisher publisher = session1.createPublisher(topic); - - con.start(); - - TextMessage tm = session1.createTextMessage("Hello"); - publisher.publish(tm); - - tm = (TextMessage) sub.receive(2000); - assertNotNull(tm); - - session1.unsubscribe("subscription0"); - - try - { - session1.unsubscribe("not a subscription"); - fail("expected InvalidDestinationException when unsubscribing from unknown subscription"); - } - catch (InvalidDestinationException e) - { - ; // PASS - } - catch (Exception e) - { - fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e); - } - - con.close(); - } - - public void testSubscriptionNameReuseForDifferentTopicSingleConnection() throws Exception - { - subscriptionNameReuseForDifferentTopic(false); - } - - public void testSubscriptionNameReuseForDifferentTopicTwoConnections() throws Exception - { - subscriptionNameReuseForDifferentTopic(true); - } - - private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown)); - AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown)); - - TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); - TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); - TopicPublisher publisher = session1.createPublisher(null); - - con.start(); - - publisher.publish(topic, session1.createTextMessage("hello")); - TextMessage m = (TextMessage) sub.receive(2000); - assertNotNull(m); - - if (shutdown) - { - session1.close(); - con.close(); - con = (AMQConnection) getConnection("guest", "guest"); - con.start(); - session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - publisher = session1.createPublisher(null); - } - TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0"); - publisher.publish(topic, session1.createTextMessage("hello")); - if (!shutdown) - { - m = (TextMessage) sub.receive(2000); - assertNull(m); - } - publisher.publish(topic2, session1.createTextMessage("goodbye")); - m = (TextMessage) sub2.receive(2000); - assertNotNull(m); - assertEquals("goodbye", m.getText()); - session1.unsubscribe("subscription0"); - con.close(); - } - - public void testUnsubscriptionAfterConnectionClose() throws Exception - { - AMQConnection con1 = (AMQConnection) getConnection("guest", "guest", "clientid"); - AMQTopic topic = new AMQTopic(con1, "MyTopic3"); - - TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); - TopicPublisher publisher = session1.createPublisher(topic); - - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "clientid"); - TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); - TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); - - con2.start(); - - publisher.publish(session1.createTextMessage("Hello")); - TextMessage tm = (TextMessage) sub.receive(2000); - assertNotNull(tm); - con2.close(); - publisher.publish(session1.createTextMessage("Hello2")); - con2 = (AMQConnection) getConnection("guest", "guest", "clientid"); - session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - sub = session2.createDurableSubscriber(topic, "subscription0"); - con2.start(); - tm = (TextMessage) sub.receive(2000); - assertNotNull(tm); - assertEquals("Hello2", tm.getText()); - session2.unsubscribe("subscription0"); - con1.close(); - con2.close(); - } - - public void testTextMessageCreation() throws Exception - { - - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - AMQTopic topic = new AMQTopic(con, "MyTopic4"); - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - TopicPublisher publisher = session1.createPublisher(topic); - MessageConsumer consumer1 = session1.createConsumer(topic); - con.start(); - TextMessage tm = session1.createTextMessage("Hello"); - publisher.publish(tm); - tm = (TextMessage) consumer1.receive(10000L); - assertNotNull(tm); - String msgText = tm.getText(); - assertEquals("Hello", msgText); - tm = session1.createTextMessage(); - msgText = tm.getText(); - assertNull(msgText); - publisher.publish(tm); - tm = (TextMessage) consumer1.receive(10000L); - assertNotNull(tm); - msgText = tm.getText(); - assertNull(msgText); - tm.clearBody(); - tm.setText("Now we are not null"); - publisher.publish(tm); - tm = (TextMessage) consumer1.receive(2000); - assertNotNull(tm); - msgText = tm.getText(); - assertEquals("Now we are not null", msgText); - - tm = session1.createTextMessage(""); - msgText = tm.getText(); - assertEquals("Empty string not returned", "", msgText); - publisher.publish(tm); - tm = (TextMessage) consumer1.receive(2000); - assertNotNull(tm); - assertEquals("Empty string not returned", "", msgText); - con.close(); - } - - public void testSendingSameMessage() throws Exception - { - AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryTopic topic = session.createTemporaryTopic(); - assertNotNull(topic); - TopicPublisher producer = session.createPublisher(topic); - MessageConsumer consumer = session.createConsumer(topic); - conn.start(); - TextMessage sentMessage = session.createTextMessage("Test Message"); - producer.send(sentMessage); - TextMessage receivedMessage = (TextMessage) consumer.receive(2000); - assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(), receivedMessage.getText()); - producer.send(sentMessage); - receivedMessage = (TextMessage) consumer.receive(2000); - assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(), receivedMessage.getText()); - - conn.close(); - - } - - public void testTemporaryTopic() throws Exception - { - AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryTopic topic = session.createTemporaryTopic(); - assertNotNull(topic); - TopicPublisher producer = session.createPublisher(topic); - MessageConsumer consumer = session.createConsumer(topic); - conn.start(); - producer.send(session.createTextMessage("hello")); - TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); - assertEquals("hello", tm.getText()); - - try - { - topic.delete(); - fail("Expected JMSException : should not be able to delete while there are active consumers"); - } - catch (JMSException je) - { - ; //pass - } - - consumer.close(); - - try - { - topic.delete(); - } - catch (JMSException je) - { - fail("Unexpected Exception: " + je.getMessage()); - } - - TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - try - { - session2.createConsumer(topic); - fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session"); - } - catch (JMSException je) - { - ; // pass - } - - - conn.close(); - } - - - public void testNoLocal() throws Exception - { - - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - AMQTopic topic = new AMQTopic(con, "testNoLocal"); - - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - TopicSubscriber noLocal = session1.createSubscriber(topic, "", true); - TopicSubscriber select = session1.createSubscriber(topic, "Selector = 'select'", false); - TopicSubscriber normal = session1.createSubscriber(topic); - - TopicPublisher publisher = session1.createPublisher(topic); - - con.start(); - TextMessage m; - TextMessage message; - - //send message to all consumers - publisher.publish(session1.createTextMessage("hello-new2")); - - //test normal subscriber gets message - m = (TextMessage) normal.receive(1000); - assertNotNull(m); - - //test selector subscriber doesn't message - m = (TextMessage) select.receive(1000); - assertNull(m); - - //test nolocal subscriber doesn't message - m = (TextMessage) noLocal.receive(1000); - if (m != null) - { - System.out.println("Message:" + m.getText()); - } - assertNull(m); - - //send message to all consumers - message = session1.createTextMessage("hello2"); - message.setStringProperty("Selector", "select"); - - publisher.publish(message); - - //test normal subscriber gets message - m = (TextMessage) normal.receive(1000); - assertNotNull(m); - - //test selector subscriber does get message - m = (TextMessage) select.receive(1000); - assertNotNull(m); - - //test nolocal subscriber doesn't message - m = (TextMessage) noLocal.receive(100); - assertNull(m); - - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest", "foo"); - TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - TopicPublisher publisher2 = session2.createPublisher(topic); - - - message = session2.createTextMessage("hello2"); - message.setStringProperty("Selector", "select"); - - publisher2.publish(message); - - //test normal subscriber gets message - m = (TextMessage) normal.receive(1000); - assertNotNull(m); - - //test selector subscriber does get message - m = (TextMessage) select.receive(1000); - assertNotNull(m); - - //test nolocal subscriber does message - m = (TextMessage) noLocal.receive(100); - assertNotNull(m); - - - con.close(); - con2.close(); - } - - /** - * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber - * due to a selector can be leaked. - * @throws Exception - */ - public void testNonMatchingMessagesDoNotFillQueue() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - // Setup Topic - AMQTopic topic = new AMQTopic(con, "testNoLocal"); - - TopicSession session = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - - // Setup subscriber with selector - TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false); - TopicPublisher publisher = session.createPublisher(topic); - - con.start(); - TextMessage m; - TextMessage message; - - // Send non-matching message - message = session.createTextMessage("non-matching 1"); - publisher.publish(message); - - // Send and consume matching message - message = session.createTextMessage("hello"); - message.setStringProperty("Selector", "select"); - - publisher.publish(message); - - m = (TextMessage) selector.receive(1000); - assertNotNull("should have received message", m); - assertEquals("Message contents were wrong", "hello", m.getText()); - - // Send non-matching message - message = session.createTextMessage("non-matching 2"); - publisher.publish(message); - - // Assert queue count is 0 - long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic); - assertEquals("Queue depth was wrong", 0, depth); - - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TopicSessionTest.class); - } -} |