diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid')
2 files changed, 101 insertions, 18 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index fedb88d008..e606df3f7d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -21,15 +21,8 @@ package org.apache.qpid.server.queue; -import org.junit.Assert; -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import javax.jms.Connection; import javax.jms.JMSException; @@ -39,8 +32,17 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TopicSubscriber; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import javax.naming.NamingException; + +import org.apache.log4j.Logger; +import org.junit.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class TimeToLiveTest extends QpidBrokerTestCase { @@ -53,18 +55,29 @@ public class TimeToLiveTest extends QpidBrokerTestCase private static final int MSG_COUNT = 50; private static final long SERVER_TTL_TIMEOUT = 60000L; + public void testPassiveTTLWithPrefetch() throws Exception + { + doTestPassiveTTL(true); + } + public void testPassiveTTL() throws Exception { + doTestPassiveTTL(false); + + } + + private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException + { //Create Client 1 Connection clientConnection = getConnection(); - + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = clientSession.createQueue(QUEUE); - + Queue queue = clientSession.createQueue(QUEUE); + // Create then close the consumer so the queue is actually created // Closing it then reopening it ensures that the consumer shouldn't get messages // which should have expired and allows a shorter sleep period. See QPID-1418 - + MessageConsumer consumer = clientSession.createConsumer(queue); consumer.close(); @@ -79,6 +92,12 @@ public class TimeToLiveTest extends QpidBrokerTestCase MessageProducer producer = producerSession.createProducer(queue); + consumer = clientSession.createConsumer(queue); + if(prefetchMessages) + { + clientConnection.start(); + } + //Set TTL int msg = 0; producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); @@ -96,7 +115,6 @@ public class TimeToLiveTest extends QpidBrokerTestCase producerSession.commit(); - consumer = clientSession.createConsumer(queue); // Ensure we sleep the required amount of time. ReentrantLock waitLock = new ReentrantLock(); @@ -124,6 +142,16 @@ public class TimeToLiveTest extends QpidBrokerTestCase } + if(prefetchMessages) + { + clientConnection.close(); + clientConnection = getConnection(); + + clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = clientSession.createQueue(QUEUE); + consumer = clientSession.createConsumer(queue); + } + clientConnection.start(); //Receive Message 0 @@ -131,14 +159,14 @@ public class TimeToLiveTest extends QpidBrokerTestCase Message receivedFirst = consumer.receive(5000); Message receivedSecond = consumer.receive(5000); Message receivedThird = consumer.receive(1000); - + // Log the messages to help diagnosis incase of failure _logger.info("First:"+receivedFirst); _logger.info("Second:"+receivedSecond); _logger.info("Third:"+receivedThird); // Only first and last messages sent should survive expiry - Assert.assertNull("More messages received", receivedThird); + Assert.assertNull("More messages received", receivedThird); Assert.assertNotNull("First message not received", receivedFirst); Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index a6a08d83f9..d0f133aa73 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -456,6 +456,61 @@ public class QueueManagementTest extends QpidBrokerTestCase assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueuesWithDuplicates() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", + 3, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 5, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Attempt to copy mixture of messages already on and some not already on the queue + + fromMessageId = amqMessagesIds.get(5); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 7, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8, 5, 6); + + + } + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception { setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); |