summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQQueue.java197
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java5
2 files changed, 117 insertions, 85 deletions
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
index 17511fb7e7..7125f8897b 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
@@ -18,7 +18,9 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
@@ -97,7 +99,7 @@ public class AMQQueue implements Managable
/**
* max allowed size of a single message(in KBytes).
*/
- private long _maxAllowedMessageSize = 10000; // 10 MB
+ private long _maxAllowedMessageSize = 10000; // 10 MB
/**
* max allowed number of messages on a queue.
@@ -107,7 +109,7 @@ public class AMQQueue implements Managable
/**
* max allowed size in KBytes for all the messages combined together in a queue.
*/
- private long _queueDepth = 10000000; // 10 GB
+ private long _queueDepth = 10000000; // 10 GB
/**
* total messages received by the queue since startup.
@@ -122,33 +124,39 @@ public class AMQQueue implements Managable
private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
private String _queueName = null;
- //private MBeanInfo _mbeanInfo;
- // AMQ message attribute names exposed.
+ // AMQ message attribute names
private String[] _msgAttributeNames = {"MessageId",
- "Redelivered",
- "Content's size",
- "Contents"};
+ "Header",
+ "Size",
+ "Redelivered"
+ };
// AMQ Message attribute descriptions.
private String[] _msgAttributeDescriptions = {"Message Id",
- "Redelivered",
- "Message content's size in bytes",
- "Message content bodies"};
- // AMQ message attribute types.
- private OpenType[] _msgAttributeTypes = new OpenType[4];
- // Messages will be indexed according to the messageId.
- private String[] _msgAttributeIndex = {"MessageId"};
- // Composite type for representing AMQ Message data.
- private CompositeType _messageDataType = null;
- // Datatype for representing AMQ messages list.
- private TabularType _messagelistDataType = null;
-
- private String[] _contentNames = {"SerialNumber", "ContentBody"};
- private String[] _contentDesc = {"Serial Number", "Content Body"};
- private String[] _contentIndex = {"SerialNumber"};
- private OpenType[] _contentType = new OpenType[2];
- private CompositeType _contentBodyType = null;
- private TabularType _contentBodyListType = null;
+ "Header",
+ "Message size in bytes",
+ "Redelivered"
+ };
+
+ private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+ private String[] _msgAttributeIndex = {"MessageId"}; // Messages will be indexed according to the messageId.
+ private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
+
+
+ private CompositeType _msgContentType = null; // For message content
+ private String[] _msgContentAttributes = {"MessageId",
+ "MimeType",
+ "Encoding",
+ "Content"
+ };
+ private String[] _msgContentDescriptions = {"Message Id",
+ "MimeType",
+ "Encoding",
+ "Message content"
+ };
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue.")
public AMQQueueMBean() throws NotCompliantMBeanException
@@ -162,22 +170,21 @@ public class AMQQueue implements Managable
_queueName = jmxEncode(new StringBuffer(_name), 0).toString();
try
{
- _contentType[0] = SimpleType.INTEGER;
- _contentType[1] = new ArrayType(1, SimpleType.BYTE);
- _contentBodyType = new CompositeType("Content",
- "Content",
- _contentNames,
- _contentDesc,
- _contentType);
- _contentBodyListType = new TabularType("MessageContents",
- "Message Contents",
- _contentBodyType,
- _contentIndex);
-
- _msgAttributeTypes[0] = SimpleType.LONG;
- _msgAttributeTypes[1] = SimpleType.BOOLEAN;
- _msgAttributeTypes[2] = SimpleType.LONG;
- _msgAttributeTypes[3] = _contentBodyListType;
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType = new CompositeType("MessageContent",
+ "AMQ Message Content",
+ _msgContentAttributes,
+ _msgContentDescriptions,
+ _msgContentAttributeTypes);
+
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_messageDataType = new CompositeType("Message",
"AMQ Message",
@@ -288,10 +295,8 @@ public class AMQQueue implements Managable
}
return new Long(Math.round(queueSize / 100));
}
- // Operations
// calculates the size of an AMQMessage
-
private long getMessageSize(AMQMessage msg)
{
if (msg == null)
@@ -374,6 +379,53 @@ public class AMQQueue implements Managable
}
}
+ public CompositeData viewMessageContent(long msgId) throws JMException
+ {
+ List<AMQMessage> list = _deliveryMgr.getMessages();
+ CompositeData messageContent = null;
+ AMQMessage msg = null;
+ for (AMQMessage message : list)
+ {
+ if (message.getMessageId() == msgId)
+ {
+ msg = message;
+ break;
+ }
+ }
+
+ if (msg != null)
+ {
+ // get message content
+ List<ContentBody> cBodies = msg.getContentBodies();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ for (ContentBody body : cBodies)
+ {
+ if (body.getSize() != 0)
+ {
+ ByteBuffer slice = body.payload.slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
+ }
+ }
+
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+ String mimeType = headerProperties.getContentType();
+ String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+ messageContent = new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+ }
+ else
+ {
+ throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName );
+ }
+
+ return messageContent;
+ }
+
/**
* Returns the messages stored in this queue in tabular form.
*
@@ -391,43 +443,38 @@ public class AMQQueue implements Managable
}
List<AMQMessage> list = _deliveryMgr.getMessages();
+ TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
if (beginIndex > list.size())
{
- throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() + " messages in the queue");
+ return _messageList;
}
-
endIndex = endIndex < list.size() ? endIndex : list.size();
- TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
for (int i = beginIndex; i <= endIndex; i++)
{
AMQMessage msg = list.get(i - 1);
- long msgId = msg.getMessageId();
-
- List<ContentBody> cBodies = msg.getContentBodies();
-
- TabularDataSupport _contentList = new TabularDataSupport(_contentBodyListType);
- int contentSerialNo = 1;
long size = 0;
-
+ // get message content
+ List<ContentBody> cBodies = msg.getContentBodies();
for (ContentBody body : cBodies)
{
- if (body.getSize() != 0)
- {
- Byte[] byteArray = getByteArray(body.payload.slice().array());
- size = size + byteArray.length;
+ size = size + body.getSize();
+ }
- Object[] contentValues = {contentSerialNo, byteArray};
- CompositeData contentData = new CompositeDataSupport(_contentBodyType,
- _contentNames,
- contentValues);
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+ List<String> headerAttribsList = new ArrayList<String>();
+ headerAttribsList.add("App Id=" + headerProperties.getAppId());
+ headerAttribsList.add("MimeType=" + headerProperties.getContentType());
+ headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId());
+ headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
+ headerAttribsList.add(headerProperties.toString());
- _contentList.put(contentData);
- }
- }
+ Object[] itemValues = {msg.getMessageId(),
+ headerAttribsList.toArray(new String[0]),
+ size, msg.isRedelivered()};
- Object[] itemValues = {msgId, true, size, _contentList};
CompositeData messageData = new CompositeDataSupport(_messageDataType,
_msgAttributeNames,
itemValues);
@@ -438,26 +485,6 @@ public class AMQQueue implements Managable
}
/**
- * A utility to convert byte[] to Byte[]. Required to create composite
- * type for message contents.
- *
- * @param byteArray message content as byte[]
- * @return Byte[]
- */
- private Byte[] getByteArray(byte[] byteArray)
- {
- int size = byteArray.length;
- List<Byte> list = new ArrayList<Byte>();
-
- for (int i = 0; i < size; i++)
- {
- list.add(byteArray[i]);
- }
-
- return list.toArray(new Byte[0]);
- }
-
- /**
* Creates all the notifications this MBean can send.
*
* @return Notifications broadcasted by this MBean.
diff --git a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
index 109b3c0c9c..9c976b0d30 100644
--- a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.management.MBeanOperationParameter;
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
import javax.management.openmbean.TabularData;
+import javax.management.openmbean.CompositeData;
import java.io.IOException;
/**
@@ -209,4 +210,8 @@ public interface ManagedQueue
impact= MBeanOperationInfo.ACTION)
void clearQueue() throws IOException, JMException;
+ @MBeanOperation(name="viewMessageContent",
+ description="Returns the message content along with MimeType and Encoding")
+ CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
+ throws IOException, JMException;
}