diff options
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java')
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java | 73 |
1 files changed, 51 insertions, 22 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 8d7645c1fd..56904f20de 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -40,30 +40,40 @@ import javax.jms.Queue; import javax.jms.Session; import java.util.concurrent.atomic.AtomicInteger; +import java.util.Random; +import java.util.UUID; public class MessageRequeueTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); protected final Integer numTestMessages = 150; protected final int consumeTimeout = 3000; - protected final String queue = "direct://amq.direct//queue"; + //protected final String queue = "direct://amq.direct//queue"; protected String payload = "Message:"; protected final String BROKER = "vm://:1"; private boolean testReception = true; private long[] receieved = new long[numTestMessages + 1]; - private boolean passed = false; + //private boolean passed = false; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); + + } + + private void putMessagesOnQueueThenClose(String queue) + throws JMSException, InterruptedException + { QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -76,20 +86,25 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } - protected void tearDown() throws Exception + + + private void tearDownQueue(String queue) + throws JMSException, InterruptedException + { - super.tearDown(); + QpidClientConnection conn = new QpidClientConnection(BROKER); - if (!passed) // clean up - { - QpidClientConnection conn = new QpidClientConnection(BROKER); + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); - conn.connect(); - // clear queue - conn.consume(queue, consumeTimeout); + conn.disconnect(); + } - conn.disconnect(); - } + + protected void tearDown() throws Exception + { + super.tearDown(); TransportConnection.killVMBroker(1); } @@ -102,6 +117,11 @@ public class MessageRequeueTest extends TestCase */ public void testDrain() throws JMSException, InterruptedException { + + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + + putMessagesOnQueueThenClose(queue); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -172,18 +192,22 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); - passed = true; + tearDownQueue(queue); } /** multiple consumers * Based on code subbmitted by client FT-304 */ - public void testCompetingConsumers() + public void testCompetingConsumers() throws JMSException, InterruptedException { - Consumer c1 = new Consumer(); - Consumer c2 = new Consumer(); - Consumer c3 = new Consumer(); - Consumer c4 = new Consumer(); + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + + putMessagesOnQueueThenClose(queue); + + Consumer c1 = new Consumer(queue); + Consumer c2 = new Consumer(queue); + Consumer c3 = new Consumer(queue); + Consumer c4 = new Consumer(queue); Thread t1 = new Thread(c1); Thread t2 = new Thread(c2); @@ -237,16 +261,18 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertTrue("number of consumed messages does not match initial data: " + totalConsumed, numTestMessages <= totalConsumed); - passed = true; + tearDownQueue(queue); } class Consumer implements Runnable { private Integer count = 0; private Integer id; + private final String _queue; - public Consumer() + public Consumer(String queue) { + _queue = queue; id = consumerIds.addAndGet(1); } @@ -263,7 +289,7 @@ public class MessageRequeueTest extends TestCase Message result; do { - result = conn.getNextMessage(queue, consumeTimeout); + result = conn.getNextMessage(_queue, consumeTimeout); if (result != null) { @@ -322,8 +348,11 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException + public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException { + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + putMessagesOnQueueThenClose(queue); + int run = 0; // while (run < 10) { |