summaryrefslogtreecommitdiff
path: root/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java73
1 files changed, 51 insertions, 22 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index 8d7645c1fd..56904f20de 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -40,30 +40,40 @@ import javax.jms.Queue;
import javax.jms.Session;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Random;
+import java.util.UUID;
public class MessageRequeueTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class);
+
+
protected static AtomicInteger consumerIds = new AtomicInteger(0);
protected final Integer numTestMessages = 150;
protected final int consumeTimeout = 3000;
- protected final String queue = "direct://amq.direct//queue";
+ //protected final String queue = "direct://amq.direct//queue";
protected String payload = "Message:";
protected final String BROKER = "vm://:1";
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
- private boolean passed = false;
+ //private boolean passed = false;
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
+
+ }
+
+ private void putMessagesOnQueueThenClose(String queue)
+ throws JMSException, InterruptedException
+ {
QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -76,20 +86,25 @@ public class MessageRequeueTest extends TestCase
conn.disconnect();
}
- protected void tearDown() throws Exception
+
+
+ private void tearDownQueue(String queue)
+ throws JMSException, InterruptedException
+
{
- super.tearDown();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
- if (!passed) // clean up
- {
- QpidClientConnection conn = new QpidClientConnection(BROKER);
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
+ conn.disconnect();
+ }
- conn.disconnect();
- }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
TransportConnection.killVMBroker(1);
}
@@ -102,6 +117,11 @@ public class MessageRequeueTest extends TestCase
*/
public void testDrain() throws JMSException, InterruptedException
{
+
+ String queue = "direct://amq.direct//queue" + UUID.randomUUID();
+
+ putMessagesOnQueueThenClose(queue);
+
QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -172,18 +192,22 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
- passed = true;
+ tearDownQueue(queue);
}
/** multiple consumers
* Based on code subbmitted by client FT-304
*/
- public void testCompetingConsumers()
+ public void testCompetingConsumers() throws JMSException, InterruptedException
{
- Consumer c1 = new Consumer();
- Consumer c2 = new Consumer();
- Consumer c3 = new Consumer();
- Consumer c4 = new Consumer();
+ String queue = "direct://amq.direct//queue" + UUID.randomUUID();
+
+ putMessagesOnQueueThenClose(queue);
+
+ Consumer c1 = new Consumer(queue);
+ Consumer c2 = new Consumer(queue);
+ Consumer c3 = new Consumer(queue);
+ Consumer c4 = new Consumer(queue);
Thread t1 = new Thread(c1);
Thread t2 = new Thread(c2);
@@ -237,16 +261,18 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertTrue("number of consumed messages does not match initial data: " + totalConsumed, numTestMessages <= totalConsumed);
- passed = true;
+ tearDownQueue(queue);
}
class Consumer implements Runnable
{
private Integer count = 0;
private Integer id;
+ private final String _queue;
- public Consumer()
+ public Consumer(String queue)
{
+ _queue = queue;
id = consumerIds.addAndGet(1);
}
@@ -263,7 +289,7 @@ public class MessageRequeueTest extends TestCase
Message result;
do
{
- result = conn.getNextMessage(queue, consumeTimeout);
+ result = conn.getNextMessage(_queue, consumeTimeout);
if (result != null)
{
@@ -322,8 +348,11 @@ public class MessageRequeueTest extends TestCase
}
}
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException
{
+ String queue = "direct://amq.direct//queue" + UUID.randomUUID();
+ putMessagesOnQueueThenClose(queue);
+
int run = 0;
// while (run < 10)
{