diff options
author | Robert Gemmell <robbie@apache.org> | 2012-07-30 13:36:00 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-07-30 13:36:00 +0000 |
commit | 4a84a99169a1598c466d8658290da46d90eaf33d (patch) | |
tree | 8c3aea5ee024ac19266546b702082b0d266cb713 | |
parent | fa71a13c8c1ca098e1725a8663d8d719a2f6437d (diff) | |
download | qpid-python-4a84a99169a1598c466d8658290da46d90eaf33d.tar.gz |
QPID-4170: prevent JMX threads from spinning in the Queue MBean if the content retrieval fails, log an error if it does. Add unit tests to expose issue and verify fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1367084 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 84 insertions, 8 deletions
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 1416cfdd89..5c8b0f7194 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -43,6 +43,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import org.apache.commons.lang.time.FastDateFormat; +import org.apache.log4j.Logger; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObject; @@ -59,6 +60,8 @@ import org.apache.qpid.server.queue.QueueEntryVisitor; public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + private static final Logger LOGGER = Logger.getLogger(QueueMBean.class); + private static final String[] VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY = VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]); @@ -370,12 +373,14 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN byte[] msgContent = new byte[bodySize]; ByteBuffer buf = ByteBuffer.wrap(msgContent); - int position = 0; + int stored = serverMsg.getContent(buf, 0); - while(position < bodySize) + if(bodySize != stored) { - position += serverMsg.getContent(buf, position); - + LOGGER.error(String.format("An unexpected amount of content was retrieved " + + "(expected %d, got %d bytes) when viewing content for message with ID %d " + + "on queue '%s' in virtual host '%s'", + bodySize, stored, messageId, _queue.getName(), _vhostMBean.getName())); } AMQMessageHeader header = serverMsg.getMessageHeader(); @@ -591,7 +596,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN } - private static class GetMessageVisitor implements QueueEntryVisitor + protected static class GetMessageVisitor implements QueueEntryVisitor { private final long _messageNumber; diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index 2003c12735..f2663bca4e 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -18,12 +18,14 @@ */ package org.apache.qpid.server.jmx.mbeans; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.argThat; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -31,19 +33,26 @@ import javax.management.ListenerNotFoundException; import javax.management.Notification; import javax.management.NotificationListener; import javax.management.OperationsException; +import javax.management.openmbean.CompositeDataSupport; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.NotificationCheck; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.test.utils.QpidTestCase; import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -import junit.framework.TestCase; - -public class QueueMBeanTest extends TestCase +public class QueueMBeanTest extends QpidTestCase { private static final String QUEUE_NAME = "QUEUE_NAME"; private static final String QUEUE_DESCRIPTION = "QUEUE_DESCRIPTION"; @@ -59,6 +68,7 @@ public class QueueMBeanTest extends TestCase @Override protected void setUp() throws Exception { + super.setUp(); _mockQueue = mock(Queue.class); _mockQueueStatistics = mock(Statistics.class); when(_mockQueue.getName()).thenReturn(QUEUE_NAME); @@ -365,4 +375,65 @@ public class QueueMBeanTest extends TestCase verify(_mockQueue).setAttribute(underlyingAttributeName, originalAttributeValue, newAttributeValue); } + public void testViewMessageContent() throws Exception + { + viewMessageContentTestImpl(16L, 1000, 1000); + } + + public void testViewMessageContentWithMissingPayload() throws Exception + { + viewMessageContentTestImpl(16L, 1000, 0); + } + + private void viewMessageContentTestImpl(final long messageNumber, + final int messageSize, + final int messageContentSize) throws Exception + { + final byte[] content = new byte[messageContentSize]; + + //mock message and queue entry to return a given message size, and have a given content + final ServerMessage<?> serverMessage = mock(ServerMessage.class); + when(serverMessage.getMessageNumber()).thenReturn(messageNumber); + when(serverMessage.getSize()).thenReturn((long)messageSize); + doAnswer(new Answer<Object>() + { + public Object answer(InvocationOnMock invocation) + { + Object[] args = invocation.getArguments(); + + //verify the arg types / expected values + assertEquals(2, args.length); + assertTrue(args[0] instanceof ByteBuffer); + assertTrue(args[1] instanceof Integer); + + ByteBuffer dest = (ByteBuffer) args[0]; + int offset = (Integer) args[1]; + assertEquals(0, offset); + + dest.put(content); + return messageContentSize; + } + }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class), Matchers.anyInt()); + + final QueueEntry entry = mock(QueueEntry.class); + when(entry.getMessage()).thenReturn(serverMessage); + + //mock the queue.visit() method to ensure we match the mock message + doAnswer(new Answer<Object>() + { + public Object answer(InvocationOnMock invocation) + { + Object[] args = invocation.getArguments(); + GetMessageVisitor visitor = (GetMessageVisitor) args[0]; + visitor.visit(entry); + return null; + } + }).when(_mockQueue).visit(Matchers.any(GetMessageVisitor.class)); + + //now retrieve the content and verify its size + CompositeDataSupport comp = (CompositeDataSupport) _queueMBean.viewMessageContent(messageNumber); + assertNotNull(comp); + byte[] data = (byte[]) comp.get(ManagedQueue.CONTENT); + assertEquals(messageSize, data.length); + } } |