diff options
-rw-r--r-- | java/broker/src/org/apache/qpid/server/queue/AMQQueue.java | 197 | ||||
-rw-r--r-- | java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java | 5 |
2 files changed, 117 insertions, 85 deletions
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java index 17511fb7e7..7125f8897b 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java @@ -18,7 +18,9 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.AMQManagedObject; @@ -97,7 +99,7 @@ public class AMQQueue implements Managable /** * max allowed size of a single message(in KBytes). */ - private long _maxAllowedMessageSize = 10000; // 10 MB + private long _maxAllowedMessageSize = 10000; // 10 MB /** * max allowed number of messages on a queue. @@ -107,7 +109,7 @@ public class AMQQueue implements Managable /** * max allowed size in KBytes for all the messages combined together in a queue. */ - private long _queueDepth = 10000000; // 10 GB + private long _queueDepth = 10000000; // 10 GB /** * total messages received by the queue since startup. @@ -122,33 +124,39 @@ public class AMQQueue implements Managable private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { private String _queueName = null; - //private MBeanInfo _mbeanInfo; - // AMQ message attribute names exposed. + // AMQ message attribute names private String[] _msgAttributeNames = {"MessageId", - "Redelivered", - "Content's size", - "Contents"}; + "Header", + "Size", + "Redelivered" + }; // AMQ Message attribute descriptions. private String[] _msgAttributeDescriptions = {"Message Id", - "Redelivered", - "Message content's size in bytes", - "Message content bodies"}; - // AMQ message attribute types. - private OpenType[] _msgAttributeTypes = new OpenType[4]; - // Messages will be indexed according to the messageId. - private String[] _msgAttributeIndex = {"MessageId"}; - // Composite type for representing AMQ Message data. - private CompositeType _messageDataType = null; - // Datatype for representing AMQ messages list. - private TabularType _messagelistDataType = null; - - private String[] _contentNames = {"SerialNumber", "ContentBody"}; - private String[] _contentDesc = {"Serial Number", "Content Body"}; - private String[] _contentIndex = {"SerialNumber"}; - private OpenType[] _contentType = new OpenType[2]; - private CompositeType _contentBodyType = null; - private TabularType _contentBodyListType = null; + "Header", + "Message size in bytes", + "Redelivered" + }; + + private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. + private String[] _msgAttributeIndex = {"MessageId"}; // Messages will be indexed according to the messageId. + private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. + private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. + + + private CompositeType _msgContentType = null; // For message content + private String[] _msgContentAttributes = {"MessageId", + "MimeType", + "Encoding", + "Content" + }; + private String[] _msgContentDescriptions = {"Message Id", + "MimeType", + "Encoding", + "Message content" + }; + private OpenType[] _msgContentAttributeTypes = new OpenType[4]; + @MBeanConstructor("Creates an MBean exposing an AMQQueue.") public AMQQueueMBean() throws NotCompliantMBeanException @@ -162,22 +170,21 @@ public class AMQQueue implements Managable _queueName = jmxEncode(new StringBuffer(_name), 0).toString(); try { - _contentType[0] = SimpleType.INTEGER; - _contentType[1] = new ArrayType(1, SimpleType.BYTE); - _contentBodyType = new CompositeType("Content", - "Content", - _contentNames, - _contentDesc, - _contentType); - _contentBodyListType = new TabularType("MessageContents", - "Message Contents", - _contentBodyType, - _contentIndex); - - _msgAttributeTypes[0] = SimpleType.LONG; - _msgAttributeTypes[1] = SimpleType.BOOLEAN; - _msgAttributeTypes[2] = SimpleType.LONG; - _msgAttributeTypes[3] = _contentBodyListType; + _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id + _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType + _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding + _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content + _msgContentType = new CompositeType("MessageContent", + "AMQ Message Content", + _msgContentAttributes, + _msgContentDescriptions, + _msgContentAttributeTypes); + + + _msgAttributeTypes[0] = SimpleType.LONG; // For message id + _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes + _msgAttributeTypes[2] = SimpleType.LONG; // For size + _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered _messageDataType = new CompositeType("Message", "AMQ Message", @@ -288,10 +295,8 @@ public class AMQQueue implements Managable } return new Long(Math.round(queueSize / 100)); } - // Operations // calculates the size of an AMQMessage - private long getMessageSize(AMQMessage msg) { if (msg == null) @@ -374,6 +379,53 @@ public class AMQQueue implements Managable } } + public CompositeData viewMessageContent(long msgId) throws JMException + { + List<AMQMessage> list = _deliveryMgr.getMessages(); + CompositeData messageContent = null; + AMQMessage msg = null; + for (AMQMessage message : list) + { + if (message.getMessageId() == msgId) + { + msg = message; + break; + } + } + + if (msg != null) + { + // get message content + List<ContentBody> cBodies = msg.getContentBodies(); + List<Byte> msgContent = new ArrayList<Byte>(); + for (ContentBody body : cBodies) + { + if (body.getSize() != 0) + { + ByteBuffer slice = body.payload.slice(); + for (int j = 0; j < slice.limit(); j++) + { + msgContent.add(slice.get()); + } + } + } + + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties; + String mimeType = headerProperties.getContentType(); + String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + + Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; + messageContent = new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); + } + else + { + throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName ); + } + + return messageContent; + } + /** * Returns the messages stored in this queue in tabular form. * @@ -391,43 +443,38 @@ public class AMQQueue implements Managable } List<AMQMessage> list = _deliveryMgr.getMessages(); + TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); if (beginIndex > list.size()) { - throw new JMException("FromIndex = " + beginIndex + ". There are only " + list.size() + " messages in the queue"); + return _messageList; } - endIndex = endIndex < list.size() ? endIndex : list.size(); - TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); for (int i = beginIndex; i <= endIndex; i++) { AMQMessage msg = list.get(i - 1); - long msgId = msg.getMessageId(); - - List<ContentBody> cBodies = msg.getContentBodies(); - - TabularDataSupport _contentList = new TabularDataSupport(_contentBodyListType); - int contentSerialNo = 1; long size = 0; - + // get message content + List<ContentBody> cBodies = msg.getContentBodies(); for (ContentBody body : cBodies) { - if (body.getSize() != 0) - { - Byte[] byteArray = getByteArray(body.payload.slice().array()); - size = size + byteArray.length; + size = size + body.getSize(); + } - Object[] contentValues = {contentSerialNo, byteArray}; - CompositeData contentData = new CompositeDataSupport(_contentBodyType, - _contentNames, - contentValues); + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties; + List<String> headerAttribsList = new ArrayList<String>(); + headerAttribsList.add("App Id=" + headerProperties.getAppId()); + headerAttribsList.add("MimeType=" + headerProperties.getContentType()); + headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId()); + headerAttribsList.add("Encoding=" + headerProperties.getEncoding()); + headerAttribsList.add(headerProperties.toString()); - _contentList.put(contentData); - } - } + Object[] itemValues = {msg.getMessageId(), + headerAttribsList.toArray(new String[0]), + size, msg.isRedelivered()}; - Object[] itemValues = {msgId, true, size, _contentList}; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); @@ -438,26 +485,6 @@ public class AMQQueue implements Managable } /** - * A utility to convert byte[] to Byte[]. Required to create composite - * type for message contents. - * - * @param byteArray message content as byte[] - * @return Byte[] - */ - private Byte[] getByteArray(byte[] byteArray) - { - int size = byteArray.length; - List<Byte> list = new ArrayList<Byte>(); - - for (int i = 0; i < size; i++) - { - list.add(byteArray[i]); - } - - return list.toArray(new Byte[0]); - } - - /** * Creates all the notifications this MBean can send. * * @return Notifications broadcasted by this MBean. diff --git a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java index 109b3c0c9c..9c976b0d30 100644 --- a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.management.MBeanOperationParameter; import javax.management.JMException; import javax.management.MBeanOperationInfo; import javax.management.openmbean.TabularData; +import javax.management.openmbean.CompositeData; import java.io.IOException; /** @@ -209,4 +210,8 @@ public interface ManagedQueue impact= MBeanOperationInfo.ACTION) void clearQueue() throws IOException, JMException; + @MBeanOperation(name="viewMessageContent", + description="Returns the message content along with MimeType and Encoding") + CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId) + throws IOException, JMException; } |