summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java37
1 files changed, 24 insertions, 13 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
index 57370f490f..d1bcaa1bb8 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
@@ -1,7 +1,6 @@
package org.apache.qpid.test.client;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.testutil.QpidTestCase;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
@@ -53,6 +52,10 @@ public class DupsOkTest extends QpidTestCase
_queue = (Queue) getInitialContext().lookup("queue");
+ //Declare the queue
+ Connection consumerConnection = getConnection();
+ consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
//Create Producer put some messages on the queue
Connection producerConnection = getConnection();
@@ -84,12 +87,14 @@ public class DupsOkTest extends QpidTestCase
//Create Client
Connection clientConnection = getConnection();
- clientConnection.start();
-
final Session clientSession = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
MessageConsumer consumer = clientSession.createConsumer(_queue);
+ assertEquals("The queue should have msgs at start", MSG_COUNT, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
+
+ clientConnection.start();
+
consumer.setMessageListener(new MessageListener()
{
int _msgCount = 0;
@@ -110,13 +115,10 @@ public class DupsOkTest extends QpidTestCase
{
try
{
- long remainingMessages = ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue);
- fail("The queue should have 0 msgs left, seen " + _msgCount + " messages, left: "
- + remainingMessages);
- }
- catch (AMQException e)
- {
- fail("Got AMQException" + e.getMessage());
+ if(_msgCount != MSG_COUNT)
+ {
+ assertEquals("Wrong number of messages seen.", MSG_COUNT, _msgCount);
+ }
}
finally
{
@@ -124,7 +126,6 @@ public class DupsOkTest extends QpidTestCase
_awaitCompletion.countDown();
}
}
-
}
catch (JMSException e)
{
@@ -140,14 +141,24 @@ public class DupsOkTest extends QpidTestCase
try
{
- _awaitCompletion.await(60, TimeUnit.SECONDS);
+ if (!_awaitCompletion.await(120, TimeUnit.SECONDS))
+ {
+ fail("Test did not complete in 120 seconds");
+ }
}
catch (InterruptedException e)
{
fail("Unable to wait for test completion");
throw e;
}
+
+ //Close consumer to give broker time to process in bound Acks. As The main thread will be released while
+ // before the dispatcher has sent the ack back to the broker.
+ consumer.close();
+
assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
+
+ clientConnection.close();
}
}