/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.qpid.systest.management.jmx;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.NotificationCheckTest;
import org.apache.qpid.server.queue.SimpleAMQQueueTest;
import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.naming.NamingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Tests the JMX API for the Managed Queue.
 *
 * <p>{@link #setUp()} creates a source and a destination queue (named after the test with
 * {@code _src} / {@code _dest} suffixes) and obtains {@link ManagedQueue} proxies for both;
 * the message move/copy/delete tests operate on that pair.
 */
public class QueueManagementTest extends QpidBrokerTestCase {

    private static final Logger LOGGER = Logger.getLogger(QueueManagementTest.class);

    private static final String VIRTUAL_HOST = "test";
    private static final String TEST_QUEUE_DESCRIPTION = "my description";

    private JMXTestUtils _jmxUtils;
    private Connection _connection;
    private Session _session;

    private String _sourceQueueName;
    private String _destinationQueueName;
    private Destination _sourceQueue;
    private Destination _destinationQueue;
    private ManagedQueue _managedSourceQueue;
    private ManagedQueue _managedDestinationQueue;

    /**
     * Creates the JMX utilities, the JMS connection/session, the source and destination
     * queues, and the management proxies for those queues.
     */
    @Override
    public void setUp() throws Exception {
        // NOTE(review): JMXTestUtils#setUp() is invoked before super.setUp(); presumably it has to
        // configure management before the superclass set-up runs - confirm before reordering.
        _jmxUtils = new JMXTestUtils(this);
        _jmxUtils.setUp();

        super.setUp();

        _sourceQueueName = getTestQueueName() + "_src";
        _destinationQueueName = getTestQueueName() + "_dest";

        createConnectionAndSession();

        _sourceQueue = _session.createQueue(_sourceQueueName);
        _destinationQueue = _session.createQueue(_destinationQueueName);
        createQueueOnBroker(_sourceQueue);
        createQueueOnBroker(_destinationQueue);

        _jmxUtils.open();

        createManagementInterfacesForQueues();
    }

    /**
     * Closes the JMX utilities (if they were created) and always delegates to the superclass
     * tear-down, even when closing the JMX utilities fails.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            if (_jmxUtils != null) {
                _jmxUtils.close();
            }
        } finally {
            super.tearDown();
        }
    }

    /** Verifies the name and queue type reported for a newly created queue. */
    public void testQueueAttributes() throws Exception {
        Queue queue = _session.createQueue(getTestQueueName());
        createQueueOnBroker(queue);

        final String queueName = queue.getQueueName();

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals("Unexpected name", queueName, managedQueue.getName());
        assertEquals("Unexpected queue type", "standard", managedQueue.getQueueType());
    }

    /** Verifies that a temporary queue reports the connection's client id as its owner. */
    public void testExclusiveQueueHasJmsClientIdAsOwner() throws Exception {
        Queue tmpQueue = _session.createTemporaryQueue();

        final String queueName = tmpQueue.getQueueName();

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertNotNull(_connection.getClientID());
        assertEquals("Unexpected owner", _connection.getClientID(), managedQueue.getOwner());
    }

    /** Verifies that a queue created through a plain consumer reports no owner. */
    public void testNonExclusiveQueueHasNoOwner() throws Exception {
        Queue nonExclusiveQueue = _session.createQueue(getTestQueueName());
        createQueueOnBroker(nonExclusiveQueue);

        final String queueName = nonExclusiveQueue.getQueueName();

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertNull("Unexpected owner", managedQueue.getOwner());
    }

    /** Verifies that a description can be set on, and read back from, an existing queue. */
    public void testSetNewQueueDescriptionOnExistingQueue() throws Exception {
        Queue queue = _session.createQueue(getTestQueueName());
        createQueueOnBroker(queue);

        final String queueName = queue.getQueueName();

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertNull("Unexpected description", managedQueue.getDescription());

        managedQueue.setDescription(TEST_QUEUE_DESCRIPTION);
        assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription());
    }

    /** Verifies that a description supplied as a queue creation argument is exposed over JMX. */
    public void testNewQueueWithDescription() throws Exception {
        String queueName = getTestQueueName();
        Map<String, Object> arguments =
                Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object) TEST_QUEUE_DESCRIPTION);
        ((AMQSession) _session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments);

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription());
    }

    /**
     * Requires persistent store.
     */
    public void testQueueDescriptionSurvivesRestart() throws Exception {
        String queueName = getTestQueueName();
        Map<String, Object> arguments =
                Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object) TEST_QUEUE_DESCRIPTION);

        ((AMQSession) _session).createQueue(AMQShortString.valueOf(queueName), false, true, false, arguments);

        ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription());

        restartBroker();

        managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals(TEST_QUEUE_DESCRIPTION, managedQueue.getDescription());
    }

    /**
     * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests
     * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}.
     */
    public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception {
        final String queueName = getName();
        final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);

        final Integer deliveryCount = 1;
        final Map<String, Object> arguments =
                Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object) deliveryCount);
        managedBroker.createNewQueue(queueName, null, true, arguments);

        // Ensure the queue exists
        assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName));
        assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount());
    }

    /**
     * Tests queue creation with the four alerting threshold arguments and checks that each is
     * exposed through the corresponding {@link ManagedQueue} getter.
     */
    public void testCreateQueueWithAlertingThresholdsSet() throws Exception {
        final String queueName = getName();
        final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);

        final Long maximumMessageCount = 100L;
        final Long maximumMessageSize = 200L;
        final Long maximumQueueDepth = 300L;
        final Long maximumMessageAge = 400L;
        final Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount);
        arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize);
        arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth);
        arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge);

        managedBroker.createNewQueue(queueName, null, true, arguments);

        // Ensure the queue exists
        assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName));
        assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));

        ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals("Unexpected maximum message count", maximumMessageCount, managedQueue.getMaximumMessageCount());
        assertEquals("Unexpected maximum message size", maximumMessageSize, managedQueue.getMaximumMessageSize());
        assertEquals("Unexpected maximum queue depth", maximumQueueDepth, managedQueue.getMaximumQueueDepth());
        assertEquals("Unexpected maximum message age", maximumMessageAge, managedQueue.getMaximumMessageAge());
    }

    /**
     * Requires 0-10 as relies on ADDR addresses.
     * @see AddressBasedDestinationTest for the testing of message routing to the alternate exchange
     */
    public void testGetSetAlternateExchange() throws Exception {
        String queueName = getTestQueueName();
        String altExchange = "amq.fanout";
        String addrWithAltExch = String.format(
                "ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}",
                queueName, altExchange);
        Queue queue = _session.createQueue(addrWithAltExch);

        createQueueOnBroker(queue);

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals("Newly created queue does not have expected alternate exchange",
                altExchange, managedQueue.getAlternateExchange());

        String newAltExch = "amq.topic";
        managedQueue.setAlternateExchange(newAltExch);
        assertEquals("Unexpected alternate exchange after set", newAltExch, managedQueue.getAlternateExchange());
    }

    /**
     * Requires 0-10 as relies on ADDR addresses.
     */
    public void testRemoveAlternateExchange() throws Exception {
        String queueName = getTestQueueName();
        String altExchange = "amq.fanout";
        String addrWithAltExch = String.format(
                "ADDR:%s;{create:always,node:{type:queue,x-declare:{alternate-exchange:'%s'}}}",
                queueName, altExchange);
        Queue queue = _session.createQueue(addrWithAltExch);

        createQueueOnBroker(queue);

        final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        assertEquals("Newly created queue does not have expected alternate exchange",
                altExchange, managedQueue.getAlternateExchange());

        // Setting an empty string is expected to clear the alternate exchange
        managedQueue.setAlternateExchange("");
        assertNull("Unexpected alternate exchange after set", managedQueue.getAlternateExchange());
    }

    /**
     * Requires persistent store
     * Requires 0-10 as relies on ADDR addresses.
     */
    public void testAlternateExchangeSurvivesRestart() throws Exception {
        String nonMandatoryExchangeName = "exch" + getName();

        final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
        managedBroker.createNewExchange(nonMandatoryExchangeName, "fanout", true);

        String queueName1 = getTestQueueName() + "1";
        String altExchange1 = "amq.fanout";
        String addr1WithAltExch = String.format(
                "ADDR:%s;{create:always,node:{durable: true,type:queue,x-declare:{alternate-exchange:'%s'}}}",
                queueName1, altExchange1);
        Queue queue1 = _session.createQueue(addr1WithAltExch);

        String queueName2 = getTestQueueName() + "2";
        String addr2WithoutAltExch = String.format(
                "ADDR:%s;{create:always,node:{durable: true,type:queue}}", queueName2);
        Queue queue2 = _session.createQueue(addr2WithoutAltExch);

        createQueueOnBroker(queue1);
        createQueueOnBroker(queue2);

        ManagedQueue managedQueue1 = _jmxUtils.getManagedQueue(queueName1);
        assertEquals("Newly created queue1 does not have expected alternate exchange",
                altExchange1, managedQueue1.getAlternateExchange());

        ManagedQueue managedQueue2 = _jmxUtils.getManagedQueue(queueName2);
        assertNull("Newly created queue2 does not have expected alternate exchange",
                managedQueue2.getAlternateExchange());

        String altExchange2 = nonMandatoryExchangeName;
        managedQueue2.setAlternateExchange(altExchange2);

        restartBroker();

        managedQueue1 = _jmxUtils.getManagedQueue(queueName1);
        assertEquals("Queue1 does not have expected alternate exchange after restart",
                altExchange1, managedQueue1.getAlternateExchange());

        managedQueue2 = _jmxUtils.getManagedQueue(queueName2);
        assertEquals("Queue2 does not have expected updated alternate exchange after restart",
                altExchange2, managedQueue2.getAlternateExchange());
    }

    /**
     * Tests the ability to receive queue alerts as JMX notifications.
     *
     * @see NotificationCheckTest
     * @see SimpleAMQQueueTest#testNotificationFiredAsync()
     * @see SimpleAMQQueueTest#testNotificationFiredOnEnqueue()
     */
    public void testQueueNotification() throws Exception {
        final String queueName = getName();
        final long maximumMessageCount = 3;

        Queue queue = _session.createQueue(queueName);
        createQueueOnBroker(queue);

        ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
        managedQueue.setMaximumMessageCount(maximumMessageCount);

        RecordingNotificationListener listener = new RecordingNotificationListener(1);

        _jmxUtils.addNotificationListener(
                _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName), listener, null, null);

        // Send two messages - this should *not* trigger the notification
        sendMessage(_session, queue, 2);

        assertEquals("Premature notification received", 0, listener.getNumberOfNotificationsReceived());

        // A further message should trigger the message count alert
        sendMessage(_session, queue, 1);

        listener.awaitExpectedNotifications(5, TimeUnit.SECONDS);

        assertEquals("Unexpected number of JMX notifications received",
                1, listener.getNumberOfNotificationsReceived());

        Notification notification = listener.getLastNotification();
        assertEquals("Unexpected notification message",
                "MESSAGE_COUNT_ALERT 3: Maximum count on queue threshold (3) breached.",
                notification.getMessage());
    }

    /**
     * Tests {@link ManagedQueue#viewMessages(long, long)} interface.
     */
    public void testViewSingleMessage() throws Exception {
        final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1);
        syncSession(_session);
        final Message sentMessage = sentMessages.get(0);

        assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue());

        // Check the contents of the message
        final TabularData tab = _managedSourceQueue.viewMessages(1L, 1L);
        assertEquals("Unexpected number of rows in table", 1, tab.size());
        final Iterator<?> rowItr = tab.values().iterator();

        final CompositeData row1 = (CompositeData) rowItr.next();
        assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID));
        assertEquals("Unexpected queue position", 1L, row1.get(ManagedQueue.MSG_QUEUE_POS));
        assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED));

        // Check the contents of header (encoded in a string array)
        final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER);
        assertNotNull("Expected message header array", headerArray);
        final Map<String, String> headers = headerArrayToMap(headerArray);

        // On a 0-10 broker the header value is compared without the "ID:" prefix
        final String expectedJMSMessageID = isBroker010()
                ? sentMessage.getJMSMessageID().replace("ID:", "")
                : sentMessage.getJMSMessageID();
        final String expectedFormattedJMSTimestamp = FastDateFormat
                .getInstance(ManagedQueue.JMSTIMESTAMP_DATETIME_FORMAT)
                .format(sentMessage.getJMSTimestamp());

        assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID"));
        assertEquals("Unexpected JMSPriority within header",
                String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority"));
        assertEquals("Unexpected JMSTimestamp within header",
                expectedFormattedJMSTimestamp, headers.get("JMSTimestamp"));
    }

    /**
     * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface.
     */
    public void testMoveMessagesBetweenQueues() throws Exception {
        final int numberOfMessagesToSend = 10;

        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
        syncSession(_session);
        assertEquals("Unexpected queue depth after send",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);

        // Move first three messages to destination
        long fromMessageId = amqMessagesIds.get(0);
        long toMessageId = amqMessagesIds.get(2);
        _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);

        assertEquals("Unexpected queue depth on destination queue after first move",
                3, _managedDestinationQueue.getMessageCount().intValue());
        assertEquals("Unexpected queue depth on source queue after first move",
                7, _managedSourceQueue.getMessageCount().intValue());

        // Now move a further two messages to destination
        fromMessageId = amqMessagesIds.get(7);
        toMessageId = amqMessagesIds.get(8);
        _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);
        assertEquals("Unexpected queue depth on destination queue after second move",
                5, _managedDestinationQueue.getMessageCount().intValue());
        assertEquals("Unexpected queue depth on source queue after second move",
                5, _managedSourceQueue.getMessageCount().intValue());

        assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8);
    }

    /**
     * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
     */
    public void testCopyMessagesBetweenQueues() throws Exception {
        final int numberOfMessagesToSend = 10;
        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
        syncSession(_session);
        assertEquals("Unexpected queue depth after send",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);

        // Copy first three messages to destination
        long fromMessageId = amqMessagesIds.get(0);
        long toMessageId = amqMessagesIds.get(2);
        _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);

        assertEquals("Unexpected queue depth on destination queue after first copy",
                3, _managedDestinationQueue.getMessageCount().intValue());
        assertEquals("Unexpected queue depth on source queue after first copy",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        // Now copy a further two messages to destination
        fromMessageId = amqMessagesIds.get(7);
        toMessageId = amqMessagesIds.get(8);
        _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
        assertEquals("Unexpected queue depth on destination queue after second copy",
                5, _managedDestinationQueue.getMessageCount().intValue());
        assertEquals("Unexpected queue depth on source queue after second copy",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8);
    }

    /**
     * Moves the full message range to the destination queue while an asynchronous consumer is
     * reading from the source queue, then checks that no message was lost or duplicated.
     */
    public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception {
        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(1));
        Connection asyncConnection = getConnection();
        asyncConnection.start();

        final int numberOfMessagesToSend = 50;
        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
        syncSession(_session);
        assertEquals("Unexpected queue depth after send",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);

        long fromMessageId = amqMessagesIds.get(0);
        long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1);

        CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2);
        AtomicInteger totalConsumed = new AtomicInteger(0);
        startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed);

        boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS);
        assertTrue("Did not read half of messages within time allowed", halfwayPointReached);

        _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);

        asyncConnection.stop();

        // The exact number of messages moved will be non deterministic, as the number of messages processed
        // by the consumer cannot be predicted. There is also the possibility that a message can remain
        // on the source queue. This situation will arise if a message has been acquired by the consumer, but not
        // yet delivered to the client application (i.e. MessageListener#onMessage()) when the Connection#stop() occurs.
        //
        // The number of messages moved + the number consumed + any messages remaining on source should
        // *always* be equal to the number we originally sent.

        int numberOfMessagesReadByConsumer = totalConsumed.intValue();
        int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue();
        int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue();

        LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer
                + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue
                + " Number of messages remaining on source : " + numberOfMessagesRemainingOnSourceQueue);
        assertEquals("Unexpected number of messages after move",
                numberOfMessagesToSend,
                numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue
                        + numberOfMessagesRemainingOnSourceQueue);
    }

    /**
     * Moves the full message range to the destination queue while an asynchronous consumer is
     * attached to the destination queue, and checks that the consumer receives every message.
     */
    public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception {
        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(1));
        Connection asyncConnection = getConnection();
        asyncConnection.start();

        final int numberOfMessagesToSend = 50;
        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
        syncSession(_session);
        assertEquals("Unexpected queue depth after send",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
        long fromMessageId = amqMessagesIds.get(0);
        long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1);

        AtomicInteger totalConsumed = new AtomicInteger(0);
        CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend);
        startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed);

        _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);

        // The await result is not checked here; if the latch times out the assertion below
        // reports how many messages were actually consumed.
        allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS);
        assertEquals("Did not consume all messages from destination queue",
                numberOfMessagesToSend, totalConsumed.intValue());
    }

    /**
     * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface.
     */
    public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception {
        final int numberOfMessagesToSend = 1;

        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
        syncSession(_session);
        assertEquals("Unexpected queue depth after send",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        restartBroker();

        // Management proxies and the JMS connection/session are re-created after the restart
        createManagementInterfacesForQueues();
        createConnectionAndSession();

        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);

        // Move messages to destination
        long messageId = amqMessagesIds.get(0);
        _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName);

        assertEquals("Unexpected queue depth on destination queue after move",
                1, _managedDestinationQueue.getMessageCount().intValue());
        assertEquals("Unexpected queue depth on source queue after move",
                0, _managedSourceQueue.getMessageCount().intValue());

        assertMessageIndicesOn(_destinationQueue, 0);
    }

    /**
     * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
     */
    public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception {
        final int numberOfMessagesToSend = 1;

        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
        syncSession(_session);
        assertEquals("Unexpected queue depth after send",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());

        restartBroker();

        // Management proxies and the JMS connection/session are re-created after the restart
        createManagementInterfacesForQueues();
        createConnectionAndSession();

        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);

        // Copy message to destination
        long messageId = amqMessagesIds.get(0);
        _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName);

        assertEquals("Unexpected queue depth on destination queue after copy",
                1, _managedDestinationQueue.getMessageCount().intValue());
        assertEquals("Unexpected queue depth on source queue after copy",
                1, _managedSourceQueue.getMessageCount().intValue());

        assertMessageIndicesOn(_destinationQueue, 0);
    }

    /**
     * Tests {@link ManagedQueue#deleteMessages(long, long)} interface.
     */
    public void testDeleteMessages() throws Exception {
        final int numberOfMessagesToSend = 15;

        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
        syncSession(_session);
        assertEquals("Unexpected queue depth after send",
                numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
        // Current expected queue state, in terms of message header indices: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14]

        // Delete the first message (Remember the amqMessagesIds list, and the message indices added as a
        // property when sending, are both 0-based index)
        long fromMessageId = amqMessagesIds.get(0);
        long toMessageId = fromMessageId;
        _managedSourceQueue.deleteMessages(fromMessageId, toMessageId);
        assertEquals("Unexpected message count after first deletion",
                numberOfMessagesToSend - 1, _managedSourceQueue.getMessageCount().intValue());
        // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,7,8,9,10,11,12,13,14]

        // Delete the 9th-10th messages, in the middle of the queue
        fromMessageId = amqMessagesIds.get(8);
        toMessageId = amqMessagesIds.get(9);
        _managedSourceQueue.deleteMessages(fromMessageId, toMessageId);
        assertEquals("Unexpected message count after second deletion",
                numberOfMessagesToSend - 3, _managedSourceQueue.getMessageCount().intValue());
        // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,7,X,X,10,11,12,13,14]

        // Delete the 11th and 12th messages, but still include the IDs for the 9th and 10th messages in the
        // range to ensure their IDs are 'skipped' until the matching messages are found
        fromMessageId = amqMessagesIds.get(8);
        toMessageId = amqMessagesIds.get(11);
        _managedSourceQueue.deleteMessages(fromMessageId, toMessageId);
        assertEquals("Unexpected message count after third deletion",
                numberOfMessagesToSend - 5, _managedSourceQueue.getMessageCount().intValue());
        // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,7,X,X,X,X,12,13,14]

        // Delete the 8th message and the 13th message, including the IDs for the 9th-12th messages in the
        // range to ensure their IDs are 'skipped' and the other matching message is found
        fromMessageId = amqMessagesIds.get(7);
        toMessageId = amqMessagesIds.get(12);
        _managedSourceQueue.deleteMessages(fromMessageId, toMessageId);
        assertEquals("Unexpected message count after fourth deletion",
                numberOfMessagesToSend - 7, _managedSourceQueue.getMessageCount().intValue());
        // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,X,X,X,X,X,X,13,14]

        // Delete the last message
        fromMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1);
        toMessageId = fromMessageId;
        _managedSourceQueue.deleteMessages(fromMessageId, toMessageId);
        assertEquals("Unexpected message count after fifth deletion",
                numberOfMessagesToSend - 8, _managedSourceQueue.getMessageCount().intValue());
        // Current expected queue state, in terms of message header indices: [X,1,2,3,4,5,6,X,X,X,X,X,X,13,X]

        // Verify the message indices with a consumer
        assertMessageIndicesOn(_sourceQueue, 1, 2, 3, 4, 5, 6, 13);
    }

    /**
     * Creates a text message whose body is {@link #getContentForMessageNumber(int)} and whose
     * {@code INDEX} int property carries the message number.
     */
    @Override
    public Message createNextMessage(Session session, int messageNumber) throws JMSException {
        Message message = session.createTextMessage(getContentForMessageNumber(messageNumber));
        message.setIntProperty(INDEX, messageNumber);
        return message;
    }

    /**
     * Attaches an auto-acknowledging {@link MessageListener} to the given queue that, for every
     * message received, increments {@code totalConsumed} and counts down the supplied latch.
     */
    private void startAsyncConsumerOn(Destination queue,
                                      Connection asyncConnection,
                                      final CountDownLatch requiredNumberOfMessagesRead,
                                      final AtomicInteger totalConsumed) throws Exception {
        Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message arg0) {
                totalConsumed.incrementAndGet();
                requiredNumberOfMessagesRead.countDown();
            }
        });
    }

    /**
     * Consumes from the given queue and asserts that it yields exactly the messages with the
     * expected indices, in order, with matching content, and nothing more.
     */
    private void assertMessageIndicesOn(Destination queue, int... expectedIndices) throws Exception {
        MessageConsumer consumer = _session.createConsumer(queue);

        for (int i : expectedIndices) {
            TextMessage message = (TextMessage) consumer.receive(1000);
            assertNotNull("Expected message with index " + i, message);
            assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX));
            assertEquals("Expected message content", getContentForMessageNumber(i), message.getText());
        }

        assertNull("Unexpected message encountered", consumer.receive(1000));
    }

    /**
     * Returns the AMQ message ids of the messages at queue positions {@code startIndex} to
     * {@code endIndex}, as reported by {@link ManagedQueue#viewMessages(long, long)}, in
     * ascending id order.
     */
    private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex)
            throws Exception {
        // A sorted set is used so the returned list is ordered by message id
        final SortedSet<Long> messageIds = new TreeSet<Long>();

        final TabularData tab = managedQueue.viewMessages(startIndex, endIndex);
        for (Object value : tab.values()) {
            final CompositeData row = (CompositeData) value;
            long amqMessageId = (Long) row.get(ManagedQueue.MSG_AMQ_ID);
            messageIds.add(amqMessageId);
        }

        return new ArrayList<Long>(messageIds);
    }

    /**
     * Utility method to convert array of Strings in the form x = y into a
     * map with key/value x => y.
     *
     * <p>Each entry is split on the first {@code =} (surrounding spaces are dropped); an entry
     * without {@code =} fails the test with an explicit message.
     */
    private Map<String, String> headerArrayToMap(final String[] headerArray) {
        final Map<String, String> headerMap = new HashMap<String, String>();
        for (final String nameValuePair : Arrays.asList(headerArray)) {
            final String[] nameValue = nameValuePair.split(" *= *", 2);
            assertEquals("Malformed header entry (expected 'name = value'): " + nameValuePair,
                    2, nameValue.length);
            headerMap.put(nameValue[0], nameValue[1]);
        }
        return headerMap;
    }

    /** Forces creation of the queue on the broker by opening and immediately closing a consumer. */
    private void createQueueOnBroker(Destination destination) throws JMSException {
        _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation
    }

    /** Calls {@code sync()} on the session, which must be an {@link AMQSession}. */
    private void syncSession(Session session) throws Exception {
        ((AMQSession) session).sync();
    }

    /** Obtains and starts a connection and creates a transacted session on it. */
    private void createConnectionAndSession() throws JMSException, NamingException {
        _connection = getConnection();
        _connection.start();
        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
    }

    /** (Re)obtains the {@link ManagedQueue} proxies for the source and destination queues. */
    private void createManagementInterfacesForQueues() {
        _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
        _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
    }

    /** Returns the text body used for the message with the given number. */
    private String getContentForMessageNumber(int msgCount) {
        return "Message count " + msgCount;
    }

    /**
     * {@link NotificationListener} that records how many notifications arrived and the most
     * recent one, and lets a test wait until an expected number has been received.
     */
    private static final class RecordingNotificationListener implements NotificationListener {
        private final CountDownLatch _notificationReceivedLatch;
        private final AtomicInteger _numberOfNotifications;
        private final AtomicReference<Notification> _lastNotification;

        private RecordingNotificationListener(int expectedNumberOfNotifications) {
            _notificationReceivedLatch = new CountDownLatch(expectedNumberOfNotifications);
            _numberOfNotifications = new AtomicInteger(0);
            _lastNotification = new AtomicReference<Notification>();
        }

        @Override
        public void handleNotification(Notification notification, Object handback) {
            _lastNotification.set(notification);
            _numberOfNotifications.incrementAndGet();
            _notificationReceivedLatch.countDown();
        }

        public int getNumberOfNotificationsReceived() {
            return _numberOfNotifications.get();
        }

        public Notification getLastNotification() {
            return _lastNotification.get();
        }

        /**
         * Waits up to the given timeout for the expected number of notifications. The outcome of
         * the wait is not reported; callers check the received count afterwards.
         */
        public void awaitExpectedNotifications(long timeout, TimeUnit timeunit) throws InterruptedException {
            _notificationReceivedLatch.await(timeout, timeunit);
        }
    }
}