diff options
author | Robert Gemmell <robbie@apache.org> | 2012-08-01 12:51:51 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-08-01 12:51:51 +0000 |
commit | 2dcf35c06038da5ec5cc53db48ab433dd027d8d0 (patch) | |
tree | a9d9740362bb37aa479b11f544bbf3031c9d6465 | |
parent | cf170625ba412e51ca5feb6e490d3dba36631c22 (diff) | |
download | qpid-python-2dcf35c06038da5ec5cc53db48ab433dd027d8d0.tar.gz |
QPID-4170: prevent JMX threads from spinning in the Queue MBean if the content retrieval fails, log an error if it does. Add unit tests to expose issue and verify fix.
merged from trunk r1367084
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1367967 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 84 insertions, 8 deletions
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 1416cfdd89..5c8b0f7194 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -43,6 +43,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import org.apache.commons.lang.time.FastDateFormat; +import org.apache.log4j.Logger; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.jmx.AMQManagedObject; import org.apache.qpid.server.jmx.ManagedObject; @@ -59,6 +60,8 @@ import org.apache.qpid.server.queue.QueueEntryVisitor; public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + private static final Logger LOGGER = Logger.getLogger(QueueMBean.class); + private static final String[] VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY = VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]); @@ -370,12 +373,14 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN byte[] msgContent = new byte[bodySize]; ByteBuffer buf = ByteBuffer.wrap(msgContent); - int position = 0; + int stored = serverMsg.getContent(buf, 0); - while(position < bodySize) + if(bodySize != stored) { - position += serverMsg.getContent(buf, position); - + LOGGER.error(String.format("An unexpected amount of content was retrieved " + + "(expected %d, got %d bytes) when viewing content for message with ID %d " + + "on queue '%s' in virtual host '%s'", + bodySize, stored, messageId, _queue.getName(), _vhostMBean.getName())); } AMQMessageHeader header = serverMsg.getMessageHeader(); @@ -591,7 +596,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN } - private static class GetMessageVisitor implements QueueEntryVisitor + protected static class GetMessageVisitor implements QueueEntryVisitor { private final long _messageNumber; diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index 2003c12735..f2663bca4e 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -18,12 +18,14 @@ */ package org.apache.qpid.server.jmx.mbeans; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.argThat; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; @@ -31,19 +33,26 @@ import javax.management.ListenerNotFoundException; import javax.management.Notification; import javax.management.NotificationListener; import javax.management.OperationsException; +import javax.management.openmbean.CompositeDataSupport; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.jmx.ManagedObjectRegistry; +import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.NotificationCheck; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.test.utils.QpidTestCase; import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -import junit.framework.TestCase; - -public class QueueMBeanTest extends TestCase +public class QueueMBeanTest extends QpidTestCase { private static final String QUEUE_NAME = "QUEUE_NAME"; private static final String QUEUE_DESCRIPTION = "QUEUE_DESCRIPTION"; @@ -59,6 +68,7 @@ public class QueueMBeanTest extends TestCase @Override protected void setUp() throws Exception { + super.setUp(); _mockQueue = mock(Queue.class); _mockQueueStatistics = mock(Statistics.class); when(_mockQueue.getName()).thenReturn(QUEUE_NAME); @@ -365,4 +375,65 @@ public class QueueMBeanTest extends TestCase verify(_mockQueue).setAttribute(underlyingAttributeName, originalAttributeValue, newAttributeValue); } + public void testViewMessageContent() throws Exception + { + viewMessageContentTestImpl(16L, 1000, 1000); + } + + public void testViewMessageContentWithMissingPayload() throws Exception + { + viewMessageContentTestImpl(16L, 1000, 0); + } + + private void viewMessageContentTestImpl(final long messageNumber, + final int messageSize, + final int messageContentSize) throws Exception + { + final byte[] content = new byte[messageContentSize]; + + //mock message and queue entry to return a given message size, and have a given content + final ServerMessage<?> serverMessage = mock(ServerMessage.class); + when(serverMessage.getMessageNumber()).thenReturn(messageNumber); + when(serverMessage.getSize()).thenReturn((long)messageSize); + doAnswer(new Answer<Object>() + { + public Object answer(InvocationOnMock invocation) + { + Object[] args = invocation.getArguments(); + + //verify the arg types / expected values + assertEquals(2, args.length); + assertTrue(args[0] instanceof ByteBuffer); + assertTrue(args[1] instanceof Integer); + + ByteBuffer dest = (ByteBuffer) args[0]; + int offset = (Integer) args[1]; + assertEquals(0, offset); + + dest.put(content); + return messageContentSize; + } + }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class), Matchers.anyInt()); + + final QueueEntry entry = mock(QueueEntry.class); + when(entry.getMessage()).thenReturn(serverMessage); + + //mock the queue.visit() method to ensure we match the mock message + doAnswer(new Answer<Object>() + { + public Object answer(InvocationOnMock invocation) + { + Object[] args = invocation.getArguments(); + GetMessageVisitor visitor = (GetMessageVisitor) args[0]; + visitor.visit(entry); + return null; + } + }).when(_mockQueue).visit(Matchers.any(GetMessageVisitor.class)); + + //now retrieve the content and verify its size + CompositeDataSupport comp = (CompositeDataSupport) _queueMBean.viewMessageContent(messageNumber); + assertNotNull(comp); + byte[] data = (byte[]) comp.get(ManagedQueue.CONTENT); + assertEquals(messageSize, data.length); + } } |