diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java | 95 |
1 files changed, 50 insertions, 45 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java index 181fe9d34e..14dee60124 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java @@ -19,11 +19,15 @@ */ package org.apache.qpid.server.queue; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.JMSException; @@ -34,14 +38,12 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.NamingException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; public class SortedQueueTest extends QpidBrokerTestCase { @@ -54,6 +56,7 @@ public class SortedQueueTest extends QpidBrokerTestCase private Connection _producerConnection; private Session _producerSession; private Connection _consumerConnection; + private long _receiveInterval; protected void setUp() throws Exception { @@ -64,6 +67,7 @@ public class SortedQueueTest extends QpidBrokerTestCase _producerConnection = getConnection(); _consumerConnection = getConnection(); _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED); + _receiveInterval = isBrokerStorePersistent() ? 3000l : 1500l; } protected void tearDown() throws Exception @@ -94,7 +98,7 @@ public class SortedQueueTest extends QpidBrokerTestCase _consumerConnection.start(); TextMessage received; int messageCount = 0; - while((received = (TextMessage) consumer.receive(1000)) != null) + while((received = (TextMessage) consumer.receive(_receiveInterval)) != null) { assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount], received.getStringProperty(TEST_SORT_KEY)); @@ -138,17 +142,15 @@ public class SortedQueueTest extends QpidBrokerTestCase _producerSession.commit(); } - synchronized(consumerThread) + try { - try - { - consumerThread.join(5000L); - } - catch(InterruptedException e) - { - fail("Test failed waiting for consumer to complete"); - } + consumerThread.join(getConsumerThreadJoinInterval()); } + catch(InterruptedException e) + { + fail("Test failed waiting for consumer to complete"); + } + assertTrue("Consumer timed out", consumerThread.isStopped()); assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed()); @@ -172,23 +174,26 @@ public class SortedQueueTest extends QpidBrokerTestCase _producerSession.commit(); } - synchronized(consumerThread) + try { - try - { - consumerThread.join(5000L); - } - catch(InterruptedException e) - { - fail("Test failed waiting for consumer to complete"); - } + consumerThread.join(getConsumerThreadJoinInterval()); } + catch(InterruptedException e) + { + fail("Test failed waiting for consumer to complete"); + } + assertTrue("Consumer timed out", consumerThread.isStopped()); assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed()); producer.close(); } + private long getConsumerThreadJoinInterval() + { + return isBrokerStorePersistent() ? 50000L: 5000L; + } + public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException { final Queue queue = createQueue(); @@ -211,7 +216,7 @@ public class SortedQueueTest extends QpidBrokerTestCase TextMessage received = null; int messageCount = 0; - while((received = (TextMessage) consumer.receive(1000)) != null) + while((received = (TextMessage) consumer.receive(_receiveInterval)) != null) { assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue", received.getStringProperty(TEST_SORT_KEY)); @@ -247,7 +252,7 @@ public class SortedQueueTest extends QpidBrokerTestCase TextMessage received; int messageCount = 0; - while((received = (TextMessage) consumer.receive(1000)) != null) + while((received = (TextMessage) consumer.receive(_receiveInterval)) != null) { assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10], received.getStringProperty(TEST_SORT_KEY)); @@ -362,16 +367,16 @@ public class SortedQueueTest extends QpidBrokerTestCase private Message assertReceiveMessage(final MessageConsumer consumer) throws JMSException { - final Message received = (TextMessage) consumer.receive(10000); + final Message received = (TextMessage) consumer.receive(_receiveInterval); assertNotNull("Received message is unexpectedly null", received); return received; } private class TestConsumerThread extends Thread { - private boolean _stopped = false; + private final AtomicInteger _consumed = new AtomicInteger(0); + private volatile boolean _stopped = false; private int _count = 0; - private int _consumed = 0; private int _sessionType = Session.AUTO_ACKNOWLEDGE; private Queue _queue; @@ -402,7 +407,7 @@ public class SortedQueueTest extends QpidBrokerTestCase conn.start(); Message msg; - while((msg = consumer.receive(1000)) != null) + while((msg = consumer.receive(_receiveInterval)) != null) { if(_sessionType == Session.SESSION_TRANSACTED) { @@ -415,7 +420,7 @@ public class SortedQueueTest extends QpidBrokerTestCase { LOGGER.debug("transacted session commit"); session.commit(); - _consumed++; + _consumed.incrementAndGet(); } } else if(_sessionType == Session.CLIENT_ACKNOWLEDGE) @@ -429,18 +434,18 @@ public class SortedQueueTest extends QpidBrokerTestCase { LOGGER.debug("client ack session acknowledge"); msg.acknowledge(); - _consumed++; + _consumed.incrementAndGet(); } } else { LOGGER.debug("auto ack session"); - _consumed++; + _consumed.incrementAndGet(); } _count++; LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY)); - LOGGER.debug("Message consumed with consumed index: " + _consumed); + LOGGER.debug("Message consumed with consumed index: " + _consumed.get()); } _stopped = true; @@ -453,14 +458,14 @@ public class SortedQueueTest extends QpidBrokerTestCase } } - public synchronized boolean isStopped() + public boolean isStopped() { return _stopped; } - public synchronized int getConsumed() + public int getConsumed() { - return _consumed; + return _consumed.get(); } } |