summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-07-30 13:36:00 +0000
committerRobert Gemmell <robbie@apache.org>2012-07-30 13:36:00 +0000
commit4a84a99169a1598c466d8658290da46d90eaf33d (patch)
tree8c3aea5ee024ac19266546b702082b0d266cb713
parentfa71a13c8c1ca098e1725a8663d8d719a2f6437d (diff)
downloadqpid-python-4a84a99169a1598c466d8658290da46d90eaf33d.tar.gz
QPID-4170: prevent JMX threads from spinning in the Queue MBean if the content retrieval fails, log an error if it does. Add unit tests to expose issue and verify fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1367084 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java15
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java77
2 files changed, 84 insertions, 8 deletions
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index 1416cfdd89..5c8b0f7194 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -43,6 +43,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.log4j.Logger;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
@@ -59,6 +60,8 @@ import org.apache.qpid.server.queue.QueueEntryVisitor;
public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+ private static final Logger LOGGER = Logger.getLogger(QueueMBean.class);
+
private static final String[] VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC_ARRAY =
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]);
@@ -370,12 +373,14 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
byte[] msgContent = new byte[bodySize];
ByteBuffer buf = ByteBuffer.wrap(msgContent);
- int position = 0;
+ int stored = serverMsg.getContent(buf, 0);
- while(position < bodySize)
+ if(bodySize != stored)
{
- position += serverMsg.getContent(buf, position);
-
+ LOGGER.error(String.format("An unexpected amount of content was retrieved " +
+ "(expected %d, got %d bytes) when viewing content for message with ID %d " +
+ "on queue '%s' in virtual host '%s'",
+ bodySize, stored, messageId, _queue.getName(), _vhostMBean.getName()));
}
AMQMessageHeader header = serverMsg.getMessageHeader();
@@ -591,7 +596,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
}
- private static class GetMessageVisitor implements QueueEntryVisitor
+ protected static class GetMessageVisitor implements QueueEntryVisitor
{
private final long _messageNumber;
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index 2003c12735..f2663bca4e 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.qpid.server.jmx.mbeans;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.argThat;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -31,19 +33,26 @@ import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.OperationsException;
+import javax.management.openmbean.CompositeDataSupport;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
+import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.NotificationCheck;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
-import junit.framework.TestCase;
-
-public class QueueMBeanTest extends TestCase
+public class QueueMBeanTest extends QpidTestCase
{
private static final String QUEUE_NAME = "QUEUE_NAME";
private static final String QUEUE_DESCRIPTION = "QUEUE_DESCRIPTION";
@@ -59,6 +68,7 @@ public class QueueMBeanTest extends TestCase
@Override
protected void setUp() throws Exception
{
+ super.setUp();
_mockQueue = mock(Queue.class);
_mockQueueStatistics = mock(Statistics.class);
when(_mockQueue.getName()).thenReturn(QUEUE_NAME);
@@ -365,4 +375,65 @@ public class QueueMBeanTest extends TestCase
verify(_mockQueue).setAttribute(underlyingAttributeName, originalAttributeValue, newAttributeValue);
}
+ public void testViewMessageContent() throws Exception
+ {
+ viewMessageContentTestImpl(16L, 1000, 1000);
+ }
+
+ public void testViewMessageContentWithMissingPayload() throws Exception
+ {
+ viewMessageContentTestImpl(16L, 1000, 0);
+ }
+
+ private void viewMessageContentTestImpl(final long messageNumber,
+ final int messageSize,
+ final int messageContentSize) throws Exception
+ {
+ final byte[] content = new byte[messageContentSize];
+
+ //mock message and queue entry to return a given message size, and have a given content
+ final ServerMessage<?> serverMessage = mock(ServerMessage.class);
+ when(serverMessage.getMessageNumber()).thenReturn(messageNumber);
+ when(serverMessage.getSize()).thenReturn((long)messageSize);
+ doAnswer(new Answer<Object>()
+ {
+ public Object answer(InvocationOnMock invocation)
+ {
+ Object[] args = invocation.getArguments();
+
+ //verify the arg types / expected values
+ assertEquals(2, args.length);
+ assertTrue(args[0] instanceof ByteBuffer);
+ assertTrue(args[1] instanceof Integer);
+
+ ByteBuffer dest = (ByteBuffer) args[0];
+ int offset = (Integer) args[1];
+ assertEquals(0, offset);
+
+ dest.put(content);
+ return messageContentSize;
+ }
+ }).when(serverMessage).getContent(Matchers.any(ByteBuffer.class), Matchers.anyInt());
+
+ final QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getMessage()).thenReturn(serverMessage);
+
+ //mock the queue.visit() method to ensure we match the mock message
+ doAnswer(new Answer<Object>()
+ {
+ public Object answer(InvocationOnMock invocation)
+ {
+ Object[] args = invocation.getArguments();
+ GetMessageVisitor visitor = (GetMessageVisitor) args[0];
+ visitor.visit(entry);
+ return null;
+ }
+ }).when(_mockQueue).visit(Matchers.any(GetMessageVisitor.class));
+
+ //now retrieve the content and verify its size
+ CompositeDataSupport comp = (CompositeDataSupport) _queueMBean.viewMessageContent(messageNumber);
+ assertNotNull(comp);
+ byte[] data = (byte[]) comp.get(ManagedQueue.CONTENT);
+ assertEquals(messageSize, data.length);
+ }
}