diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java | 647 |
1 files changed, 647 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java new file mode 100644 index 0000000000..c8eb118b11 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -0,0 +1,647 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.transport.MessageProperties; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.ObjectName; +import javax.management.OperationsException; +import javax.management.monitor.MonitorNotification; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * AMQQueueMBean is the management bean for an {@link AMQQueue}. + * + * <p/><tablse id="crc"><caption>CRC Caption</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +@MBeanDescription("Management Interface for AMQQueue") +public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener +{ + /** Used for debugging purposes. */ + private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + + private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); + + private AMQQueue _queue = null; + private String _queueName = null; + // OpenMBean data types for viewMessages method + + private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types. + private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. + private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. + + // OpenMBean data types for viewMessageContent method + private static CompositeType _msgContentType = null; + private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; + + private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + private Notification _lastNotification = null; + + + + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") + public AMQQueueMBean(AMQQueue queue) throws JMException + { + super(ManagedQueue.class, ManagedQueue.TYPE); + _queue = queue; + _queueName = queue.getName(); + } + + public ManagedObject getParentObject() + { + return _queue.getVirtualHost().getManagedObject(); + } + + static + { + try + { + init(); + } + catch (JMException ex) + { + // This is not expected to ever occur. + throw new RuntimeException("Got JMException in static initializer.", ex); + } + } + + /** + * initialises the openmbean data types + */ + private static void init() throws OpenDataException + { + _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id + _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType + _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding + _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content + _msgContentType = new CompositeType("Message Content", "AMQ Message Content", + VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), + VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), + _msgContentAttributeTypes); + + _msgAttributeTypes[0] = SimpleType.LONG; // For message id + _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes + _msgAttributeTypes[2] = SimpleType.LONG; // For size + _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered + _msgAttributeTypes[4] = SimpleType.LONG; // For queue position + + _messageDataType = new CompositeType("Message", "AMQ Message", + VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), + VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), _msgAttributeTypes); + _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, + VIEW_MSGS_TABULAR_UNIQUE_INDEX.toArray(new String[VIEW_MSGS_TABULAR_UNIQUE_INDEX.size()])); + } + + public String getObjectInstanceName() + { + return ObjectName.quote(_queueName); + } + + public String getName() + { + return _queueName; + } + + public boolean isDurable() + { + return _queue.isDurable(); + } + + public String getOwner() + { + return String.valueOf(_queue.getOwner()); + } + + public boolean isAutoDelete() + { + return _queue.isAutoDelete(); + } + + public Integer getMessageCount() + { + return _queue.getMessageCount(); + } + + public Long getMaximumMessageSize() + { + return _queue.getMaximumMessageSize(); + } + + public Long getMaximumMessageAge() + { + return _queue.getMaximumMessageAge(); + } + + public void setMaximumMessageAge(Long maximumMessageAge) + { + _queue.setMaximumMessageAge(maximumMessageAge); + } + + public void setMaximumMessageSize(Long value) + { + _queue.setMaximumMessageSize(value); + } + + public Integer getConsumerCount() + { + return _queue.getConsumerCount(); + } + + public Integer getActiveConsumerCount() + { + return _queue.getActiveConsumerCount(); + } + + public Long getReceivedMessageCount() + { + return _queue.getReceivedMessageCount(); + } + + public Long getMaximumMessageCount() + { + return _queue.getMaximumMessageCount(); + } + + public void setMaximumMessageCount(Long value) + { + _queue.setMaximumMessageCount(value); + } + + /** + * returns the maximum total size of messages(bytes) in the queue. + */ + public Long getMaximumQueueDepth() + { + return _queue.getMaximumQueueDepth(); + } + + public void setMaximumQueueDepth(Long value) + { + _queue.setMaximumQueueDepth(value); + } + + /** + * returns the total size of messages(bytes) in the queue. + */ + public Long getQueueDepth() throws JMException + { + return _queue.getQueueDepth(); + } + + public Long getCapacity() + { + return _queue.getCapacity(); + } + + public void setCapacity(Long capacity) throws IllegalArgumentException + { + if( _queue.getFlowResumeCapacity() > capacity ) + { + throw new IllegalArgumentException("Capacity must not be less than FlowResumeCapacity"); + } + + _queue.setCapacity(capacity); + } + + public Long getFlowResumeCapacity() + { + return _queue.getFlowResumeCapacity(); + } + + public void setFlowResumeCapacity(Long flowResumeCapacity) throws IllegalArgumentException + { + if( _queue.getCapacity() < flowResumeCapacity ) + { + throw new IllegalArgumentException("FlowResumeCapacity must not exceed Capacity"); + } + + _queue.setFlowResumeCapacity(flowResumeCapacity); + } + + public boolean isFlowOverfull() + { + return _queue.isOverfull(); + } + + public boolean isExclusive() + { + return _queue.isExclusive(); + } + + public void setExclusive(boolean exclusive) throws JMException + { + try + { + _queue.setExclusive(exclusive); + } + catch (AMQException e) + { + throw new JMException(e.toString()); + } + } + + /** + * Checks if there is any notification to be send to the listeners + */ + public void checkForNotification(ServerMessage msg) throws AMQException + { + + final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); + + if(!notificationChecks.isEmpty()) + { + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + + for (NotificationCheck check : notificationChecks) + { + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) + { + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } + } + } + } + + } + + /** + * Sends the notification to the listeners + */ + public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg) + { + // important : add log to the log file - monitoring tools may be looking for this + _logger.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg); + notificationMsg = notification.name() + " " + notificationMsg; + + _lastNotification = + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + System.currentTimeMillis(), notificationMsg); + + _broadcaster.sendNotification(_lastNotification); + } + + public Notification getLastNotification() + { + return _lastNotification; + } + + /** + * @see AMQQueue#deleteMessageFromTop + */ + public void deleteMessageFromTop() throws JMException + { + _queue.deleteMessageFromTop(); + } + + /** + * Clears the queue of non-acquired messages + * + * @return the number of messages deleted + * @see AMQQueue#clearQueue + */ + public Long clearQueue() throws JMException + { + try + { + return _queue.clearQueue(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, "Error clearing queue " + _queueName); + } + } + + /** + * returns message content as byte array and related attributes for the given message id. + */ + public CompositeData viewMessageContent(long msgId) throws JMException + { + QueueEntry entry = _queue.getMessageOnTheQueue(msgId); + + if (entry == null) + { + throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); + } + + ServerMessage serverMsg = entry.getMessage(); + final int bodySize = (int) serverMsg.getSize(); + + + List<Byte> msgContent = new ArrayList<Byte>(); + + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize); + int position = 0; + + while(position < bodySize) + { + position += serverMsg.getContent(buf, position); + buf.flip(); + for(int i = 0; i < buf.limit(); i++) + { + msgContent.add(buf.get(i)); + } + buf.clear(); + } + + AMQMessageHeader header = serverMsg.getMessageHeader(); + + String mimeType = null, encoding = null; + if (header != null) + { + mimeType = header.getMimeType(); + + encoding = header.getEncoding(); + } + + + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + + return new CompositeDataSupport(_msgContentType, + VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.toArray( + new String[VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues); + + } + + /** + * Returns the header contents of the messages stored in this queue in tabular form. + * Deprecated as of Qpid JMX API 1.3 + */ + @Deprecated + public TabularData viewMessages(int beginIndex, int endIndex) throws JMException + { + return viewMessages((long)beginIndex,(long)endIndex); + } + + + /** + * Returns the header contents of the messages stored in this queue in tabular form. + * @param startPosition The queue position of the first message to be viewed + * @param endPosition The queue position of the last message to be viewed + */ + public TabularData viewMessages(long startPosition, long endPosition) throws JMException + { + if ((startPosition > endPosition) || (startPosition < 1)) + { + throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition + + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); + } + + if ((endPosition - startPosition) > Integer.MAX_VALUE) + { + throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size"); + } + + List<QueueEntry> list = _queue.getMessagesRangeOnTheQueue(startPosition,endPosition); + TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); + + try + { + // Create the tabular list of message header contents + int size = list.size(); + + for (int i = 0; i < size ; i++) + { + long position = startPosition + i; + final QueueEntry queueEntry = list.get(i); + ServerMessage serverMsg = queueEntry.getMessage(); + + String[] headerAttributes = null; + Object[] itemValues = null; + + if(serverMsg instanceof AMQMessage) + { + AMQMessage msg = (AMQMessage) serverMsg; + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + headerAttributes = getMessageHeaderProperties(headerBody); + itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position}; + } + else if(serverMsg instanceof MessageTransferMessage) + { + // We have a 0-10 message + MessageTransferMessage msg = (MessageTransferMessage) serverMsg; + + // Create header attributes list + headerAttributes = getMessageTransferMessageHeaderProps(msg); + itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position}; + } + else + { + //unknown message + headerAttributes = new String[]{"N/A"}; + itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position}; + } + + CompositeData messageData = new CompositeDataSupport(_messageDataType, + VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues); + _messageList.put(messageData); + } + } + catch (AMQException e) + { + JMException jme = new JMException("Error creating message contents: " + e); + jme.initCause(e); + throw jme; + } + + return _messageList; + } + + private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) + { + List<String> list = new ArrayList<String>(); + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties(); + list.add("reply-to = " + headerProperties.getReplyToAsString()); + list.add("propertyFlags = " + headerProperties.getPropertyFlags()); + list.add("ApplicationID = " + headerProperties.getAppIdAsString()); + list.add("ClusterID = " + headerProperties.getClusterIdAsString()); + list.add("UserId = " + headerProperties.getUserIdAsString()); + list.add("JMSMessageID = " + headerProperties.getMessageIdAsString()); + list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString()); + + int delMode = headerProperties.getDeliveryMode(); + list.add("JMSDeliveryMode = " + + ((delMode == BasicContentHeaderProperties.PERSISTENT) ? "Persistent" : "Non_Persistent")); + + list.add("JMSPriority = " + headerProperties.getPriority()); + list.add("JMSType = " + headerProperties.getType()); + + long longDate = headerProperties.getExpiration(); + String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSExpiration = " + strDate); + + longDate = headerProperties.getTimestamp(); + strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSTimestamp = " + strDate); + + return list.toArray(new String[list.size()]); + } + + private String[] getMessageTransferMessageHeaderProps(MessageTransferMessage msg) + { + List<String> list = new ArrayList<String>(); + + AMQMessageHeader header = msg.getMessageHeader(); + MessageProperties msgProps = msg.getHeader().get(MessageProperties.class); + + String appID = null; + String userID = null; + + if(msgProps != null) + { + appID = msgProps.getAppId() == null ? "null" : new String(msgProps.getAppId()); + userID = msgProps.getUserId() == null ? "null" : new String(msgProps.getUserId()); + } + + list.add("reply-to = " + header.getReplyTo()); + list.add("propertyFlags = "); //TODO + list.add("ApplicationID = " + appID); + list.add("ClusterID = "); //TODO + list.add("UserId = " + userID); + list.add("JMSMessageID = " + header.getMessageId()); + list.add("JMSCorrelationID = " + header.getCorrelationId()); + list.add("JMSDeliveryMode = " + (msg.isPersistent() ? "Persistent" : "Non_Persistent")); + list.add("JMSPriority = " + header.getPriority()); + list.add("JMSType = " + header.getType()); + + long longDate = header.getExpiration(); + String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSExpiration = " + strDate); + + longDate = header.getTimestamp(); + strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSTimestamp = " + strDate); + + return list.toArray(new String[list.size()]); + } + + /** + * @see ManagedQueue#moveMessages + * @param fromMessageId + * @param toMessageId + * @param toQueueName + * @throws JMException + */ + public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException + { + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); + } + + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); + txn.commit(); + } + + /** + * @see ManagedQueue#deleteMessages + * @param fromMessageId + * @param toMessageId + * @throws JMException + */ + public void deleteMessages(long fromMessageId, long toMessageId) throws JMException + { + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); + } + + _queue.removeMessagesFromQueue(fromMessageId, toMessageId); + } + + /** + * @see ManagedQueue#copyMessages + * @param fromMessageId + * @param toMessageId + * @param toQueueName + * @throws JMException + */ + public void copyMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException + { + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) + { + throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\""); + } + + ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog()); + + _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn); + + txn.commit(); + + + } + + /** + * returns Notifications sent by this MBean. + */ + @Override + public MBeanNotificationInfo[] getNotificationInfo() + { + String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED }; + String name = MonitorNotification.class.getName(); + String description = "Either Message count or Queue depth or Message size has reached threshold high value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); + + return new MBeanNotificationInfo[] { info1 }; + } + +} // End of AMQQueueMBean class |