diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-10-10 08:51:27 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-10-10 08:51:27 +0000 |
commit | 4a23d58e86a265c53ea583dee5e190e4642fbb74 (patch) | |
tree | b83ef350addfc7aee77dfeeb192f56a6f5e4dba9 | |
parent | 6b9dd91076f515aa6251b5973a8113af6edd15c8 (diff) | |
download | qpid-python-4a23d58e86a265c53ea583dee5e190e4642fbb74.tar.gz |
Merged revisions 583105 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r583105 | rgodfrey | 2007-10-09 11:48:25 +0100 (Tue, 09 Oct 2007) | 1 line
QPID-625 : Fix commit rollback test to prevent failures caused by incorrect assertions in the test
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@583389 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java | 99 |
1 files changed, 62 insertions, 37 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index a297011acd..03db3c5cca 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -32,7 +32,6 @@ import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -40,6 +39,7 @@ import javax.jms.Queue; import javax.jms.Session; import java.util.concurrent.atomic.AtomicInteger; +import java.util.UUID; public class MessageRequeueTest extends TestCase { @@ -50,7 +50,7 @@ public class MessageRequeueTest extends TestCase protected final int consumeTimeout = 3000; - protected final String queue = "direct://amq.direct//queue"; + protected String payload = "Message:"; protected final String BROKER = "vm://:1"; @@ -64,32 +64,13 @@ public class MessageRequeueTest extends TestCase super.setUp(); TransportConnection.createVMBroker(1); - QpidClientConnection conn = new QpidClientConnection(BROKER); - conn.connect(); - // clear queue - conn.consume(queue, consumeTimeout); - // load test data - _logger.info("creating test data, " + numTestMessages + " messages"); - conn.put(queue, payload, numTestMessages); - // close this connection - conn.disconnect(); } protected void tearDown() throws Exception { super.tearDown(); - if (!passed) // clean up - { - QpidClientConnection conn = new QpidClientConnection(BROKER); - - conn.connect(); - // clear queue - conn.consume(queue, consumeTimeout); - - conn.disconnect(); - } TransportConnection.killVMBroker(1); } @@ -102,12 +83,25 @@ public class MessageRequeueTest extends TestCase */ public void testDrain() throws JMSException, InterruptedException { + final String queueName = "direct://amq.direct//queue" + UUID.randomUUID().toString(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); + // clear queue + conn.consume(queueName, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queueName, payload, numTestMessages); + // close this connection + conn.disconnect(); + + conn = new QpidClientConnection(BROKER); + + conn.connect(); - _logger.info("consuming queue " + queue); - Queue q = conn.getSession().createQueue(queue); + _logger.info("consuming queue " + queueName); + Queue q = conn.getSession().createQueue(queueName); final MessageConsumer consumer = conn.getSession().createConsumer(q); int messagesReceived = 0; @@ -173,17 +167,33 @@ public class MessageRequeueTest extends TestCase _logger.info("consumed: " + messagesReceived); conn.disconnect(); passed = true; + } + + /** multiple consumers * Based on code subbmitted by client FT-304 */ - public void testTwoCompetingConsumers() + public void testCompetingConsumers() throws JMSException, InterruptedException { - Consumer c1 = new Consumer(); - Consumer c2 = new Consumer(); - Consumer c3 = new Consumer(); - Consumer c4 = new Consumer(); + final String queueName = "direct://amq.direct//queue" + UUID.randomUUID().toString(); + + QpidClientConnection conn = new QpidClientConnection(BROKER); + + conn.connect(); + // clear queue + conn.consume(queueName, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queueName, payload, numTestMessages); + // close this connection + conn.disconnect(); + + Consumer c1 = new Consumer(queueName); + Consumer c2 = new Consumer(queueName); + Consumer c3 = new Consumer(queueName); + Consumer c4 = new Consumer(queueName); Thread t1 = new Thread(c1); Thread t2 = new Thread(c2); @@ -193,7 +203,7 @@ public class MessageRequeueTest extends TestCase t1.start(); t2.start(); t3.start(); - // t4.start(); + t4.start(); try { @@ -237,16 +247,18 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); - passed = true; + } class Consumer implements Runnable { private Integer count = 0; private Integer id; + private final String _queueName; - public Consumer() + public Consumer(String queueName) { + _queueName = queueName; id = consumerIds.addAndGet(1); } @@ -263,7 +275,7 @@ public class MessageRequeueTest extends TestCase Message result; do { - result = conn.getNextMessage(queue, consumeTimeout); + result = conn.getNextMessage(_queueName, consumeTimeout); if (result != null) { @@ -322,8 +334,21 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException + public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException { + final String queue = "direct://amq.direct//queue" + UUID.randomUUID().toString(); + + QpidClientConnection conn = new QpidClientConnection(BROKER); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queue, payload, numTestMessages); + // close this connection + conn.disconnect(); + int run = 0; // while (run < 10) { @@ -338,14 +363,14 @@ public class MessageRequeueTest extends TestCase String brokerlist = BROKER; String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - Connection conn = new AMQConnection(brokerUrl); - Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + AMQConnection amqConn = new AMQConnection(brokerUrl); + Session session = amqConn.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue q = session.createQueue(queue); _logger.debug("Create Consumer"); MessageConsumer consumer = session.createConsumer(q); - conn.start(); + amqConn.start(); _logger.debug("Receiving msg"); Message msg = consumer.receive(2000); @@ -357,7 +382,7 @@ public class MessageRequeueTest extends TestCase consumer.close(); _logger.debug("Close Connection"); - conn.close(); + amqConn.close(); } } |