diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java | 256 |
1 files changed, 236 insertions, 20 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java index 0604955290..244e547e02 100644 --- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java @@ -20,6 +20,9 @@ package org.apache.qpid.management.jmx; import org.apache.commons.lang.time.FastDateFormat; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.queue.AMQQueueMBean; import org.apache.qpid.test.utils.JMXTestUtils; @@ -27,15 +30,25 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.Session; -import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; + +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Tests the JMX API for the Managed Queue. @@ -43,17 +56,41 @@ import java.util.Map; */ public class ManagedQueueMBeanTest extends QpidBrokerTestCase { - /** - * JMX helper. - */ + protected static final Logger LOGGER = Logger.getLogger(ManagedQueueMBeanTest.class); + private JMXTestUtils _jmxUtils; + private Connection _connection; + private Session _session; + + private String _sourceQueueName; + private String _destinationQueueName; + private Destination _sourceQueue; + private Destination _destinationQueue; + private ManagedQueue _managedSourceQueue; + private ManagedQueue _managedDestinationQueue; public void setUp() throws Exception { _jmxUtils = new JMXTestUtils(this); _jmxUtils.setUp(); + super.setUp(); + _sourceQueueName = getTestQueueName() + "_src"; + _destinationQueueName = getTestQueueName() + "_dest"; + + _connection = getConnection(); + _connection.start(); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _sourceQueue = _session.createQueue(_sourceQueueName); + _destinationQueue = _session.createQueue(_destinationQueueName); + createQueueOnBroker(_sourceQueue); + createQueueOnBroker(_destinationQueue); + _jmxUtils.open(); + + _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName); + _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName); } public void tearDown() throws Exception @@ -70,28 +107,18 @@ public class ManagedQueueMBeanTest extends QpidBrokerTestCase */ public void testViewSingleMessage() throws Exception { - final String queueName = getTestQueueName(); - - // Create queue and send numMessages messages to it. - final Connection con = getConnection(); - final Session session = con.createSession(true, Session.SESSION_TRANSACTED); - final Destination dest = session.createQueue(queueName); - session.createConsumer(dest).close(); // Create a consumer only to cause queue creation - - final List<Message> sentMessages = sendMessage(session, dest, 1); + final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1); + syncSession(_session); final Message sentMessage = sentMessages.get(0); - // Obtain the management interface. - final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); - assertNotNull("ManagedQueue expected to be available", managedQueue); - assertEquals("Unexpected queue depth", 1, managedQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue()); // Check the contents of the message - final TabularData tab = managedQueue.viewMessages(1l, 1l); + final TabularData tab = _managedSourceQueue.viewMessages(1l, 1l); assertEquals("Unexpected number of rows in table", 1, tab.size()); - final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator(); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); - final CompositeDataSupport row1 = rowItr.next(); + final CompositeData row1 = rowItr.next(); assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID)); assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS)); assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED)); @@ -109,6 +136,184 @@ public class ManagedQueueMBeanTest extends QpidBrokerTestCase } /** + * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface. + */ + public void testMoveMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Move first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first move", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first move", 7, _managedSourceQueue.getMessageCount().intValue()); + + // Now move a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second move", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second move", 5, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueues() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", 3, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", 5, _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2); + AtomicInteger totalConsumed = new AtomicInteger(0); + startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed); + + boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Did not read half of messages within time allowed", halfwayPointReached); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + asyncConnection.stop(); + + // The exact number of messages moved will be non deterministic, as the number of messages processed + // by the consumer cannot be predicited. There is also the possibility that a message can remain + // on the source queue. This situation will arise if a message has been acquired by the consumer, but not + // yet delivered to the client application (i.e. MessageListener#onMessage()) when the Connection#stop() occurs. + // + // The number of messages moved + the number consumed + any messages remaining on source should + // *always* be equal to the number we originally sent. + + int numberOfMessagesReadByConsumer = totalConsumed.intValue(); + int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue(); + int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue(); + + LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer + + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue + + " Number of messages remaining on source : " + numberOfMessagesRemainingOnSourceQueue); + assertEquals("Unexpected number of messages after move", numberOfMessagesToSend, numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue + numberOfMessagesRemainingOnSourceQueue); + } + + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + Connection asyncConnection = getConnection(); + asyncConnection.start(); + + final int numberOfMessagesToSend = 50; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1); + + AtomicInteger totalConsumed = new AtomicInteger(0); + CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend); + startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed); + + _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName); + + allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS); + assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue()); + } + + private void startAsyncConsumerOn(Destination queue, Connection asyncConnection, + final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception + { + Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + + @Override + public void onMessage(Message arg0) + { + totalConsumed.incrementAndGet(); + requiredNumberOfMessagesRead.countDown(); + } + }); + } + + private void assertMessageIndicesOn(Destination queue, int... expectedIndices) throws Exception + { + MessageConsumer consumer = _session.createConsumer(queue); + + for (int i : expectedIndices) + { + Message message = consumer.receive(1000); + assertNotNull("Expected message with index " + i, message); + assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX)); + } + + assertNull("Unexpected message encountered", consumer.receive(1000)); + } + + private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex) throws Exception + { + final SortedSet<Long> messageIds = new TreeSet<Long>(); + + final TabularData tab = managedQueue.viewMessages(startIndex, endIndex); + final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator(); + while(rowItr.hasNext()) + { + final CompositeData row = rowItr.next(); + long amqMessageId = (Long)row.get(ManagedQueue.MSG_AMQ_ID); + messageIds.add(amqMessageId); + } + + return new ArrayList<Long>(messageIds); + } + + /** * * Utility method to convert array of Strings in the form x = y into a * map with key/value x => y. @@ -126,4 +331,15 @@ public class ManagedQueueMBeanTest extends QpidBrokerTestCase } return headerMap; } + + private void createQueueOnBroker(Destination destination) throws JMSException + { + _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation + } + + private void syncSession(Session session) throws Exception + { + ((AMQSession<?,?>)session).sync(); + } + } |