summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java95
1 files changed, 50 insertions, 45 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
index 181fe9d34e..14dee60124 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
@@ -19,11 +19,15 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -34,14 +38,12 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
public class SortedQueueTest extends QpidBrokerTestCase
{
@@ -54,6 +56,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
private Connection _producerConnection;
private Session _producerSession;
private Connection _consumerConnection;
+ private long _receiveInterval;
protected void setUp() throws Exception
{
@@ -64,6 +67,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
_producerConnection = getConnection();
_consumerConnection = getConnection();
_producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ _receiveInterval = isBrokerStorePersistent() ? 3000l : 1500l;
}
protected void tearDown() throws Exception
@@ -94,7 +98,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
_consumerConnection.start();
TextMessage received;
int messageCount = 0;
- while((received = (TextMessage) consumer.receive(1000)) != null)
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
{
assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount],
received.getStringProperty(TEST_SORT_KEY));
@@ -138,17 +142,15 @@ public class SortedQueueTest extends QpidBrokerTestCase
_producerSession.commit();
}
- synchronized(consumerThread)
+ try
{
- try
- {
- consumerThread.join(5000L);
- }
- catch(InterruptedException e)
- {
- fail("Test failed waiting for consumer to complete");
- }
+ consumerThread.join(getConsumerThreadJoinInterval());
}
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+
assertTrue("Consumer timed out", consumerThread.isStopped());
assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed());
@@ -172,23 +174,26 @@ public class SortedQueueTest extends QpidBrokerTestCase
_producerSession.commit();
}
- synchronized(consumerThread)
+ try
{
- try
- {
- consumerThread.join(5000L);
- }
- catch(InterruptedException e)
- {
- fail("Test failed waiting for consumer to complete");
- }
+ consumerThread.join(getConsumerThreadJoinInterval());
}
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+
assertTrue("Consumer timed out", consumerThread.isStopped());
assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed());
producer.close();
}
+ private long getConsumerThreadJoinInterval()
+ {
+ return isBrokerStorePersistent() ? 50000L: 5000L;
+ }
+
public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException
{
final Queue queue = createQueue();
@@ -211,7 +216,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
TextMessage received = null;
int messageCount = 0;
- while((received = (TextMessage) consumer.receive(1000)) != null)
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
{
assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue",
received.getStringProperty(TEST_SORT_KEY));
@@ -247,7 +252,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
TextMessage received;
int messageCount = 0;
- while((received = (TextMessage) consumer.receive(1000)) != null)
+ while((received = (TextMessage) consumer.receive(_receiveInterval)) != null)
{
assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10],
received.getStringProperty(TEST_SORT_KEY));
@@ -362,16 +367,16 @@ public class SortedQueueTest extends QpidBrokerTestCase
private Message assertReceiveMessage(final MessageConsumer consumer)
throws JMSException
{
- final Message received = (TextMessage) consumer.receive(10000);
+ final Message received = (TextMessage) consumer.receive(_receiveInterval);
assertNotNull("Received message is unexpectedly null", received);
return received;
}
private class TestConsumerThread extends Thread
{
- private boolean _stopped = false;
+ private final AtomicInteger _consumed = new AtomicInteger(0);
+ private volatile boolean _stopped = false;
private int _count = 0;
- private int _consumed = 0;
private int _sessionType = Session.AUTO_ACKNOWLEDGE;
private Queue _queue;
@@ -402,7 +407,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
conn.start();
Message msg;
- while((msg = consumer.receive(1000)) != null)
+ while((msg = consumer.receive(_receiveInterval)) != null)
{
if(_sessionType == Session.SESSION_TRANSACTED)
{
@@ -415,7 +420,7 @@ public class SortedQueueTest extends QpidBrokerTestCase
{
LOGGER.debug("transacted session commit");
session.commit();
- _consumed++;
+ _consumed.incrementAndGet();
}
}
else if(_sessionType == Session.CLIENT_ACKNOWLEDGE)
@@ -429,18 +434,18 @@ public class SortedQueueTest extends QpidBrokerTestCase
{
LOGGER.debug("client ack session acknowledge");
msg.acknowledge();
- _consumed++;
+ _consumed.incrementAndGet();
}
}
else
{
LOGGER.debug("auto ack session");
- _consumed++;
+ _consumed.incrementAndGet();
}
_count++;
LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY));
- LOGGER.debug("Message consumed with consumed index: " + _consumed);
+ LOGGER.debug("Message consumed with consumed index: " + _consumed.get());
}
_stopped = true;
@@ -453,14 +458,14 @@ public class SortedQueueTest extends QpidBrokerTestCase
}
}
- public synchronized boolean isStopped()
+ public boolean isStopped()
{
return _stopped;
}
- public synchronized int getConsumed()
+ public int getConsumed()
{
- return _consumed;
+ return _consumed.get();
}
}