summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java338
1 files changed, 239 insertions, 99 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 2ed6be77c6..021128d2fc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -22,22 +22,23 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.transport.MessageProperties;
import javax.management.JMException;
-import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.OperationsException;
@@ -71,24 +72,16 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
- /**
- * Since the MBean is not associated with a real channel we can safely create our own store context
- * for use in the few methods that require one.
- */
- private StoreContext _storeContext = new StoreContext();
-
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
- private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" };
- private static String[] _msgAttributeIndex = { _msgAttributeNames[0] };
- private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+
+ private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
- private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" };
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -100,7 +93,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
- super(ManagedQueue.class, ManagedQueue.TYPE);
+ super(ManagedQueue.class, ManagedQueue.TYPE, ManagedQueue.VERSION);
_queue = queue;
_queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
}
@@ -132,18 +125,20 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
_msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType =
- new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
- _msgContentAttributeTypes);
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content",
+ VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, VIEW_MSG_CONTENT_COMPOSITE_ITEM_DESCRIPTIONS,
+ _msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+ _msgAttributeTypes[4] = SimpleType.LONG; // For queue position
- _messageDataType =
- new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
+ _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES,
+ VIEW_MSGS_COMPOSITE_ITEM_DESCRIPTIONS, _msgAttributeTypes);
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
+ VIEW_MSGS_TABULAR_UNIQUE_INDEX);
}
public String getObjectInstanceName()
@@ -163,7 +158,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
public String getOwner()
{
- return String.valueOf(_queue.getOwner());
+ return String.valueOf(_queue.getPrincipalHolder() == null
+ ? null
+ : _queue.getPrincipalHolder().getPrincipal() == null
+ ? null
+ : _queue.getPrincipalHolder().getPrincipal().getName());
}
public boolean isAutoDelete()
@@ -221,11 +220,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
_queue.setMaximumMessageCount(value);
}
+ /**
+ * returns the maximum total size of messages(bytes) in the queue.
+ */
public Long getMaximumQueueDepth()
{
- long queueDepthInBytes = _queue.getMaximumQueueDepth();
-
- return queueDepthInBytes >> 10;
+ return _queue.getMaximumQueueDepth();
}
public void setMaximumQueueDepth(Long value)
@@ -234,19 +234,52 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
/**
- * returns the size of messages(KB) in the queue.
+ * returns the total size of messages(bytes) in the queue.
*/
public Long getQueueDepth() throws JMException
{
- long queueBytesSize = _queue.getQueueDepth();
+ return _queue.getQueueDepth();
+ }
+
+ public Long getCapacity()
+ {
+ return _queue.getCapacity();
+ }
+
+ public void setCapacity(Long capacity) throws IllegalArgumentException
+ {
+ if( _queue.getFlowResumeCapacity() > capacity )
+ {
+ throw new IllegalArgumentException("Capacity must not be less than FlowResumeCapacity");
+ }
+
+ _queue.setCapacity(capacity);
+ }
+
+ public Long getFlowResumeCapacity()
+ {
+ return _queue.getFlowResumeCapacity();
+ }
- return queueBytesSize >> 10;
+ public void setFlowResumeCapacity(Long flowResumeCapacity) throws IllegalArgumentException
+ {
+ if( _queue.getCapacity() < flowResumeCapacity )
+ {
+ throw new IllegalArgumentException("FlowResumeCapacity must not exceed Capacity");
+ }
+
+ _queue.setFlowResumeCapacity(flowResumeCapacity);
+ }
+
+ public boolean isFlowOverfull()
+ {
+ return _queue.isOverfull();
}
/**
* Checks if there is any notification to be send to the listeners
*/
- public void checkForNotification(AMQMessage msg) throws AMQException, JMException
+ public void checkForNotification(ServerMessage msg) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -296,29 +329,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
*/
public void deleteMessageFromTop() throws JMException
{
- try
- {
- _queue.deleteMessageFromTop(_storeContext);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
+ _queue.deleteMessageFromTop();
}
/**
+ * Clears the queue of non-acquired messages
+ *
+ * @return the number of messages deleted
* @see AMQQueue#clearQueue
*/
- public void clearQueue() throws JMException
+ public Long clearQueue() throws JMException
{
- try
- {
- _queue.clearQueue(_storeContext);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
+ return _queue.clearQueue();
}
/**
@@ -333,77 +355,115 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
- AMQMessage msg = entry.getMessage();
- // get message content
- Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
+ ServerMessage serverMsg = entry.getMessage();
+ final int bodySize = (int) serverMsg.getSize();
+
+
List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
- {
- ContentChunk body = cBodies.next();
- if (body.getSize() != 0)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.getData().slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
- }
- try
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize);
+ int position = 0;
+
+ while(position < bodySize)
{
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
+ position += serverMsg.getContent(buf, position);
+ buf.flip();
+ for(int i = 0; i < buf.limit(); i++)
{
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ msgContent.add(buf.get(i));
}
+ buf.clear();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ AMQMessageHeader header = serverMsg.getMessageHeader();
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
- catch (AMQException e)
+ String mimeType = null, encoding = null;
+ if (header != null)
{
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
+ mimeType = header.getMimeType();
+
+ encoding = header.getEncoding();
}
+
+
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+ return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+
}
/**
* Returns the header contents of the messages stored in this queue in tabular form.
+ * Deprecated as of Qpid JMX API 1.3
*/
+ @Deprecated
public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
{
- if ((beginIndex > endIndex) || (beginIndex < 1))
+ return viewMessages((long)beginIndex,(long)endIndex);
+ }
+
+
+ /**
+ * Returns the header contents of the messages stored in this queue in tabular form.
+ * @param startPosition The queue position of the first message to be viewed
+ * @param endPosition The queue position of the last message to be viewed
+ */
+ public TabularData viewMessages(long startPosition, long endPosition) throws JMException
+ {
+ if ((startPosition > endPosition) || (startPosition < 1))
{
- throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
+ throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
- List<QueueEntry> list = _queue.getMessagesOnTheQueue();
+ if ((endPosition - startPosition) > Integer.MAX_VALUE)
+ {
+ throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size");
+ }
+
+ List<QueueEntry> list = _queue.getMessagesRangeOnTheQueue(startPosition,endPosition);
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
try
{
// Create the tabular list of message header contents
- for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
+ int size = list.size();
+
+ for (int i = 0; i < size ; i++)
{
- AMQMessage msg = list.get(i - 1).getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() };
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
+ long position = startPosition + i;
+ final QueueEntry queueEntry = list.get(i);
+ ServerMessage serverMsg = queueEntry.getMessage();
+ if(serverMsg instanceof AMQMessage)
+ {
+ AMQMessage msg = (AMQMessage) serverMsg;
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+ _messageList.put(messageData);
+
+ }
+ else if(serverMsg instanceof MessageTransferMessage)
+ {
+ // We have a 0-10 message
+ MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+
+ // Create header attributes list
+ String[] headerAttributes = getMessageTransferMessageHeaderProps(msg);
+ Object[] itemValues = {msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+ _messageList.put(messageData);
+ }
+ else
+ {
+ //unknown message
+ String[] headerAttributes = new String[]{"N/A"};
+ Object[] itemValues = { serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+ _messageList.put(messageData);
+ }
}
}
catch (AMQException e)
@@ -429,7 +489,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
int delMode = headerProperties.getDeliveryMode();
- list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent"));
+ list.add("JMSDeliveryMode = " +
+ ((delMode == BasicContentHeaderProperties.PERSISTENT) ? "Persistent" : "Non_Persistent"));
list.add("JMSPriority = " + headerProperties.getPriority());
list.add("JMSType = " + headerProperties.getType());
@@ -445,6 +506,44 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return list.toArray(new String[list.size()]);
}
+ private String[] getMessageTransferMessageHeaderProps(MessageTransferMessage msg)
+ {
+ List<String> list = new ArrayList<String>();
+
+ AMQMessageHeader header = msg.getMessageHeader();
+ MessageProperties msgProps = msg.getHeader().get(MessageProperties.class);
+
+ String appID = null;
+ String userID = null;
+
+ if(msgProps != null)
+ {
+ appID = msgProps.getAppId() == null ? "null" : new String(msgProps.getAppId());
+ userID = msgProps.getUserId() == null ? "null" : new String(msgProps.getUserId());
+ }
+
+ list.add("reply-to = " + header.getReplyTo());
+ list.add("propertyFlags = "); //TODO
+ list.add("ApplicationID = " + appID);
+ list.add("ClusterID = "); //TODO
+ list.add("UserId = " + userID);
+ list.add("JMSMessageID = " + header.getMessageId());
+ list.add("JMSCorrelationID = " + header.getCorrelationId());
+ list.add("JMSDeliveryMode = " + (msg.isPersistent() ? "Persistent" : "Non_Persistent"));
+ list.add("JMSPriority = " + header.getPriority());
+ list.add("JMSType = " + header.getType());
+
+ long longDate = header.getExpiration();
+ String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
+ list.add("JMSExpiration = " + strDate);
+
+ longDate = header.getTimestamp();
+ strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
+ list.add("JMSTimestamp = " + strDate);
+
+ return list.toArray(new String[list.size()]);
+ }
+
/**
* @see ManagedQueue#moveMessages
* @param fromMessageId
@@ -456,10 +555,51 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
- throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
+ throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
+ }
+
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+ txn.commit();
+ }
+
+ /**
+ * @see ManagedQueue#deleteMessages
+ * @param fromMessageId
+ * @param toMessageId
+ * @throws JMException
+ */
+ public void deleteMessages(long fromMessageId, long toMessageId) throws JMException
+ {
+ if ((fromMessageId > toMessageId) || (fromMessageId < 1))
+ {
+ throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
+ }
+
+ _queue.removeMessagesFromQueue(fromMessageId, toMessageId);
+ }
+
+ /**
+ * @see ManagedQueue#copyMessages
+ * @param fromMessageId
+ * @param toMessageId
+ * @param toQueueName
+ * @throws JMException
+ */
+ public void copyMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
+ {
+ if ((fromMessageId > toMessageId) || (fromMessageId < 1))
+ {
+ throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+ _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+
+ txn.commit();
+
+
}
/**