diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index a914975e00..f5ecf6ba55 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.mina.common.ByteBuffer; +import org.apache.log4j.Logger; import javax.management.openmbean.*; import javax.management.JMException; @@ -41,8 +42,11 @@ import java.util.ArrayList; * for an AMQQueue. */ @MBeanDescription("Management Interface for AMQQueue") -public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue +public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + + private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + private AMQQueue _queue = null; private String _queueName = null; // OpenMBean data types for viewMessages method @@ -51,12 +55,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - + // OpenMBean data types for viewMessageContent method private static CompositeType _msgContentType = null; private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"}; private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; - + + private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -71,7 +77,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { init(); } - catch(JMException ex) + catch (JMException ex) { // It should never occur System.out.println(ex.getMessage()); @@ -88,7 +94,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, - _msgContentAttributes, _msgContentAttributeTypes); + _msgContentAttributes, _msgContentAttributeTypes); _msgAttributeTypes[0] = SimpleType.LONG; // For message id _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes @@ -215,35 +221,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue */ public void checkForNotification(AMQMessage msg) { - // Check for threshold message count - Integer msgCount = getMessageCount(); - if (msgCount >= getMaximumMessageCount()) - { - notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); - } + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); - // Check for threshold message size - long messageSize = getMessageSize(msg); - if (messageSize >= _queue.getMaximumMessageSize()) + for (NotificationCheck check : NotificationCheck.values()) { - notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); - } - - // Check for threshold queue depth in bytes - long queueDepth = getQueueDepth(); - if (queueDepth >= _queue.getMaximumQueueDepth()) - { - notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); + if (check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()] < thresholdTime) + { + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } + } } } /** * Sends the notification to the listeners */ - private void notifyClients(String notificationMsg) + public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg) { + // important : add log to the log file - monitoring tools may be looking for this + _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } |