diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2006-10-17 11:10:39 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2006-10-17 11:10:39 +0000 |
commit | ea7e5c6c5de0b50b190f308db935f2132596c533 (patch) | |
tree | e9063d2f1e044e35be3f90c604faff5eb55bcb5f | |
parent | ecc1d550abfb52989f17aa7a09bdf9a8936c3fb4 (diff) | |
download | qpid-python-ea7e5c6c5de0b50b190f308db935f2132596c533.tar.gz |
AMQQueueMBean - management notifications will be sent to the listening clients if the queue size increases the threshold value or a message with size higher than threshold value is received.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464900 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/org/apache/qpid/server/queue/AMQQueue.java | 87 | ||||
-rw-r--r-- | java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java | 20 |
2 files changed, 85 insertions, 22 deletions
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java index 886846c0a5..6c795590f6 100644 --- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java @@ -95,19 +95,19 @@ public class AMQQueue implements Managable private final AMQQueueMBean _managedObject; /** - * max allowed size of a single message. + * max allowed size of a single message(in KBytes). */ - private long _maxAllowedMessageSize = 0; + private long _maxAllowedMessageSize = 10000; // 10 MB /** * max allowed number of messages on a queue. */ - private Integer _maxAllowedMessageCount = 0; + private Integer _maxAllowedMessageCount = 10000; /** - * max allowed size in bytes for all the messages combined together in a queue. + * max allowed size in KBytes for all the messages combined together in a queue. */ - private long _queueDepth = 0; + private long _queueDepth = 10000000; // 10 GB /** * total messages received by the queue since startup. @@ -266,26 +266,81 @@ public class AMQQueue implements Managable return _queueDepth; } + // Sets the queue depth, the max queue size public void setQueueDepth(Long value) { _queueDepth = value; } + // Returns the size of messages in the queue + public Long getQueueSize() + { + List<AMQMessage> list = _deliveryMgr.getMessages(); + if (list.size() == 0) + return 0l; + + long queueSize = 0; + for (AMQMessage message : list) + { + queueSize = queueSize + getMessageSize(message); + } + return new Long(Math.round(queueSize/100)); + } // Operations - private void checkForNotification() + // calculates the size of an AMQMessage + private long getMessageSize(AMQMessage msg) { - if (getMessageCount() >= getMaximumMessageCount()) + if (msg == null) + return 0l; + + List<ContentBody> cBodies = msg.getContentBodies(); + long messageSize = 0; + for (ContentBody body : cBodies) { - Notification n = new Notification( + if (body != null) + messageSize = messageSize + body.getSize(); + } + return messageSize; + } + + // Checks if there is any notification to be send to the listeners + private void checkForNotification(AMQMessage msg) + { + // Check for message count + Integer msgCount = getMessageCount(); + if (msgCount >= getMaximumMessageCount()) + { + notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full."); + } + + // Check for received message size + long messageSize = getMessageSize(msg); + if (messageSize >= getMaximumMessageSize()) + { + notifyClients("MessageSize = " + messageSize + ", Message size (MessageID="+ msg.getMessageId() + + ")is higher than the threshold value"); + } + + // Check for queue size in bytes + long queueSize = getQueueSize(); + if (queueSize >= getQueueDepth()) + { + notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value"); + } + } + + // Send the notification to the listeners + private void notifyClients(String notificationMsg) + { + Notification n = new Notification( MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, System.currentTimeMillis(), - "MessageCount = " + getMessageCount() + ", Queue has reached its size limit and is now full."); + notificationMsg); _broadcaster.sendNotification(n); - } } public void deleteMessageFromTop() throws JMException @@ -634,10 +689,11 @@ public class AMQQueue implements Managable private void process(AMQMessage msg) throws FailedDequeueException { _deliveryMgr.deliver(getName(), msg); + updateReceivedMessageCount(msg); try { msg.checkDeliveredToConsumer(); - updateReceivedMessageCount(); + updateReceivedMessageCount(msg); } catch(NoConsumersException e) { @@ -645,14 +701,13 @@ public class AMQQueue implements Managable // from the queue: dequeue(msg); } - } void dequeue(AMQMessage msg) throws FailedDequeueException { try { - msg.decrementReference(); + msg.decrementReference(); msg.dequeue(this); } catch(AMQException e) @@ -660,7 +715,7 @@ public class AMQQueue implements Managable throw new FailedDequeueException(_name, e); } } - + public void deliverAsync() { _deliveryMgr.processAsync(_asyncDelivery); @@ -671,10 +726,10 @@ public class AMQQueue implements Managable return _subscribers; } - protected void updateReceivedMessageCount() + protected void updateReceivedMessageCount(AMQMessage msg) { _totalMessagesReceived++; - _managedObject.checkForNotification(); + _managedObject.checkForNotification(msg); } public boolean equals(Object o) diff --git a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java index 9732fc4c17..109b3c0c9c 100644 --- a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java @@ -78,7 +78,7 @@ public interface ManagedQueue Integer getMessageCount() throws IOException; /** - * Returns the maximum size of a message (in bytes) allowed to be accepted by the + * Returns the maximum size of a message (in kbytes) allowed to be accepted by the * ManagedQueue. This is useful in setting notifications or taking * appropriate action, if the size of the message received is more than * the allowed size. @@ -89,14 +89,14 @@ public interface ManagedQueue Long getMaximumMessageSize() throws IOException; /** - * Sets the maximum size of the message (in bytes) that is allowed to be + * Sets the maximum size of the message (in kbytes) that is allowed to be * accepted by the Queue. - * @param bytes maximum size of message. + * @param size maximum size of message. * @throws IOException */ @MBeanAttribute(name="MaximumMessageSize", - description="Maximum size of a message in bytes allowed for this Queue") - void setMaximumMessageSize(Long bytes) throws IOException; + description="Maximum size(KB) of a message allowed for this Queue") + void setMaximumMessageSize(Long size) throws IOException; /** * Returns the total number of subscribers to the queue. @@ -142,6 +142,14 @@ public interface ManagedQueue void setMaximumMessageCount(Integer value) throws IOException; /** + * Size of messages in the queue + * @return + * @throws IOException + */ + @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue") + Long getQueueSize() throws IOException; + + /** * Tells the maximum size of all the messages combined together, * that can be stored in the queue. This is useful for setting notifications * or taking required action if the size of messages stored in the queue @@ -158,7 +166,7 @@ public interface ManagedQueue * @throws IOException */ @MBeanAttribute(name="QueueDepth", - description="The size of all the messages together, that can be stored in the queue") + description="The size(KB) of all the messages together, that can be stored in the queue") void setQueueDepth(Long value) throws IOException; |