summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-08-01 12:51:51 +0000
committerRobert Gemmell <robbie@apache.org>2012-08-01 12:51:51 +0000
commit2dcf35c06038da5ec5cc53db48ab433dd027d8d0 (patch)
treea9d9740362bb37aa479b11f544bbf3031c9d6465
parentcf170625ba412e51ca5feb6e490d3dba36631c22 (diff)
downloadqpid-python-2dcf35c06038da5ec5cc53db48ab433dd027d8d0.tar.gz
QPID-4170: prevent JMX threads from spinning in the Queue MBean if the content retrieval fails, log an error if it does. Add unit tests to expose issue and verify fix.
merged from trunk r1367084 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1367967 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java15
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java77
2 files changed, 84 insertions, 8 deletions
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index 1416cfdd89..5c8b0f7194 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -43,6 +43,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.log4j.Logger;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
@@ -59,6 +60,8 @@ import org.apache.qpid.server.queue.QueueEntryVisitor;
public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+ private static final Logger LOGGER = Logger.getLogger(QueueMBean.class);
+
private static final String[] VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY =
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]);
@@ -370,12 +373,14 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
byte[] msgContent = new byte[bodySize];
ByteBuffer buf = ByteBuffer.wrap(msgContent);
- int position = 0;
+ int stored = serverMsg.getContent(buf, 0);
- while(position < bodySize)
+ if(bodySize != stored)
{
- position += serverMsg.getContent(buf, position);
-
+ LOGGER.error(String.format("An unexpected amount of content was retrieved " +
+ "(expected %d, got %d bytes) when viewing content for message with ID %d " +
+ "on queue '%s' in virtual host '%s'",
+ bodySize, stored, messageId, _queue.getName(), _vhostMBean.getName()));
}
AMQMessageHeader header = serverMsg.getMessageHeader();
@@ -591,7 +596,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
}
- private static class GetMessageVisitor implements QueueEntryVisitor
+ protected static class GetMessageVisitor implements QueueEntryVisitor
{
private final long _messageNumber;
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index 2003c12735..f2663bca4e 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.qpid.server.jmx.mbeans;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.argThat;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -31,19 +33,26 @@ import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.OperationsException;
+import javax.management.openmbean.CompositeDataSupport;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
+import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.NotificationCheck;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
-import junit.framework.TestCase;
-
-public class QueueMBeanTest extends TestCase
+public class QueueMBeanTest extends QpidTestCase
{
private static final String QUEUE_NAME = "QUEUE_NAME";
private static final String QUEUE_DESCRIPTION = "QUEUE_DESCRIPTION";
@@ -59,6 +68,7 @@ public class QueueMBeanTest extends TestCase
@Override
protected void setUp() throws Exception
{
+ super.setUp();
_mockQueue = mock(Queue.class);
_mockQueueStatistics = mock(Statistics.class);
when(_mockQueue.getName()).thenReturn(QUEUE_NAME);
@@ -365,4 +375,65 @@ public class QueueMBeanTest extends TestCase
verify(_mockQueue).setAttribute(underlyingAttributeName, originalAttributeValue, newAttributeValue);
}
+ public void testViewMessageContent() throws Exception
+ {
+ viewMessageContentTestImpl(16L, 1000, 1000);
+ }
+
+ public void testViewMessageContentWithMissingPayload() throws Exception
+ {
+ viewMessageContentTestImpl(16L, 1000, 0);
+ }
+
+ private void viewMessageContentTestImpl(final long messageNumber,
+ final int messageSize,
+ final int messageContentSize) throws Exception
+ {
+ final byte[] content = new byte[messageContentSize];
+
+ //mock message and queue entry to return a given message size, and have a given content
+ final ServerMessage<?> serverMessage = mock(ServerMessage.class);
+ when(serverMessage.getMessageNumber()).thenReturn(messageNumber);
+ when(serverMessage.getSize()).thenReturn((long)messageSize);
+ doAnswer(new Answer<Object>()
+ {
+ public Object answer(InvocationOnMock invocation)
+ {
+ Object[] args = invocation.getArguments();
+
+ //verify the arg types / expected values
+ assertEquals(2, args.length);
+ assertTrue(args[0] instanceof ByteBuffer);
+ assertTrue(args[1] instanceof Integer);
+
+ ByteBuffer dest = (ByteBuffer) args[0];
+ int offset = (Integer) args[1];
+ assertEquals(0, offset);
+
+ dest.put(content);
+ return messageContentSize;
+ }
+ }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class), Matchers.anyInt());
+
+ final QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getMessage()).thenReturn(serverMessage);
+
+ //mock the queue.visit() method to ensure we match the mock message
+ doAnswer(new Answer<Object>()
+ {
+ public Object answer(InvocationOnMock invocation)
+ {
+ Object[] args = invocation.getArguments();
+ GetMessageVisitor visitor = (GetMessageVisitor) args[0];
+ visitor.visit(entry);
+ return null;
+ }
+ }).when(_mockQueue).visit(Matchers.any(GetMessageVisitor.class));
+
+ //now retrieve the content and verify its size
+ CompositeDataSupport comp = (CompositeDataSupport) _queueMBean.viewMessageContent(messageNumber);
+ assertNotNull(comp);
+ byte[] data = (byte[]) comp.get(ManagedQueue.CONTENT);
+ assertEquals(messageSize, data.length);
+ }
}