diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java | 280 |
1 files changed, 94 insertions, 186 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index 8c806fa2da..c98e403671 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -22,237 +22,145 @@ package org.apache.qpid.test.unit.client.temporaryqueue; import javax.jms.Connection; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; -import junit.framework.Assert; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.ConnectionListener; -import java.util.ArrayList; -import java.util.List; -import java.util.LinkedList; - -public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener +/** + * Tests the behaviour of {@link TemporaryQueue}. + */ +public class TemporaryQueueTest extends QpidBrokerTestCase { - private List<Exception> _exceptions = new ArrayList<Exception>(); - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception + /** + * Tests the basic produce/consume behaviour of a temporary queue. + */ + public void testMessageDeliveryUsingTemporaryQueue() throws Exception { - super.tearDown(); - } - - protected Connection createConnection() throws Exception - { - return getConnection("guest", "guest"); - } - - public void testTemporaryQueue() throws Exception - { - Connection conn = createConnection(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue queue = session.createTemporaryQueue(); + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); assertNotNull(queue); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); + final MessageProducer producer = session.createProducer(queue); + final MessageConsumer consumer = session.createConsumer(queue); conn.start(); producer.send(session.createTextMessage("hello")); TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); + assertNotNull("Message not received", tm); assertEquals("hello", tm.getText()); + } - try - { - queue.delete(); - fail("Expected JMSException : should not be able to delete while there are active consumers"); - } - catch (JMSException je) - { - ; //pass - } - - consumer.close(); + /** + * Tests that a temporary queue cannot be used by another {@link Session}. + */ + public void testUseFromAnotherSessionProhibited() throws Exception + { + final Connection conn = getConnection(); + final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session1.createTemporaryQueue(); + assertNotNull(queue); try { - queue.delete(); + session2.createConsumer(queue); + fail("Expected a JMSException when subscribing to a temporary queue created on a different session"); } catch (JMSException je) { - fail("Unexpected Exception: " + je.getMessage()); + //pass + assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage()); } - - conn.close(); - } - - public void tUniqueness() throws Exception - { - int numProcs = Runtime.getRuntime().availableProcessors(); - final int threadsProc = 5; - - runUniqueness(1, 10); - runUniqueness(numProcs * threadsProc, 10); - runUniqueness(numProcs * threadsProc, 100); - runUniqueness(numProcs * threadsProc, 500); } - void runUniqueness(int makers, int queues) throws Exception + /** + * Tests that the client is able to explicitly delete a temporary queue using + * {@link TemporaryQueue#delete()} and is prevented from deleting one that + * still has consumers. + * + * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted + * on the client. 0-10 causes the queue to be deleted from the Broker. + */ + public void testExplictTemporaryQueueDeletion() throws Exception { - Connection connection = createConnection(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>(); - - //Create Makers - for (int m = 0; m < makers; m++) - { - tqList.add(new TempQueueMaker(session, queues)); - } - - - List<Thread> threadList = new LinkedList<Thread>(); - - //Create Makers - for (TempQueueMaker maker : tqList) - { - threadList.add(new Thread(maker)); - } - - //Start threads - for (Thread thread : threadList) - { - thread.start(); - } - - // Join Threads - for (Thread thread : threadList) - { - try - { - thread.join(); - } - catch (InterruptedException e) - { - fail("Couldn't correctly join threads"); - } - } - + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + final MessageConsumer consumer = session.createConsumer(queue); + conn.start(); - List<AMQQueue> list = new LinkedList<AMQQueue>(); + assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue)); - // Test values - for (TempQueueMaker maker : tqList) + try { - check(maker, list); + queue.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); } - - Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); - - connection.close(); - } - - private void check(TempQueueMaker tq, List<AMQQueue> list) - { - for (AMQQueue q : tq.getList()) + catch (JMSException je) { - if (list.contains(q)) - { - fail(q + " already exists."); - } - else - { - list.add(q); - } + //pass + assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage()); } - } - - - class TempQueueMaker implements Runnable - { - List<AMQQueue> _queues; - Session _session; - private int _count; + consumer.close(); + // Now deletion should succeed. + queue.delete(); - TempQueueMaker(Session session, int queues) throws JMSException + try { - _queues = new LinkedList<AMQQueue>(); - - _count = queues; - - _session = session; + session.createConsumer(queue); + fail("Exception not thrown"); } - - public void run() + catch (JMSException je) { - int i = 0; - try - { - for (; i < _count; i++) - { - _queues.add((AMQQueue) _session.createTemporaryQueue()); - } - } - catch (JMSException jmse) - { - //stop - } + //pass + assertEquals("Cannot consume from a deleted destination", je.getMessage()); } - List<AMQQueue> getList() + if (isBroker010()) { - return _queues; + assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue)); } } - public void testQPID1217() throws Exception - { - Connection conA = getConnection(); - conA.setExceptionListener(this); - Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue temp = sessA.createTemporaryQueue(); - - MessageProducer prod = sessA.createProducer(temp); - prod.send(sessA.createTextMessage("hi")); - - Thread.sleep(500); - assertTrue("Exception received", _exceptions.isEmpty()); - - Connection conB = getConnection(); - Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE); - - JMSException ex = null; - try - { - MessageConsumer consB = sessB.createConsumer(temp); - } - catch (JMSException e) - { - ex = e; - } - assertNotNull(ex); - } - - public static junit.framework.Test suite() + /** + * Tests that a temporary queue remains available for reuse even after its initial + * consumer has disconnected. + * + * This test would fail under < 0-10 as their temporary queues are deleted automatically + * (broker side) after the last consumer disconnects (so message2 would be lost). For this + * reason this test is excluded from those profiles. + */ + public void testTemporaryQueueReused() throws Exception { - return new junit.framework.TestSuite(TemporaryQueueTest.class); - } + final Connection conn = getConnection(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); - public void onException(JMSException arg0) - { - _exceptions.add(arg0); + final MessageProducer producer1 = session.createProducer(queue); + final MessageConsumer consumer1 = session.createConsumer(queue); + conn.start(); + producer1.send(session.createTextMessage("message1")); + producer1.send(session.createTextMessage("message2")); + TextMessage tm = (TextMessage) consumer1.receive(2000); + assertNotNull("Message not received by first consumer", tm); + assertEquals("message1", tm.getText()); + consumer1.close(); + + final MessageConsumer consumer2 = session.createConsumer(queue); + conn.start(); + tm = (TextMessage) consumer2.receive(2000); + assertNotNull("Message not received by second consumer", tm); + assertEquals("message2", tm.getText()); + consumer2.close(); } - } |