diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java | 93 |
1 files changed, 57 insertions, 36 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 785b668687..8bb958ed3f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -36,6 +36,7 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.message.ServerMessage; import javax.management.JMException; import javax.management.MBeanException; @@ -246,7 +247,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que /** * Checks if there is any notification to be send to the listeners */ - public void checkForNotification(AMQMessage msg) throws AMQException + public void checkForNotification(ServerMessage msg) throws AMQException { final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); @@ -333,48 +334,60 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } - AMQMessage msg = entry.getMessage(); - // get message content - Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); - List<Byte> msgContent = new ArrayList<Byte>(); - while (cBodies.hasNext()) + ServerMessage serverMsg = entry.getMessage(); + + if(serverMsg instanceof AMQMessage) { - ContentChunk body = cBodies.next(); - if (body.getSize() != 0) + AMQMessage msg = (AMQMessage) serverMsg; + // get message content + Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); + List<Byte> msgContent = new ArrayList<Byte>(); + while (cBodies.hasNext()) { + ContentChunk body = cBodies.next(); if (body.getSize() != 0) { - ByteBuffer slice = body.getData().slice(); - for (int j = 0; j < slice.limit(); j++) + if (body.getSize() != 0) { - msgContent.add(slice.get()); + ByteBuffer slice = body.getData().slice(); + for (int j = 0; j < slice.limit(); j++) + { + msgContent.add(slice.get()); + } } } } - } - try - { - // Create header attributes list - CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) + + try { - AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); - encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); - } + // Create header attributes list + CommonContentHeaderProperties headerProperties = + (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = null, encoding = null; + if (headerProperties != null) + { + AMQShortString mimeTypeShortSting = headerProperties.getContentType(); + mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); + encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); + } - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + + return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues); + } + catch (AMQException e) + { + JMException jme = new JMException("Error creating header attributes list: " + e); + jme.initCause(e); + throw jme; + } - return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues); } - catch (AMQException e) + else { - JMException jme = new JMException("Error creating header attributes list: " + e); - jme.initCause(e); - throw jme; + // TODO 0-10 Messages for MBean + return null; } } @@ -398,13 +411,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { long position = i; - AMQMessage msg = list.get(i - 1).getMessage(); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position}; - CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); - _messageList.put(messageData); + ServerMessage serverMsg = list.get(i - 1).getMessage(); + if(serverMsg instanceof AMQMessage) + { + AMQMessage msg = (AMQMessage) serverMsg; + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + String[] headerAttributes = getMessageHeaderProperties(headerBody); + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position }; + CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); + _messageList.put(messageData); + } + else + { + // TODO 0-10 Message + } } } catch (AMQException e) |