summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java280
1 files changed, 94 insertions, 186 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
index 8c806fa2da..c98e403671 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -22,237 +22,145 @@
package org.apache.qpid.test.unit.client.temporaryqueue;
import javax.jms.Connection;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
-import junit.framework.Assert;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.ConnectionListener;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-
-public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener
+/**
+ * Tests the behaviour of {@link TemporaryQueue}.
+ */
+public class TemporaryQueueTest extends QpidBrokerTestCase
{
- private List<Exception> _exceptions = new ArrayList<Exception>();
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
+ /**
+ * Tests the basic produce/consume behaviour of a temporary queue.
+ */
+ public void testMessageDeliveryUsingTemporaryQueue() throws Exception
{
- super.tearDown();
- }
-
- protected Connection createConnection() throws Exception
- {
- return getConnection("guest", "guest");
- }
-
- public void testTemporaryQueue() throws Exception
- {
- Connection conn = createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue queue = session.createTemporaryQueue();
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
assertNotNull(queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
+ final MessageProducer producer = session.createProducer(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
conn.start();
producer.send(session.createTextMessage("hello"));
TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
+ assertNotNull("Message not received", tm);
assertEquals("hello", tm.getText());
+ }
- try
- {
- queue.delete();
- fail("Expected JMSException : should not be able to delete while there are active consumers");
- }
- catch (JMSException je)
- {
- ; //pass
- }
-
- consumer.close();
+ /**
+ * Tests that a temporary queue cannot be used by another {@link Session}.
+ */
+ public void testUseFromAnotherSessionProhibited() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session1.createTemporaryQueue();
+ assertNotNull(queue);
try
{
- queue.delete();
+ session2.createConsumer(queue);
+ fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
}
catch (JMSException je)
{
- fail("Unexpected Exception: " + je.getMessage());
+ //pass
+ assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage());
}
-
- conn.close();
- }
-
- public void tUniqueness() throws Exception
- {
- int numProcs = Runtime.getRuntime().availableProcessors();
- final int threadsProc = 5;
-
- runUniqueness(1, 10);
- runUniqueness(numProcs * threadsProc, 10);
- runUniqueness(numProcs * threadsProc, 100);
- runUniqueness(numProcs * threadsProc, 500);
}
- void runUniqueness(int makers, int queues) throws Exception
+ /**
+ * Tests that the client is able to explicitly delete a temporary queue using
+ * {@link TemporaryQueue#delete()} and is prevented from deleting one that
+ * still has consumers.
+ *
+ * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted
+ * on the client. 0-10 causes the queue to be deleted from the Broker.
+ */
+ public void testExplictTemporaryQueueDeletion() throws Exception
{
- Connection connection = createConnection();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
-
- //Create Makers
- for (int m = 0; m < makers; m++)
- {
- tqList.add(new TempQueueMaker(session, queues));
- }
-
-
- List<Thread> threadList = new LinkedList<Thread>();
-
- //Create Makers
- for (TempQueueMaker maker : tqList)
- {
- threadList.add(new Thread(maker));
- }
-
- //Start threads
- for (Thread thread : threadList)
- {
- thread.start();
- }
-
- // Join Threads
- for (Thread thread : threadList)
- {
- try
- {
- thread.join();
- }
- catch (InterruptedException e)
- {
- fail("Couldn't correctly join threads");
- }
- }
-
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
- List<AMQQueue> list = new LinkedList<AMQQueue>();
+ assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue));
- // Test values
- for (TempQueueMaker maker : tqList)
+ try
{
- check(maker, list);
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
}
-
- Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
-
- connection.close();
- }
-
- private void check(TempQueueMaker tq, List<AMQQueue> list)
- {
- for (AMQQueue q : tq.getList())
+ catch (JMSException je)
{
- if (list.contains(q))
- {
- fail(q + " already exists.");
- }
- else
- {
- list.add(q);
- }
+ //pass
+ assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage());
}
- }
-
-
- class TempQueueMaker implements Runnable
- {
- List<AMQQueue> _queues;
- Session _session;
- private int _count;
+ consumer.close();
+ // Now deletion should succeed.
+ queue.delete();
- TempQueueMaker(Session session, int queues) throws JMSException
+ try
{
- _queues = new LinkedList<AMQQueue>();
-
- _count = queues;
-
- _session = session;
+ session.createConsumer(queue);
+ fail("Exception not thrown");
}
-
- public void run()
+ catch (JMSException je)
{
- int i = 0;
- try
- {
- for (; i < _count; i++)
- {
- _queues.add((AMQQueue) _session.createTemporaryQueue());
- }
- }
- catch (JMSException jmse)
- {
- //stop
- }
+ //pass
+ assertEquals("Cannot consume from a deleted destination", je.getMessage());
}
- List<AMQQueue> getList()
+ if (isBroker010())
{
- return _queues;
+ assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue));
}
}
- public void testQPID1217() throws Exception
- {
- Connection conA = getConnection();
- conA.setExceptionListener(this);
- Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue temp = sessA.createTemporaryQueue();
-
- MessageProducer prod = sessA.createProducer(temp);
- prod.send(sessA.createTextMessage("hi"));
-
- Thread.sleep(500);
- assertTrue("Exception received", _exceptions.isEmpty());
-
- Connection conB = getConnection();
- Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- JMSException ex = null;
- try
- {
- MessageConsumer consB = sessB.createConsumer(temp);
- }
- catch (JMSException e)
- {
- ex = e;
- }
- assertNotNull(ex);
- }
-
- public static junit.framework.Test suite()
+ /**
+ * Tests that a temporary queue remains available for reuse even after its initial
+ * consumer has disconnected.
+ *
+ * This test would fail under < 0-10 as their temporary queues are deleted automatically
+ * (broker side) after the last consumer disconnects (so message2 would be lost). For this
+ * reason this test is excluded from those profiles.
+ */
+ public void testTemporaryQueueReused() throws Exception
{
- return new junit.framework.TestSuite(TemporaryQueueTest.class);
- }
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
- public void onException(JMSException arg0)
- {
- _exceptions.add(arg0);
+ final MessageProducer producer1 = session.createProducer(queue);
+ final MessageConsumer consumer1 = session.createConsumer(queue);
+ conn.start();
+ producer1.send(session.createTextMessage("message1"));
+ producer1.send(session.createTextMessage("message2"));
+ TextMessage tm = (TextMessage) consumer1.receive(2000);
+ assertNotNull("Message not received by first consumer", tm);
+ assertEquals("message1", tm.getText());
+ consumer1.close();
+
+ final MessageConsumer consumer2 = session.createConsumer(queue);
+ conn.start();
+ tm = (TextMessage) consumer2.receive(2000);
+ assertNotNull("Message not received by second consumer", tm);
+ assertEquals("message2", tm.getText());
+ consumer2.close();
}
-
}