summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-02 13:26:51 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-02 13:26:51 +0000
commit409f1c667a0eb41ebaefb8c072565e23641152dd (patch)
tree5907116e8639897aaf3b18e8a08df08992170ef1
parent9478b7ff315c274603f1d85a9e48d44288750b3c (diff)
downloadqpid-python-409f1c667a0eb41ebaefb8c072565e23641152dd.tar.gz
QPID-390
Added test case for all the AMQQueue alerts git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@513748 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java13
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java164
2 files changed, 174 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 9657fcfcb3..b3c682c771 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -60,6 +60,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+ private Notification _lastNotification = null;
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -227,11 +229,16 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
// important : add log to the log file - monitoring tools may be looking for this
_logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
-
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ notificationMsg = notification.name() + " " + notificationMsg;
+ _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
- _broadcaster.sendNotification(n);
+ _broadcaster.sendNotification(_lastNotification);
+ }
+
+ public Notification getLastNotification()
+ {
+ return _lastNotification;
}
/** @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop() */
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
new file mode 100644
index 0000000000..fee091e3bd
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.management.Notification;
+
+/**
+ * This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters
+ */
+public class AMQQueueAlertTest extends TestCase
+{
+ private final static int MAX_MESSAGE_COUNT = 50;
+ private final static long MAX_MESSAGE_AGE = 2000; // 2 sec
+ private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
+ private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB
+ private AMQQueue _queue;
+ private AMQQueueMBean _queueMBean;
+ private QueueRegistry _queueRegistry;
+ private MessageStore _messageStore = new SkeletonMessageStore();
+
+ /**
+ * Tests if the alert gets thrown when message count increases the threshold limit
+ * @throws Exception
+ */
+ public void testMessageCountAlert() throws Exception
+ {
+ _queue = new AMQQueue("testQueue1", false, "AMQueueAlertTest", false, _queueRegistry);
+ _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+
+ sendMessages(MAX_MESSAGE_COUNT, 256l);
+ assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
+
+ Notification lastNotification= _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name()));
+ }
+
+ /**
+ * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent
+ * @throws Exception
+ */
+ public void testMessageSizeAlert() throws Exception
+ {
+ _queue = new AMQQueue("testQueue2", false, "AMQueueAlertTest", false, _queueRegistry);
+ _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
+
+ sendMessages(1, MAX_MESSAGE_SIZE * 2);
+ assertTrue(_queueMBean.getMessageCount() == 1);
+
+ Notification lastNotification= _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name()));
+ }
+
+ /**
+ * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
+ * @throws Exception
+ */
+ public void testQueueDepthAlert() throws Exception
+ {
+ _queue = new AMQQueue("testQueue3", false, "AMQueueAlertTest", false, _queueRegistry);
+ _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+ while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
+ {
+ sendMessages(1, MAX_MESSAGE_SIZE);
+ }
+
+ Notification lastNotification= _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+ }
+
+ /**
+ * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than
+ * threshold value of message age
+ * @throws Exception
+ */
+ public void testMessageAgeAlert() throws Exception
+ {
+ _queue = new AMQQueue("testQueue4", false, "AMQueueAlertTest", false, _queueRegistry);
+ _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
+
+ sendMessages(1, MAX_MESSAGE_SIZE);
+ Thread.sleep(MAX_MESSAGE_AGE);
+ sendMessages(1, MAX_MESSAGE_SIZE);
+ assertTrue(_queueMBean.getMessageCount() == 2);
+
+ Notification lastNotification= _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
+ }
+
+ private AMQMessage message(boolean immediate, long size) throws AMQException
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Establish some way to determine the version for the test.
+ BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
+ publish.immediate = immediate;
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = size; // in bytes
+ AMQMessage message = new AMQMessage(_messageStore, publish);//, contentHeaderBody, null);
+ message.setContentHeaderBody(contentHeaderBody);
+ return message;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _queueRegistry = new DefaultQueueRegistry();
+ }
+
+ private void sendMessages(int messageCount, long size) throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[messageCount];
+ for (int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message(false, size);
+ }
+ for (int i = 0; i < messageCount; i++)
+ {
+ _queue.deliver(messages[i]);
+ }
+ }
+}