diff options
4 files changed, 734 insertions, 539 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1aa62dbfa4..407ad236ea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -40,9 +40,6 @@ import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.QueueRegistry; @@ -50,24 +47,12 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.Notification; -import javax.management.monitor.MonitorNotification; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; import javax.security.sasl.SaslServer; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Date; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; @@ -93,7 +78,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private AMQCodecFactory _codecFactory; - private ManagedAMQProtocolSession _managedObject; + private AMQProtocolSessionMBean _managedObject; private SaslServer _saslServer; @@ -102,11 +87,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private Object _lastSent; private boolean _closed; - + // maximum number of channels this session should have private long _maxNoOfChannels = 1000; /* AMQP Version for this session */ - private byte _major; private byte _minor; @@ -115,190 +99,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _managedObject; } - /** - * This class implements the management interface (is an MBean). In order to - * make more attributes, operations and notifications available over JMX simply - * augment the ManagedConnection interface and add the appropriate implementation here. - */ - @MBeanDescription("Management Bean for an AMQ Broker Connection") - private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection - { - private String _name = null; - //openmbean data types for representing the channel attributes - private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; - private String[] _indexNames = {_channelAtttibuteNames[0]}; - private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; - private CompositeType _channelType = null; // represents the data type for channel data - private TabularType _channelsType = null; // Data type for list of channels type - private TabularDataSupport _channelsList = null; - - @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public ManagedAMQProtocolSession() throws JMException - { - super(ManagedConnection.class, ManagedConnection.TYPE); - init(); - } - - /** - * initialises the openmbean data types - */ - private void init() throws OpenDataException - { - String remote = getRemoteAddress(); - remote = "anonymous".equals(remote) ? remote + hashCode() : remote; - _name = jmxEncode(new StringBuffer(remote), 0).toString(); - _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, - _channelAtttibuteNames, _channelAttributeTypes); - _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); - } - - public Date getLastIoTime() - { - return new Date(_minaProtocolSession.getLastIoTime()); - } - - public String getRemoteAddress() - { - return _minaProtocolSession.getRemoteAddress().toString(); - } - - public Long getWrittenBytes() - { - return _minaProtocolSession.getWrittenBytes(); - } - - public Long getReadBytes() - { - return _minaProtocolSession.getReadBytes(); - } - - public Long getMaximumNumberOfChannels() - { - return _maxNoOfChannels; - } - - public void setMaximumNumberOfChannels(Long value) - { - _maxNoOfChannels = value; - } - - public String getObjectInstanceName() - { - return _name; - } - - public void commitTransactions(int channelId) throws JMException - { - try - { - AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); - } - if (channel.isTransactional()) - { - channel.commit(); - } - } - catch(AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - public void rollbackTransactions(int channelId) throws JMException - { - try - { - AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); - } - if (channel.isTransactional()) - { - channel.rollback(); - } - } - catch(AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * Creates the list of channels in tabular form from the _channelMap. - * @return list of channels in tabular form. - * @throws OpenDataException - */ - public TabularData channels() throws OpenDataException - { - _channelsList = new TabularDataSupport(_channelsType); - - for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet()) - { - AMQChannel channel = entry.getValue(); - Object[] itemValues = {channel.getChannelId(), channel.isTransactional(), - (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null, - channel.getUnacknowledgedMessageMap().size()}; - - CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues); - _channelsList.put(channelData); - } - - return _channelsList; - } - - public void closeChannel(int id) throws Exception - { - try - { - AMQMinaProtocolSession.this.closeChannel(id); - } - catch (AMQException ex) - { - throw new Exception(ex.toString()); - } - } - - public void closeConnection() throws Exception - { - try - { - AMQMinaProtocolSession.this.closeSession(); - } - catch (AMQException ex) - { - throw new Exception(ex.toString()); - } - } - - @Override - public MBeanNotificationInfo[] getNotificationInfo() - { - String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; - String name = MonitorNotification.class.getName(); - String description = "Channel count has reached threshold value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - - return new MBeanNotificationInfo[] {info1}; - } - - private void checkForNotification() - { - int channelsCount = _channelMap.size(); - if (channelsCount >= getMaximumNumberOfChannels()) - { - Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), - "Channel count (" + channelsCount + ") has reached the threshold value"); - - _broadcaster.sendNotification(n); - } - } - - } // End of MBean class public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) @@ -322,11 +122,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _managedObject.register(); } - private ManagedAMQProtocolSession createMBean() throws AMQException + private AMQProtocolSessionMBean createMBean() throws AMQException { try { - return new ManagedAMQProtocolSession(); + return new AMQProtocolSessionMBean(this); } catch(JMException ex) { @@ -335,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } + public IoSession getIOSession() + { + return _minaProtocolSession; + } + public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession) { return (AMQProtocolSession) minaProtocolSession.getAttachment(); @@ -495,7 +300,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _contextKey = contextKey; } - public AMQChannel getChannel(int channelId) throws AMQException + public List<AMQChannel> getChannels() + { + return new ArrayList<AMQChannel>(_channelMap.values()); + } + + public AMQChannel getChannel(int channelId) { return _channelMap.get(channelId); } @@ -503,7 +313,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void addChannel(AMQChannel channel) { _channelMap.put(channel.getChannelId(), channel); - _managedObject.checkForNotification(); + checkForNotification(); + } + + private void checkForNotification() + { + int channelsCount = _channelMap.size(); + if (channelsCount >= _maxNoOfChannels) + { + _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); + } + } + + public Long getMaximumNumberOfChannels() + { + return _maxNoOfChannels; + } + + public void setMaximumNumberOfChannels(Long value) + { + _maxNoOfChannels = value; + } + + public void commitTransactions(AMQChannel channel) throws AMQException + { + if (channel != null && channel.isTransactional()) + { + channel.commit(); + } + } + + public void rollbackTransactions(AMQChannel channel) throws AMQException + { + if (channel != null && channel.isTransactional()) + { + channel.rollback(); + } } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java new file mode 100644 index 0000000000..b56d22d655 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -0,0 +1,240 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; +import javax.management.monitor.MonitorNotification; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.util.Date; +import java.util.List; + +/** + * This MBean class implements the management interface. In order to make more attributes, operations and notifications + * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here. + */ +@MBeanDescription("Management Bean for an AMQ Broker Connection") +public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection +{ + private AMQMinaProtocolSession _session = null; + private String _name = null; + //openmbean data types for representing the channel attributes + private String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; + private String[] _indexNames = {_channelAtttibuteNames[0]}; + private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; + private CompositeType _channelType = null; // represents the data type for channel data + private TabularType _channelsType = null; // Data type for list of channels type + + @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") + public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException + { + super(ManagedConnection.class, ManagedConnection.TYPE); + _session = session; + init(); + } + + /** + * initialises the openmbean data types + */ + private void init() throws OpenDataException + { + String remote = getRemoteAddress(); + remote = "anonymous".equals(remote) ? remote + hashCode() : remote; + _name = jmxEncode(new StringBuffer(remote), 0).toString(); + _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, + _channelAtttibuteNames, _channelAttributeTypes); + _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); + } + + public Date getLastIoTime() + { + return new Date(_session.getIOSession().getLastIoTime()); + } + + public String getRemoteAddress() + { + return _session.getIOSession().getRemoteAddress().toString(); + } + + public Long getWrittenBytes() + { + return _session.getIOSession().getWrittenBytes(); + } + + public Long getReadBytes() + { + return _session.getIOSession().getReadBytes(); + } + + public Long getMaximumNumberOfChannels() + { + return _session.getMaximumNumberOfChannels(); + } + + public void setMaximumNumberOfChannels(Long value) + { + _session.setMaximumNumberOfChannels(value); + } + + public String getObjectInstanceName() + { + return _name; + } + + /** + * commits transactions for a transactional channel + * + * @param channelId + * @throws JMException if channel with given id doesn't exist or if commit fails + */ + public void commitTransactions(int channelId) throws JMException + { + try + { + AMQChannel channel = _session.getChannel(channelId); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); + } + _session.commitTransactions(channel); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * rollsback the transactions for a transactional channel + * + * @param channelId + * @throws JMException if channel with given id doesn't exist or if rollback fails + */ + public void rollbackTransactions(int channelId) throws JMException + { + try + { + AMQChannel channel = _session.getChannel(channelId); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); + } + _session.rollbackTransactions(channel); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * Creates the list of channels in tabular form from the _channelMap. + * + * @return list of channels in tabular form. + * @throws OpenDataException + */ + public TabularData channels() throws OpenDataException + { + TabularDataSupport channelsList = new TabularDataSupport(_channelsType); + List<AMQChannel> list = _session.getChannels(); + + for (AMQChannel channel : list) + { + Object[] itemValues = {channel.getChannelId(), channel.isTransactional(), + (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null, + channel.getUnacknowledgedMessageMap().size()}; + + CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues); + channelsList.put(channelData); + } + + return channelsList; + } + + /** + * @see AMQMinaProtocolSession#closeChannel(int) + */ + public void closeChannel(int id) throws JMException + { + AMQChannel channel = _session.getChannel(id); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + id + ") does not exist"); + } + try + { + _session.closeChannel(id); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * closes the connection. The administrator can use this management operation to close connection to free up + * resources. + * @throws JMException + */ + public void closeConnection() throws JMException + { + try + { + _session.closeSession(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + @Override + public MBeanNotificationInfo[] getNotificationInfo() + { + String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String name = MonitorNotification.class.getName(); + String description = "Channel count has reached threshold value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); + + return new MBeanNotificationInfo[]{info1}; + } + + public void notifyClients(String notificationMsg) + { + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + _broadcaster.sendNotification(n); + } + +} // End of MBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 353a2007c0..f2ef97cf9a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,15 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -37,13 +30,7 @@ import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MBeanNotificationInfo; -import javax.management.Notification; -import javax.management.monitor.MonitorNotification; -import javax.management.openmbean.*; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; @@ -108,7 +95,7 @@ public class AMQQueue implements Managable, Comparable * max allowed number of messages on a queue. */ private Integer _maxMessageCount = 10000; - + /** * max queue depth(KB) for the queue */ @@ -124,322 +111,6 @@ public class AMQQueue implements Managable, Comparable return _name.compareTo(((AMQQueue) o).getName()); } - /** - * MBean class for AMQQueue. It implements all the management features exposed - * for an AMQQueue. - */ - @MBeanDescription("Management Interface for AMQQueue") - private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue - { - private String _queueName = null; - // OpenMBean data types for viewMessages method - private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"}; - private String[] _msgAttributeIndex = {_msgAttributeNames[0]}; - private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. - private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. - private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - - // OpenMBean data types for viewMessageContent method - private CompositeType _msgContentType = null; - private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"}; - private OpenType[] _msgContentAttributeTypes = new OpenType[4]; - - @MBeanConstructor("Creates an MBean exposing an AMQQueue") - public AMQQueueMBean() throws JMException - { - super(ManagedQueue.class, ManagedQueue.TYPE); - init(); - } - - /** - * initialises the openmbean data types - */ - private void init() throws OpenDataException - { - _queueName = jmxEncode(new StringBuffer(_name), 0).toString(); - _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id - _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType - _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding - _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content - _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, - _msgContentAttributes, _msgContentAttributeTypes); - - _msgAttributeTypes[0] = SimpleType.LONG; // For message id - _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes - _msgAttributeTypes[2] = SimpleType.LONG; // For size - _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered - - _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); - _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); - } - - public String getObjectInstanceName() - { - return _queueName; - } - - public String getName() - { - return _name; - } - - public boolean isDurable() - { - return _durable; - } - - public String getOwner() - { - return _owner; - } - - public boolean isAutoDelete() - { - return _autoDelete; - } - - public Integer getMessageCount() - { - return _deliveryMgr.getQueueMessageCount(); - } - - public Long getMaximumMessageSize() - { - return _maxMessageSize; - } - - public void setMaximumMessageSize(Long value) - { - _maxMessageSize = value; - } - - public Integer getConsumerCount() - { - return _subscribers.size(); - } - - public Integer getActiveConsumerCount() - { - return _subscribers.getWeight(); - } - - public Long getReceivedMessageCount() - { - return _totalMessagesReceived; - } - - public Integer getMaximumMessageCount() - { - return _maxMessageCount; - } - - public void setMaximumMessageCount(Integer value) - { - _maxMessageCount = value; - } - - public Long getMaximumQueueDepth() - { - return _maxQueueDepth; - } - - // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(Long value) - { - _maxQueueDepth = value; - } - - /** - * returns the size of messages(KB) in the queue. - */ - public Long getQueueDepth() - { - List<AMQMessage> list = _deliveryMgr.getMessages(); - if (list.size() == 0) - { - return 0l; - } - - long queueDepth = 0; - for (AMQMessage message : list) - { - queueDepth = queueDepth + getMessageSize(message); - } - return (long)Math.round(queueDepth / 1000); - } - - /** - * returns size of message in bytes - */ - private long getMessageSize(AMQMessage msg) - { - if (msg == null) - { - return 0l; - } - - return msg.getContentHeaderBody().bodySize; - } - - /** - * Checks if there is any notification to be send to the listeners - */ - private void checkForNotification(AMQMessage msg) - { - // Check for threshold message count - Integer msgCount = getMessageCount(); - if (msgCount >= getMaximumMessageCount()) - { - notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); - } - - // Check for threshold message size - long messageSize = getMessageSize(msg); - if (messageSize >= _maxMessageSize) - { - notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); - } - - // Check for threshold queue depth in bytes - long queueDepth = getQueueDepth(); - if (queueDepth >= _maxQueueDepth) - { - notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); - } - } - - /** - * Sends the notification to the listeners - */ - private void notifyClients(String notificationMsg) - { - Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); - - _broadcaster.sendNotification(n); - } - - public void deleteMessageFromTop() throws JMException - { - try - { - _deliveryMgr.removeAMessageFromTop(); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - public void clearQueue() throws JMException - { - try - { - _deliveryMgr.clearAllMessages(); - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * returns message content as byte array and related attributes for the given message id. - */ - public CompositeData viewMessageContent(long msgId) throws JMException - { - List<AMQMessage> list = _deliveryMgr.getMessages(); - AMQMessage msg = null; - for (AMQMessage message : list) - { - if (message.getMessageId() == msgId) - { - msg = message; - break; - } - } - - if (msg == null) - { - throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName ); - } - // get message content - List<ContentBody> cBodies = msg.getContentBodies(); - List<Byte> msgContent = new ArrayList<Byte>(); - for (ContentBody body : cBodies) - { - if (body.getSize() != 0) - { - ByteBuffer slice = body.payload.slice(); - for (int j = 0; j < slice.limit(); j++) - { - msgContent.add(slice.get()); - } - } - } - - // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties; - String mimeType = headerProperties.getContentType(); - String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); - Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; - - return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); - } - - /** - * Returns the header contents of the messages stored in this queue in tabular form. - */ - public TabularData viewMessages(int beginIndex, int endIndex) throws JMException - { - if ((beginIndex > endIndex) || (beginIndex < 1)) - { - throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex + - "\nFrom Index should be greater than 0 and less than To Index"); - } - - List<AMQMessage> list = _deliveryMgr.getMessages(); - TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - - // Create the tabular list of message header contents - for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) - { - AMQMessage msg = list.get(i - 1); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties; - List<String> headerAttribsList = new ArrayList<String>(); - headerAttribsList.add("App Id=" + headerProperties.getAppId()); - headerAttribsList.add("MimeType=" + headerProperties.getContentType()); - headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId()); - headerAttribsList.add("Encoding=" + headerProperties.getEncoding()); - headerAttribsList.add(headerProperties.toString()); - - Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]), - headerBody.bodySize, msg.isRedelivered()}; - CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); - _messageList.put(messageData); - } - - return _messageList; - } - - /** - * returns Notifications sent by this MBean. - */ - @Override - public MBeanNotificationInfo[] getNotificationInfo() - { - String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; - String name = MonitorNotification.class.getName(); - String description = "Either Message count or Queue depth or Message size has reached threshold high value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - - return new MBeanNotificationInfo[]{info1}; - } - - } // End of AMQMBean class - public AMQQueue(String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) throws AMQException @@ -533,7 +204,7 @@ public class AMQQueue implements Managable, Comparable { try { - return new AMQQueueMBean(); + return new AMQQueueMBean(this); } catch (JMException ex) { @@ -566,16 +237,112 @@ public class AMQQueue implements Managable, Comparable return _autoDelete; } + /** + * @return no of messages(undelivered) on the queue. + */ public int getMessageCount() { return _deliveryMgr.getQueueMessageCount(); } + /** + * @return List of messages(undelivered) on the queue. + */ + public List<AMQMessage> getMessagesOnTheQueue() + { + return _deliveryMgr.getMessages(); + } + + /** + * @param messageId + * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. + */ + public AMQMessage getMessageOnTheQueue(long messageId) + { + List<AMQMessage> list = getMessagesOnTheQueue(); + AMQMessage msg = null; + for (AMQMessage message : list) + { + if (message.getMessageId() == messageId) + { + msg = message; + break; + } + } + + return msg; + } + + /** + * @return MBean object associated with this Queue + */ public ManagedObject getManagedObject() { return _managedObject; } + public Long getMaximumMessageSize() + { + return _maxMessageSize; + } + + public void setMaximumMessageSize(Long value) + { + _maxMessageSize = value; + } + + public Integer getConsumerCount() + { + return _subscribers.size(); + } + + public Integer getActiveConsumerCount() + { + return _subscribers.getWeight(); + } + + public Long getReceivedMessageCount() + { + return _totalMessagesReceived; + } + + public Integer getMaximumMessageCount() + { + return _maxMessageCount; + } + + public void setMaximumMessageCount(Integer value) + { + _maxMessageCount = value; + } + + public Long getMaximumQueueDepth() + { + return _maxQueueDepth; + } + + // Sets the queue depth, the max queue size + public void setMaximumQueueDepth(Long value) + { + _maxQueueDepth = value; + } + + /** + * Removes the AMQMessage from the top of the queue. + */ + public void deleteMessageFromTop() throws AMQException + { + _deliveryMgr.removeAMessageFromTop(); + } + + /** + * removes all the messages from the queue. + */ + public void clearQueue() throws AMQException + { + _deliveryMgr.clearAllMessages(); + } + public void bind(String routingKey, Exchange exchange) { _bindings.addBinding(routingKey, exchange); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java new file mode 100644 index 0000000000..54dd366d71 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -0,0 +1,343 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.mina.common.ByteBuffer; + +import javax.management.openmbean.*; +import javax.management.JMException; +import javax.management.Notification; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.OperationsException; +import javax.management.monitor.MonitorNotification; +import java.util.List; +import java.util.ArrayList; + +/** + * MBean class for AMQQueue. It implements all the management features exposed + * for an AMQQueue. + */ +@MBeanDescription("Management Interface for AMQQueue") +public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue +{ + private AMQQueue _queue = null; + private String _queueName = null; + // OpenMBean data types for viewMessages method + private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"}; + private String[] _msgAttributeIndex = {_msgAttributeNames[0]}; + private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. + private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. + private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. + + // OpenMBean data types for viewMessageContent method + private CompositeType _msgContentType = null; + private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"}; + private OpenType[] _msgContentAttributeTypes = new OpenType[4]; + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") + public AMQQueueMBean(AMQQueue queue) throws JMException + { + super(ManagedQueue.class, ManagedQueue.TYPE); + _queue = queue; + _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString(); + init(); + } + + /** + * initialises the openmbean data types + */ + private void init() throws OpenDataException + { + _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id + _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType + _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding + _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content + _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, + _msgContentAttributes, _msgContentAttributeTypes); + + _msgAttributeTypes[0] = SimpleType.LONG; // For message id + _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes + _msgAttributeTypes[2] = SimpleType.LONG; // For size + _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered + + _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); + _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); + } + + public String getObjectInstanceName() + { + return _queueName; + } + + public String getName() + { + return _queueName; + } + + public boolean isDurable() + { + return _queue.isDurable(); + } + + public String getOwner() + { + return _queue.getOwner(); + } + + public boolean isAutoDelete() + { + return _queue.isAutoDelete(); + } + + public Integer getMessageCount() + { + return _queue.getMessageCount(); + } + + public Long getMaximumMessageSize() + { + return _queue.getMaximumMessageSize(); + } + + public void setMaximumMessageSize(Long value) + { + _queue.setMaximumMessageSize(value); + } + + public Integer getConsumerCount() + { + return _queue.getConsumerCount(); + } + + public Integer getActiveConsumerCount() + { + return _queue.getActiveConsumerCount(); + } + + public Long getReceivedMessageCount() + { + return _queue.getReceivedMessageCount(); + } + + public Integer getMaximumMessageCount() + { + return _queue.getMaximumMessageCount(); + } + + public void setMaximumMessageCount(Integer value) + { + _queue.setMaximumMessageCount(value); + } + + public Long getMaximumQueueDepth() + { + return _queue.getMaximumQueueDepth(); + } + + public void setMaximumQueueDepth(Long value) + { + _queue.setMaximumQueueDepth(value); + } + + /** + * returns the size of messages(KB) in the queue. + */ + public Long getQueueDepth() + { + List<AMQMessage> list = _queue.getMessagesOnTheQueue(); + if (list.size() == 0) + { + return 0l; + } + + long queueDepth = 0; + for (AMQMessage message : list) + { + queueDepth = queueDepth + getMessageSize(message); + } + return (long) Math.round(queueDepth / 1000); + } + + /** + * returns size of message in bytes + */ + private long getMessageSize(AMQMessage msg) + { + if (msg == null) + { + return 0l; + } + + return msg.getContentHeaderBody().bodySize; + } + + /** + * Checks if there is any notification to be send to the listeners + */ + public void checkForNotification(AMQMessage msg) + { + // Check for threshold message count + Integer msgCount = getMessageCount(); + if (msgCount >= getMaximumMessageCount()) + { + notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); + } + + // Check for threshold message size + long messageSize = getMessageSize(msg); + if (messageSize >= _queue.getMaximumMessageSize()) + { + notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); + } + + // Check for threshold queue depth in bytes + long queueDepth = getQueueDepth(); + if (queueDepth >= _queue.getMaximumQueueDepth()) + { + notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); + } + } + + /** + * Sends the notification to the listeners + */ + private void notifyClients(String notificationMsg) + { + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + + _broadcaster.sendNotification(n); + } + + /** + * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop() + */ + public void deleteMessageFromTop() throws JMException + { + try + { + _queue.deleteMessageFromTop(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * @see org.apache.qpid.server.queue.AMQQueue#clearQueue() + */ + public void clearQueue() throws JMException + { + try + { + _queue.clearQueue(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + /** + * returns message content as byte array and related attributes for the given message id. + */ + public CompositeData viewMessageContent(long msgId) throws JMException + { + AMQMessage msg = _queue.getMessageOnTheQueue(msgId); + if (msg == null) + { + throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); + } + // get message content + List<ContentBody> cBodies = msg.getContentBodies(); + List<Byte> msgContent = new ArrayList<Byte>(); + for (ContentBody body : cBodies) + { + if (body.getSize() != 0) + { + ByteBuffer slice = body.payload.slice(); + for (int j = 0; j < slice.limit(); j++) + { + msgContent.add(slice.get()); + } + } + } + + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = headerProperties.getContentType(); + String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; + + return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); + } + + /** + * Returns the header contents of the messages stored in this queue in tabular form. + */ + public TabularData viewMessages(int beginIndex, int endIndex) throws JMException + { + if ((beginIndex > endIndex) || (beginIndex < 1)) + { + throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex + + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); + } + + List<AMQMessage> list = _queue.getMessagesOnTheQueue(); + TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); + + // Create the tabular list of message header contents + for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) + { + AMQMessage msg = list.get(i - 1); + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + String[] headerAttributes = headerProperties.toString().split(","); + Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; + CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); + _messageList.put(messageData); + } + + return _messageList; + } + + /** + * returns Notifications sent by this MBean. + */ + @Override + public MBeanNotificationInfo[] getNotificationInfo() + { + String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String name = MonitorNotification.class.getName(); + String description = "Either Message count or Queue depth or Message size has reached threshold high value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); + + return new MBeanNotificationInfo[]{info1}; + } + +} // End of AMQMBean class |