summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-10-10 08:51:27 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-10-10 08:51:27 +0000
commit4a23d58e86a265c53ea583dee5e190e4642fbb74 (patch)
treeb83ef350addfc7aee77dfeeb192f56a6f5e4dba9
parent6b9dd91076f515aa6251b5973a8113af6edd15c8 (diff)
downloadqpid-python-4a23d58e86a265c53ea583dee5e190e4642fbb74.tar.gz
Merged revisions 583105 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r583105 | rgodfrey | 2007-10-09 11:48:25 +0100 (Tue, 09 Oct 2007) | 1 line QPID-625 : Fix commit rollback test to prevent failures caused by incorrect assertions in the test ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@583389 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java99
1 files changed, 62 insertions, 37 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index a297011acd..03db3c5cca 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -32,7 +32,6 @@ import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -40,6 +39,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.UUID;
public class MessageRequeueTest extends TestCase
{
@@ -50,7 +50,7 @@ public class MessageRequeueTest extends TestCase
protected final int consumeTimeout = 3000;
- protected final String queue = "direct://amq.direct//queue";
+
protected String payload = "Message:";
protected final String BROKER = "vm://:1";
@@ -64,32 +64,13 @@ public class MessageRequeueTest extends TestCase
super.setUp();
TransportConnection.createVMBroker(1);
- QpidClientConnection conn = new QpidClientConnection(BROKER);
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
- // load test data
- _logger.info("creating test data, " + numTestMessages + " messages");
- conn.put(queue, payload, numTestMessages);
- // close this connection
- conn.disconnect();
}
protected void tearDown() throws Exception
{
super.tearDown();
- if (!passed) // clean up
- {
- QpidClientConnection conn = new QpidClientConnection(BROKER);
-
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
-
- conn.disconnect();
- }
TransportConnection.killVMBroker(1);
}
@@ -102,12 +83,25 @@ public class MessageRequeueTest extends TestCase
*/
public void testDrain() throws JMSException, InterruptedException
{
+ final String queueName = "direct://amq.direct//queue" + UUID.randomUUID().toString();
+
QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
+ // clear queue
+ conn.consume(queueName, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queueName, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+
+ conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
- _logger.info("consuming queue " + queue);
- Queue q = conn.getSession().createQueue(queue);
+ _logger.info("consuming queue " + queueName);
+ Queue q = conn.getSession().createQueue(queueName);
final MessageConsumer consumer = conn.getSession().createConsumer(q);
int messagesReceived = 0;
@@ -173,17 +167,33 @@ public class MessageRequeueTest extends TestCase
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
passed = true;
+
}
+
+
/** multiple consumers
* Based on code subbmitted by client FT-304
*/
- public void testTwoCompetingConsumers()
+ public void testCompetingConsumers() throws JMSException, InterruptedException
{
- Consumer c1 = new Consumer();
- Consumer c2 = new Consumer();
- Consumer c3 = new Consumer();
- Consumer c4 = new Consumer();
+ final String queueName = "direct://amq.direct//queue" + UUID.randomUUID().toString();
+
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+ // clear queue
+ conn.consume(queueName, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queueName, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+
+ Consumer c1 = new Consumer(queueName);
+ Consumer c2 = new Consumer(queueName);
+ Consumer c3 = new Consumer(queueName);
+ Consumer c4 = new Consumer(queueName);
Thread t1 = new Thread(c1);
Thread t2 = new Thread(c2);
@@ -193,7 +203,7 @@ public class MessageRequeueTest extends TestCase
t1.start();
t2.start();
t3.start();
- // t4.start();
+ t4.start();
try
{
@@ -237,16 +247,18 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
- passed = true;
+
}
class Consumer implements Runnable
{
private Integer count = 0;
private Integer id;
+ private final String _queueName;
- public Consumer()
+ public Consumer(String queueName)
{
+ _queueName = queueName;
id = consumerIds.addAndGet(1);
}
@@ -263,7 +275,7 @@ public class MessageRequeueTest extends TestCase
Message result;
do
{
- result = conn.getNextMessage(queue, consumeTimeout);
+ result = conn.getNextMessage(_queueName, consumeTimeout);
if (result != null)
{
@@ -322,8 +334,21 @@ public class MessageRequeueTest extends TestCase
}
}
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException
{
+ final String queue = "direct://amq.direct//queue" + UUID.randomUUID().toString();
+
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queue, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+
int run = 0;
// while (run < 10)
{
@@ -338,14 +363,14 @@ public class MessageRequeueTest extends TestCase
String brokerlist = BROKER;
String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- Connection conn = new AMQConnection(brokerUrl);
- Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ AMQConnection amqConn = new AMQConnection(brokerUrl);
+ Session session = amqConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue q = session.createQueue(queue);
_logger.debug("Create Consumer");
MessageConsumer consumer = session.createConsumer(q);
- conn.start();
+ amqConn.start();
_logger.debug("Receiving msg");
Message msg = consumer.receive(2000);
@@ -357,7 +382,7 @@ public class MessageRequeueTest extends TestCase
consumer.close();
_logger.debug("Close Connection");
- conn.close();
+ amqConn.close();
}
}