summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2006-10-17 11:10:39 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2006-10-17 11:10:39 +0000
commitea7e5c6c5de0b50b190f308db935f2132596c533 (patch)
treee9063d2f1e044e35be3f90c604faff5eb55bcb5f
parentecc1d550abfb52989f17aa7a09bdf9a8936c3fb4 (diff)
downloadqpid-python-ea7e5c6c5de0b50b190f308db935f2132596c533.tar.gz
AMQQueueMBean - management notifications will be sent to the listening clients if the queue size increases the threshold value or a message with size higher than threshold value is received.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464900 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQQueue.java87
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java20
2 files changed, 85 insertions, 22 deletions
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
index 886846c0a5..6c795590f6 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
@@ -95,19 +95,19 @@ public class AMQQueue implements Managable
private final AMQQueueMBean _managedObject;
/**
- * max allowed size of a single message.
+ * max allowed size of a single message(in KBytes).
*/
- private long _maxAllowedMessageSize = 0;
+ private long _maxAllowedMessageSize = 10000; // 10 MB
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxAllowedMessageCount = 0;
+ private Integer _maxAllowedMessageCount = 10000;
/**
- * max allowed size in bytes for all the messages combined together in a queue.
+ * max allowed size in KBytes for all the messages combined together in a queue.
*/
- private long _queueDepth = 0;
+ private long _queueDepth = 10000000; // 10 GB
/**
* total messages received by the queue since startup.
@@ -266,26 +266,81 @@ public class AMQQueue implements Managable
return _queueDepth;
}
+ // Sets the queue depth, the max queue size
public void setQueueDepth(Long value)
{
_queueDepth = value;
}
+ // Returns the size of messages in the queue
+ public Long getQueueSize()
+ {
+ List<AMQMessage> list = _deliveryMgr.getMessages();
+ if (list.size() == 0)
+ return 0l;
+
+ long queueSize = 0;
+ for (AMQMessage message : list)
+ {
+ queueSize = queueSize + getMessageSize(message);
+ }
+ return new Long(Math.round(queueSize/100));
+ }
// Operations
- private void checkForNotification()
+ // calculates the size of an AMQMessage
+ private long getMessageSize(AMQMessage msg)
{
- if (getMessageCount() >= getMaximumMessageCount())
+ if (msg == null)
+ return 0l;
+
+ List<ContentBody> cBodies = msg.getContentBodies();
+ long messageSize = 0;
+ for (ContentBody body : cBodies)
{
- Notification n = new Notification(
+ if (body != null)
+ messageSize = messageSize + body.getSize();
+ }
+ return messageSize;
+ }
+
+ // Checks if there is any notification to be send to the listeners
+ private void checkForNotification(AMQMessage msg)
+ {
+ // Check for message count
+ Integer msgCount = getMessageCount();
+ if (msgCount >= getMaximumMessageCount())
+ {
+ notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+ }
+
+ // Check for received message size
+ long messageSize = getMessageSize(msg);
+ if (messageSize >= getMaximumMessageSize())
+ {
+ notifyClients("MessageSize = " + messageSize + ", Message size (MessageID="+ msg.getMessageId() +
+ ")is higher than the threshold value");
+ }
+
+ // Check for queue size in bytes
+ long queueSize = getQueueSize();
+ if (queueSize >= getQueueDepth())
+ {
+ notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+ }
+ }
+
+ // Send the notification to the listeners
+ private void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(
MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
this,
++_notificationSequenceNumber,
System.currentTimeMillis(),
- "MessageCount = " + getMessageCount() + ", Queue has reached its size limit and is now full.");
+ notificationMsg);
_broadcaster.sendNotification(n);
- }
}
public void deleteMessageFromTop() throws JMException
@@ -634,10 +689,11 @@ public class AMQQueue implements Managable
private void process(AMQMessage msg) throws FailedDequeueException
{
_deliveryMgr.deliver(getName(), msg);
+ updateReceivedMessageCount(msg);
try
{
msg.checkDeliveredToConsumer();
- updateReceivedMessageCount();
+ updateReceivedMessageCount(msg);
}
catch(NoConsumersException e)
{
@@ -645,14 +701,13 @@ public class AMQQueue implements Managable
// from the queue:
dequeue(msg);
}
-
}
void dequeue(AMQMessage msg) throws FailedDequeueException
{
try
{
- msg.decrementReference();
+ msg.decrementReference();
msg.dequeue(this);
}
catch(AMQException e)
@@ -660,7 +715,7 @@ public class AMQQueue implements Managable
throw new FailedDequeueException(_name, e);
}
}
-
+
public void deliverAsync()
{
_deliveryMgr.processAsync(_asyncDelivery);
@@ -671,10 +726,10 @@ public class AMQQueue implements Managable
return _subscribers;
}
- protected void updateReceivedMessageCount()
+ protected void updateReceivedMessageCount(AMQMessage msg)
{
_totalMessagesReceived++;
- _managedObject.checkForNotification();
+ _managedObject.checkForNotification(msg);
}
public boolean equals(Object o)
diff --git a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
index 9732fc4c17..109b3c0c9c 100644
--- a/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
@@ -78,7 +78,7 @@ public interface ManagedQueue
Integer getMessageCount() throws IOException;
/**
- * Returns the maximum size of a message (in bytes) allowed to be accepted by the
+ * Returns the maximum size of a message (in kbytes) allowed to be accepted by the
* ManagedQueue. This is useful in setting notifications or taking
* appropriate action, if the size of the message received is more than
* the allowed size.
@@ -89,14 +89,14 @@ public interface ManagedQueue
Long getMaximumMessageSize() throws IOException;
/**
- * Sets the maximum size of the message (in bytes) that is allowed to be
+ * Sets the maximum size of the message (in kbytes) that is allowed to be
* accepted by the Queue.
- * @param bytes maximum size of message.
+ * @param size maximum size of message.
* @throws IOException
*/
@MBeanAttribute(name="MaximumMessageSize",
- description="Maximum size of a message in bytes allowed for this Queue")
- void setMaximumMessageSize(Long bytes) throws IOException;
+ description="Maximum size(KB) of a message allowed for this Queue")
+ void setMaximumMessageSize(Long size) throws IOException;
/**
* Returns the total number of subscribers to the queue.
@@ -142,6 +142,14 @@ public interface ManagedQueue
void setMaximumMessageCount(Integer value) throws IOException;
/**
+ * Size of messages in the queue
+ * @return
+ * @throws IOException
+ */
+ @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
+ Long getQueueSize() throws IOException;
+
+ /**
* Tells the maximum size of all the messages combined together,
* that can be stored in the queue. This is useful for setting notifications
* or taking required action if the size of messages stored in the queue
@@ -158,7 +166,7 @@ public interface ManagedQueue
* @throws IOException
*/
@MBeanAttribute(name="QueueDepth",
- description="The size of all the messages together, that can be stored in the queue")
+ description="The size(KB) of all the messages together, that can be stored in the queue")
void setQueueDepth(Long value) throws IOException;