diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java | 338 |
1 files changed, 239 insertions, 99 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 2ed6be77c6..021128d2fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -22,22 +22,23 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.transport.MessageProperties; import javax.management.JMException; -import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.Notification; import javax.management.OperationsException; @@ -71,24 +72,16 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); - /** - * Since the MBean is not associated with a real channel we can safely create our own store context - * for use in the few methods that require one. - */ - private StoreContext _storeContext = new StoreContext(); - private AMQQueue _queue = null; private String _queueName = null; // OpenMBean data types for viewMessages method - private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" }; - private static String[] _msgAttributeIndex = { _msgAttributeNames[0] }; - private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. + + private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types. private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. // OpenMBean data types for viewMessageContent method private static CompositeType _msgContentType = null; - private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" }; private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; @@ -100,7 +93,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { - super(ManagedQueue.class, ManagedQueue.TYPE); + super(ManagedQueue.class, ManagedQueue.TYPE, ManagedQueue.VERSION); _queue = queue; _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString(); } @@ -132,18 +125,20 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content - _msgContentType = - new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes, - _msgContentAttributeTypes); + _msgContentType = new CompositeType("Message Content", "AMQ Message Content", + VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, VIEW_MSG_CONTENT_COMPOSITE_ITEM_DESCRIPTIONS, + _msgContentAttributeTypes); _msgAttributeTypes[0] = SimpleType.LONG; // For message id _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes _msgAttributeTypes[2] = SimpleType.LONG; // For size _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered + _msgAttributeTypes[4] = SimpleType.LONG; // For queue position - _messageDataType = - new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); - _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); + _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES, + VIEW_MSGS_COMPOSITE_ITEM_DESCRIPTIONS, _msgAttributeTypes); + _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, + VIEW_MSGS_TABULAR_UNIQUE_INDEX); } public String getObjectInstanceName() @@ -163,7 +158,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public String getOwner() { - return String.valueOf(_queue.getOwner()); + return String.valueOf(_queue.getPrincipalHolder() == null + ? null + : _queue.getPrincipalHolder().getPrincipal() == null + ? null + : _queue.getPrincipalHolder().getPrincipal().getName()); } public boolean isAutoDelete() @@ -221,11 +220,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que _queue.setMaximumMessageCount(value); } + /** + * returns the maximum total size of messages(bytes) in the queue. + */ public Long getMaximumQueueDepth() { - long queueDepthInBytes = _queue.getMaximumQueueDepth(); - - return queueDepthInBytes >> 10; + return _queue.getMaximumQueueDepth(); } public void setMaximumQueueDepth(Long value) @@ -234,19 +234,52 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } /** - * returns the size of messages(KB) in the queue. + * returns the total size of messages(bytes) in the queue. */ public Long getQueueDepth() throws JMException { - long queueBytesSize = _queue.getQueueDepth(); + return _queue.getQueueDepth(); + } + + public Long getCapacity() + { + return _queue.getCapacity(); + } + + public void setCapacity(Long capacity) throws IllegalArgumentException + { + if( _queue.getFlowResumeCapacity() > capacity ) + { + throw new IllegalArgumentException("Capacity must not be less than FlowResumeCapacity"); + } + + _queue.setCapacity(capacity); + } + + public Long getFlowResumeCapacity() + { + return _queue.getFlowResumeCapacity(); + } - return queueBytesSize >> 10; + public void setFlowResumeCapacity(Long flowResumeCapacity) throws IllegalArgumentException + { + if( _queue.getCapacity() < flowResumeCapacity ) + { + throw new IllegalArgumentException("FlowResumeCapacity must not exceed Capacity"); + } + + _queue.setFlowResumeCapacity(flowResumeCapacity); + } + + public boolean isFlowOverfull() + { + return _queue.isOverfull(); } /** * Checks if there is any notification to be send to the listeners */ - public void checkForNotification(AMQMessage msg) throws AMQException, JMException + public void checkForNotification(ServerMessage msg) throws AMQException { final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); @@ -296,29 +329,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que */ public void deleteMessageFromTop() throws JMException { - try - { - _queue.deleteMessageFromTop(_storeContext); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } + _queue.deleteMessageFromTop(); } /** + * Clears the queue of non-acquired messages + * + * @return the number of messages deleted * @see AMQQueue#clearQueue */ - public void clearQueue() throws JMException + public Long clearQueue() throws JMException { - try - { - _queue.clearQueue(_storeContext); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } + return _queue.clearQueue(); } /** @@ -333,77 +355,115 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } - AMQMessage msg = entry.getMessage(); - // get message content - Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); + ServerMessage serverMsg = entry.getMessage(); + final int bodySize = (int) serverMsg.getSize(); + + List<Byte> msgContent = new ArrayList<Byte>(); - while (cBodies.hasNext()) - { - ContentChunk body = cBodies.next(); - if (body.getSize() != 0) - { - if (body.getSize() != 0) - { - ByteBuffer slice = body.getData().slice(); - for (int j = 0; j < slice.limit(); j++) - { - msgContent.add(slice.get()); - } - } - } - } - try + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize); + int position = 0; + + while(position < bodySize) { - // Create header attributes list - CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) + position += serverMsg.getContent(buf, position); + buf.flip(); + for(int i = 0; i < buf.limit(); i++) { - AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); - encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); + msgContent.add(buf.get(i)); } + buf.clear(); + } - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + AMQMessageHeader header = serverMsg.getMessageHeader(); - return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); - } - catch (AMQException e) + String mimeType = null, encoding = null; + if (header != null) { - JMException jme = new JMException("Error creating header attributes list: " + e); - jme.initCause(e); - throw jme; + mimeType = header.getMimeType(); + + encoding = header.getEncoding(); } + + + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + + return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues); + } /** * Returns the header contents of the messages stored in this queue in tabular form. + * Deprecated as of Qpid JMX API 1.3 */ + @Deprecated public TabularData viewMessages(int beginIndex, int endIndex) throws JMException { - if ((beginIndex > endIndex) || (beginIndex < 1)) + return viewMessages((long)beginIndex,(long)endIndex); + } + + + /** + * Returns the header contents of the messages stored in this queue in tabular form. + * @param startPosition The queue position of the first message to be viewed + * @param endPosition The queue position of the last message to be viewed + */ + public TabularData viewMessages(long startPosition, long endPosition) throws JMException + { + if ((startPosition > endPosition) || (startPosition < 1)) { - throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex + throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); } - List<QueueEntry> list = _queue.getMessagesOnTheQueue(); + if ((endPosition - startPosition) > Integer.MAX_VALUE) + { + throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size"); + } + + List<QueueEntry> list = _queue.getMessagesRangeOnTheQueue(startPosition,endPosition); TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); try { // Create the tabular list of message header contents - for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) + int size = list.size(); + + for (int i = 0; i < size ; i++) { - AMQMessage msg = list.get(i - 1).getMessage(); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() }; - CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); - _messageList.put(messageData); + long position = startPosition + i; + final QueueEntry queueEntry = list.get(i); + ServerMessage serverMsg = queueEntry.getMessage(); + if(serverMsg instanceof AMQMessage) + { + AMQMessage msg = (AMQMessage) serverMsg; + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + String[] headerAttributes = getMessageHeaderProperties(headerBody); + Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position}; + CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); + _messageList.put(messageData); + + } + else if(serverMsg instanceof MessageTransferMessage) + { + // We have a 0-10 message + MessageTransferMessage msg = (MessageTransferMessage) serverMsg; + + // Create header attributes list + String[] headerAttributes = getMessageTransferMessageHeaderProps(msg); + Object[] itemValues = {msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position}; + CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); + _messageList.put(messageData); + } + else + { + //unknown message + String[] headerAttributes = new String[]{"N/A"}; + Object[] itemValues = { serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position}; + CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); + _messageList.put(messageData); + } } } catch (AMQException e) @@ -429,7 +489,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString()); int delMode = headerProperties.getDeliveryMode(); - list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent")); + list.add("JMSDeliveryMode = " + + ((delMode == BasicContentHeaderProperties.PERSISTENT) ? "Persistent" : "Non_Persistent")); list.add("JMSPriority = " + headerProperties.getPriority()); list.add("JMSType = " + headerProperties.getType()); @@ -445,6 +506,44 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return list.toArray(new String[list.size()]); } + private String[] getMessageTransferMessageHeaderProps(MessageTransferMessage msg) + { + List<String> list = new ArrayList<String>(); + + AMQMessageHeader header = msg.getMessageHeader(); + MessageProperties msgProps = msg.getHeader().get(MessageProperties.class); + + String appID = null; + String userID = null; + + if(msgProps != null) + { + appID = msgProps.getAppId() == null ? "null" : new String(msgProps.getAppId()); + userID = msgProps.getUserId() == null ? "null" : new String(msgProps.getUserId()); + } + + list.add("reply-to = " + header.getReplyTo()); + list.add("propertyFlags = "); //TODO + list.add("ApplicationID = " + appID); + list.add("ClusterID = "); //TODO + list.add("UserId = " + userID); + list.add("JMSMessageID = " + header.getMessageId()); + list.add("JMSCorrelationID = " + header.getCorrelationId()); + list.add("JMSDeliveryMode = " + (msg.isPersistent() ? "Persistent" : "Non_Persistent")); + list.add("JMSPriority = " + header.getPriority()); + list.add("JMSType = " + header.getType()); + + long longDate = header.getExpiration(); + String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSExpiration = " + strDate); + + longDate = header.getTimestamp(); + strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSTimestamp = " + strDate); + + return list.toArray(new String[list.size()]); + } + /** * @see ManagedQueue#moveMessages * @param fromMessageId @@ -456,10 +555,51 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { if ((fromMessageId > toMessageId) || (fromMessageId < 1)) { - throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\""); + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); + } + + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); + txn.commit(); + } + + /** + * @see ManagedQueue#deleteMessages + * @param fromMessageId + * @param toMessageId + * @throws JMException + */ + public void deleteMessages(long fromMessageId, long toMessageId) throws JMException + { + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); + } + + _queue.removeMessagesFromQueue(fromMessageId, toMessageId); + } + + /** + * @see ManagedQueue#copyMessages + * @param fromMessageId + * @param toMessageId + * @param toQueueName + * @throws JMException + */ + public void copyMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException + { + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); } - _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); + + txn.commit(); + + } /** |