diff options
author | Keith Wall <kwall@apache.org> | 2011-10-06 18:36:38 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-10-06 18:36:38 +0000 |
commit | a383c2e4f5acb3654ce8e927d7035752e75674e9 (patch) | |
tree | 6fb9ca35944f0c3e37e794b7e8115a58f6e34961 | |
parent | b60aee6f1f3e167fd42d343cd33235b07b5ffe6b (diff) | |
download | qpid-python-a383c2e4f5acb3654ce8e927d7035752e75674e9.tar.gz |
QPID-2442: Make 0-10 TemporaryQueue#delete and TemporaryTopic#delete perform the delete on the Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179754 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 333 insertions, 299 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 30c7403a90..4f7d344655 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2772,6 +2772,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + /** + * Undeclares the specified temporary queue/topic. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param amqQueue The name of the temporary destination to delete. + * + * @throws JMSException If the queue could not be deleted for any reason. + * @todo Be aware of possible changes to parameter order as versions change. + */ + protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException + { + deleteQueue(amqQueue.getAMQQueueName()); + } + public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; private long getNextProducerId() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index ccb2b00947..369c8a6e9d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory; public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { - /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -92,7 +91,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. + * @param acknowledgeMode The acknowledgement mode for the session. * @param messageFactoryRegistry The message factory factory for the session. * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. @@ -110,7 +109,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B * @param con The connection on which to create the session. * @param channelId The unique identifier for the session. * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. + * @param acknowledgeMode The acknowledgement mode for the session. * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. */ @@ -169,7 +168,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B // we also need to check the state manager for 08/09 as the // _connection variable may not be updated in time by the error receiving // thread. - // We can't close the session if we are alreadying in the process of + // We can't close the session if we are already in the process of // closing/closed the connection. if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) @@ -605,6 +604,18 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } + @Override + protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) + throws JMSException + { + // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted + // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects). + // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the + // client explicitly deletes it. + + /* intentional no-op */ + } + public boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String, Object> args) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index f54cb782c8..28f838057e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.client; +import java.util.UUID; + import javax.jms.JMSException; import javax.jms.TemporaryQueue; import org.apache.qpid.framing.AMQShortString; -import java.util.Random; -import java.util.UUID; - /** AMQ implementation of a TemporaryQueue. */ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination { @@ -50,11 +49,15 @@ final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, Tempor { throw new JMSException("Temporary Queue has consumers so cannot be deleted"); } - _deleted = true; - // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted - // by the server when there are no more subscriptions to that queue. This is probably not - // quite right for JMSCompliance. + try + { + _session.deleteTemporaryDestination(this); + } + finally + { + _deleted = true; + } } public AMQSession getSession() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java index 7b5781530b..db54b320dc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java @@ -53,10 +53,14 @@ class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDes throw new JMSException("Temporary Topic has consumers so cannot be deleted"); } - _deleted = true; - // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted - // by the server when there are no more subscriptions to that queue. This is probably not - // quite right for JMSCompliance. + try + { + _session.deleteTemporaryDestination(this); + } + finally + { + _deleted = true; + } } public AMQSession getSession() diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java index 7f8e80c73a..ca137f5a51 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java @@ -24,13 +24,16 @@ package org.apache.qpid.client; import javax.jms.Destination; import javax.jms.JMSException; +import org.apache.qpid.framing.AMQShortString; + /** - * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue + * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue * so that operations related to their "temporary-ness" can be abstracted out. */ interface TemporaryDestination extends Destination { + public AMQShortString getAMQQueueName(); public void delete() throws JMSException; public AMQSession getSession(); public boolean isDeleted(); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index 8c806fa2da..c98e403671 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -22,237 +22,145 @@ package org.apache.qpid.test.unit.client.temporaryqueue; import javax.jms.Connection; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; -import junit.framework.Assert; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.ConnectionListener; -import java.util.ArrayList; -import java.util.List; -import java.util.LinkedList; - -public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener +/** + * Tests the behaviour of {@link TemporaryQueue}. + */ +public class TemporaryQueueTest extends QpidBrokerTestCase { - private List<Exception> _exceptions = new ArrayList<Exception>(); - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception + /** + * Tests the basic produce/consume behaviour of a temporary queue. + */ + public void testMessageDeliveryUsingTemporaryQueue() throws Exception { - super.tearDown(); - } - - protected Connection createConnection() throws Exception - { - return getConnection("guest", "guest"); - } - - public void testTemporaryQueue() throws Exception - { - Connection conn = createConnection(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue queue = session.createTemporaryQueue(); + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); assertNotNull(queue); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); + final MessageProducer producer = session.createProducer(queue); + final MessageConsumer consumer = session.createConsumer(queue); conn.start(); producer.send(session.createTextMessage("hello")); TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); + assertNotNull("Message not received", tm); assertEquals("hello", tm.getText()); + } - try - { - queue.delete(); - fail("Expected JMSException : should not be able to delete while there are active consumers"); - } - catch (JMSException je) - { - ; //pass - } - - consumer.close(); + /** + * Tests that a temporary queue cannot be used by another {@link Session}. + */ + public void testUseFromAnotherSessionProhibited() throws Exception + { + final Connection conn = getConnection(); + final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session1.createTemporaryQueue(); + assertNotNull(queue); try { - queue.delete(); + session2.createConsumer(queue); + fail("Expected a JMSException when subscribing to a temporary queue created on a different session"); } catch (JMSException je) { - fail("Unexpected Exception: " + je.getMessage()); + //pass + assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage()); } - - conn.close(); - } - - public void tUniqueness() throws Exception - { - int numProcs = Runtime.getRuntime().availableProcessors(); - final int threadsProc = 5; - - runUniqueness(1, 10); - runUniqueness(numProcs * threadsProc, 10); - runUniqueness(numProcs * threadsProc, 100); - runUniqueness(numProcs * threadsProc, 500); } - void runUniqueness(int makers, int queues) throws Exception + /** + * Tests that the client is able to explicitly delete a temporary queue using + * {@link TemporaryQueue#delete()} and is prevented from deleting one that + * still has consumers. + * + * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted + * on the client. 0-10 causes the queue to be deleted from the Broker. + */ + public void testExplictTemporaryQueueDeletion() throws Exception { - Connection connection = createConnection(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>(); - - //Create Makers - for (int m = 0; m < makers; m++) - { - tqList.add(new TempQueueMaker(session, queues)); - } - - - List<Thread> threadList = new LinkedList<Thread>(); - - //Create Makers - for (TempQueueMaker maker : tqList) - { - threadList.add(new Thread(maker)); - } - - //Start threads - for (Thread thread : threadList) - { - thread.start(); - } - - // Join Threads - for (Thread thread : threadList) - { - try - { - thread.join(); - } - catch (InterruptedException e) - { - fail("Couldn't correctly join threads"); - } - } - + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + final MessageConsumer consumer = session.createConsumer(queue); + conn.start(); - List<AMQQueue> list = new LinkedList<AMQQueue>(); + assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue)); - // Test values - for (TempQueueMaker maker : tqList) + try { - check(maker, list); + queue.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); } - - Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); - - connection.close(); - } - - private void check(TempQueueMaker tq, List<AMQQueue> list) - { - for (AMQQueue q : tq.getList()) + catch (JMSException je) { - if (list.contains(q)) - { - fail(q + " already exists."); - } - else - { - list.add(q); - } + //pass + assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage()); } - } - - - class TempQueueMaker implements Runnable - { - List<AMQQueue> _queues; - Session _session; - private int _count; + consumer.close(); + // Now deletion should succeed. + queue.delete(); - TempQueueMaker(Session session, int queues) throws JMSException + try { - _queues = new LinkedList<AMQQueue>(); - - _count = queues; - - _session = session; + session.createConsumer(queue); + fail("Exception not thrown"); } - - public void run() + catch (JMSException je) { - int i = 0; - try - { - for (; i < _count; i++) - { - _queues.add((AMQQueue) _session.createTemporaryQueue()); - } - } - catch (JMSException jmse) - { - //stop - } + //pass + assertEquals("Cannot consume from a deleted destination", je.getMessage()); } - List<AMQQueue> getList() + if (isBroker010()) { - return _queues; + assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue)); } } - public void testQPID1217() throws Exception - { - Connection conA = getConnection(); - conA.setExceptionListener(this); - Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue temp = sessA.createTemporaryQueue(); - - MessageProducer prod = sessA.createProducer(temp); - prod.send(sessA.createTextMessage("hi")); - - Thread.sleep(500); - assertTrue("Exception received", _exceptions.isEmpty()); - - Connection conB = getConnection(); - Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE); - - JMSException ex = null; - try - { - MessageConsumer consB = sessB.createConsumer(temp); - } - catch (JMSException e) - { - ex = e; - } - assertNotNull(ex); - } - - public static junit.framework.Test suite() + /** + * Tests that a temporary queue remains available for reuse even after its initial + * consumer has disconnected. + * + * This test would fail under < 0-10 as their temporary queues are deleted automatically + * (broker side) after the last consumer disconnects (so message2 would be lost). For this + * reason this test is excluded from those profiles. + */ + public void testTemporaryQueueReused() throws Exception { - return new junit.framework.TestSuite(TemporaryQueueTest.class); - } + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); - public void onException(JMSException arg0) - { - _exceptions.add(arg0); + final MessageProducer producer1 = session.createProducer(queue); + final MessageConsumer consumer1 = session.createConsumer(queue); + conn.start(); + producer1.send(session.createTextMessage("message1")); + producer1.send(session.createTextMessage("message2")); + TextMessage tm = (TextMessage) consumer1.receive(2000); + assertNotNull("Message not received by first consumer", tm); + assertEquals("message1", tm.getText()); + consumer1.close(); + + final MessageConsumer consumer2 = session.createConsumer(queue); + conn.start(); + tm = (TextMessage) consumer2.receive(2000); + assertNotNull("Message not received by second consumer", tm); + assertEquals("message2", tm.getText()); + consumer2.close(); } - } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java new file mode 100644 index 0000000000..c89b13a0f9 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TemporaryTopicTest.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.topic; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + + +/** + * Tests the behaviour of {@link TemporaryTopic}. + */ +public class TemporaryTopicTest extends QpidBrokerTestCase +{ + /** + * Tests the basic publish/subscribe behaviour of a temporary topic. Single + * message is sent to two subscribers. + */ + public void testMessageDeliveryUsingTemporaryTopic() throws Exception + { + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + final MessageProducer producer = session.createProducer(topic); + final MessageConsumer consumer1 = session.createConsumer(topic); + final MessageConsumer consumer2 = session.createConsumer(topic); + conn.start(); + producer.send(session.createTextMessage("hello")); + + final TextMessage tm1 = (TextMessage) consumer1.receive(2000); + final TextMessage tm2 = (TextMessage) consumer2.receive(2000); + + assertNotNull("Message not received by subscriber1", tm1); + assertEquals("hello", tm1.getText()); + assertNotNull("Message not received by subscriber2", tm2); + assertEquals("hello", tm2.getText()); + } + + /** + * Tests that the client is able to explicitly delete a temporary topic using + * {@link TemporaryTopic#delete()} and is prevented from deleting one that + * still has consumers. + * + * Note: Under < 0-10 {@link TemporaryTopic#delete()} only marks the queue as deleted + * on the client. 0-10 causes the topic to be deleted from the Broker. + */ + public void testExplictTemporaryTopicDeletion() throws Exception + { + final Connection conn = getConnection(); + + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + final MessageConsumer consumer = session.createConsumer(topic); + conn.start(); + try + { + topic.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); + } + catch (JMSException je) + { + //pass + assertEquals("Temporary Topic has consumers so cannot be deleted", je.getMessage()); + } + + consumer.close(); + + // Now deletion should succeed. + topic.delete(); + + try + { + session.createConsumer(topic); + fail("Exception not thrown"); + } + catch (JMSException je) + { + //pass + assertEquals("Cannot consume from a deleted destination", je.getMessage()); + } + } + + /** + * Tests that a temporary topic cannot be used by another {@link Session}. + */ + public void testUseFromAnotherSessionProhibited() throws Exception + { + final Connection conn = getConnection(); + final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session1.createTemporaryTopic(); + + try + { + session2.createConsumer(topic); + fail("Expected a JMSException when subscribing to a temporary topic created on a different session"); + } + catch (JMSException je) + { + // pass + assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage()); + } + } + + /** + * Tests that the client is prohibited from creating a durable subscriber for a temporary + * queue. + */ + public void testDurableSubscriptionProhibited() throws Exception + { + final Connection conn = getConnection(); + + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + try + { + session.createDurableSubscriber(topic, null); + fail("Expected JMSException : should not be able to create durable subscription from temp topic"); + } + catch (JMSException je) + { + //pass + assertEquals("Cannot create a durable subscription with a temporary topic: " + topic.toString(), je.getMessage()); + } + } + + /** + * Tests that a temporary topic remains available for reuse even after its initial + * subscribers have disconnected. + */ + public void testTemporaryTopicReused() throws Exception + { + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryTopic topic = session.createTemporaryTopic(); + assertNotNull(topic); + + final MessageProducer producer = session.createProducer(topic); + final MessageConsumer consumer1 = session.createConsumer(topic); + conn.start(); + producer.send(session.createTextMessage("message1")); + TextMessage tm = (TextMessage) consumer1.receive(2000); + assertNotNull("Message not received by first consumer", tm); + assertEquals("message1", tm.getText()); + consumer1.close(); + + final MessageConsumer consumer2 = session.createConsumer(topic); + conn.start(); + producer.send(session.createTextMessage("message2")); + tm = (TextMessage) consumer2.receive(2000); + assertNotNull("Message not received by second consumer", tm); + assertEquals("message2", tm.getText()); + consumer2.close(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index eee232e113..0b1aeef8e9 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -21,10 +21,7 @@ package org.apache.qpid.test.unit.topic; import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.TopicPublisher; import javax.jms.TopicSession; @@ -40,18 +37,6 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; /** @author Apache Software Foundation */ public class TopicSessionTest extends QpidBrokerTestCase { - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - public void testTopicSubscriptionUnsubscription() throws Exception { @@ -228,83 +213,6 @@ public class TopicSessionTest extends QpidBrokerTestCase con.close(); } - public void testSendingSameMessage() throws Exception - { - AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); - TemporaryTopic topic = session.createTemporaryTopic(); - assertNotNull(topic); - TopicPublisher producer = session.createPublisher(topic); - MessageConsumer consumer = session.createConsumer(topic); - conn.start(); - TextMessage sentMessage = session.createTextMessage("Test Message"); - producer.send(sentMessage); - session.commit(); - TextMessage receivedMessage = (TextMessage) consumer.receive(2000); - assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(), receivedMessage.getText()); - producer.send(sentMessage); - session.commit(); - receivedMessage = (TextMessage) consumer.receive(2000); - assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(), receivedMessage.getText()); - session.commit(); - conn.close(); - - } - - public void testTemporaryTopic() throws Exception - { - AMQConnection conn = (AMQConnection) getConnection("guest", "guest"); - TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); - TemporaryTopic topic = session.createTemporaryTopic(); - assertNotNull(topic); - TopicPublisher producer = session.createPublisher(topic); - MessageConsumer consumer = session.createConsumer(topic); - conn.start(); - producer.send(session.createTextMessage("hello")); - session.commit(); - TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); - assertEquals("hello", tm.getText()); - session.commit(); - try - { - topic.delete(); - fail("Expected JMSException : should not be able to delete while there are active consumers"); - } - catch (JMSException je) - { - ; //pass - } - - consumer.close(); - - try - { - topic.delete(); - } - catch (JMSException je) - { - fail("Unexpected Exception: " + je.getMessage()); - } - - TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - try - { - session2.createConsumer(topic); - fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session"); - } - catch (JMSException je) - { - ; // pass - } - - - conn.close(); - } - - public void testNoLocal() throws Exception { @@ -445,9 +353,4 @@ public class TopicSessionTest extends QpidBrokerTestCase assertEquals("Queue depth was wrong", 0, depth); } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TopicSessionTest.class); - } } diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes index c31add9224..ed6d2ff1ac 100644 --- a/java/test-profiles/JavaPre010Excludes +++ b/java/test-profiles/JavaPre010Excludes @@ -60,3 +60,8 @@ org.apache.qpid.test.client.queue.LVQTest#* // Verification of unique client id is 0-10 specific org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers + +// Under AMQP 0-8..0-9-1 temporary queues are deleted on consumer close, rather than connection close +// and for this reason this test would fail. +org.apache.qpid.test.unit.client.temporaryqueue.TemporaryQueueTest#testTemporaryQueueReused + |