summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java50
1 files changed, 26 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index a914975e00..f5ecf6ba55 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
import javax.management.openmbean.*;
import javax.management.JMException;
@@ -41,8 +42,11 @@ import java.util.ArrayList;
* for an AMQQueue.
*/
@MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+
+ private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
@@ -51,12 +55,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
+
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
+
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -71,7 +77,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
// It should never occur
System.out.println(ex.getMessage());
@@ -88,7 +94,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
+ _msgContentAttributes, _msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
@@ -215,35 +221,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
*/
public void checkForNotification(AMQMessage msg)
{
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _queue.getMaximumMessageSize())
+ for (NotificationCheck check : NotificationCheck.values())
{
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
-
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
- if (queueDepth >= _queue.getMaximumQueueDepth())
- {
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ if (check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()] < thresholdTime)
+ {
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
}
}
/**
* Sends the notification to the listeners
*/
- private void notifyClients(String notificationMsg)
+ public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
+ // important : add log to the log file - monitoring tools may be looking for this
+ _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+
Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}