summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java93
1 files changed, 57 insertions, 36 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 785b668687..8bb958ed3f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -36,6 +36,7 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -246,7 +247,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
/**
* Checks if there is any notification to be send to the listeners
*/
- public void checkForNotification(AMQMessage msg) throws AMQException
+ public void checkForNotification(ServerMessage msg) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -333,48 +334,60 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
- AMQMessage msg = entry.getMessage();
- // get message content
- Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
- List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
+ ServerMessage serverMsg = entry.getMessage();
+
+ if(serverMsg instanceof AMQMessage)
{
- ContentChunk body = cBodies.next();
- if (body.getSize() != 0)
+ AMQMessage msg = (AMQMessage) serverMsg;
+ // get message content
+ Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ while (cBodies.hasNext())
{
+ ContentChunk body = cBodies.next();
if (body.getSize() != 0)
{
- ByteBuffer slice = body.getData().slice();
- for (int j = 0; j < slice.limit(); j++)
+ if (body.getSize() != 0)
{
- msgContent.add(slice.get());
+ ByteBuffer slice = body.getData().slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
}
}
}
- }
- try
- {
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
+
+ try
{
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
+ // Create header attributes list
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = null, encoding = null;
+ if (headerProperties != null)
+ {
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+ return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+ }
+ catch (AMQException e)
+ {
+ JMException jme = new JMException("Error creating header attributes list: " + e);
+ jme.initCause(e);
+ throw jme;
+ }
- return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
}
- catch (AMQException e)
+ else
{
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
+ // TODO 0-10 Messages for MBean
+ return null;
}
}
@@ -398,13 +411,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
long position = i;
- AMQMessage msg = list.get(i - 1).getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
- _messageList.put(messageData);
+ ServerMessage serverMsg = list.get(i - 1).getMessage();
+ if(serverMsg instanceof AMQMessage)
+ {
+ AMQMessage msg = (AMQMessage) serverMsg;
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position };
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+ _messageList.put(messageData);
+ }
+ else
+ {
+ // TODO 0-10 Message
+ }
}
}
catch (AMQException e)