summaryrefslogtreecommitdiff
path: root/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java348
1 files changed, 0 insertions, 348 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
deleted file mode 100644
index 25d35aab16..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
-import javax.management.Notification;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-
-/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
-public class AMQQueueAlertTest extends InternalBrokerBaseCase
-{
- private final static long MAX_MESSAGE_COUNT = 50;
- private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
- private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
- private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB
- private AMQQueueMBean _queueMBean;
- private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
-
- /**
- * Tests if the alert gets thrown when message count increases the threshold limit
- *
- * @throws Exception
- */
- public void testMessageCountAlert() throws Exception
- {
- setSession(new InternalTestProtocolSession(getVirtualHost()));
- AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
- getSession().addChannel(channel);
-
- setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
- false, false,
- getVirtualHost(), null));
- _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
-
- _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
-
- sendMessages(channel, MAX_MESSAGE_COUNT, 256l);
- assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
-
- Notification lastNotification = _queueMBean.getLastNotification();
- assertNotNull(lastNotification);
-
- String notificationMsg = lastNotification.getMessage();
- assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name()));
- }
-
- /**
- * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent
- *
- * @throws Exception
- */
- public void testMessageSizeAlert() throws Exception
- {
- setSession(new InternalTestProtocolSession(getVirtualHost()));
- AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
- getSession().addChannel(channel);
-
- setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
- false, false,
- getVirtualHost(), null));
- _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
- _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
- _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
-
- sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2);
- assertTrue(_queueMBean.getMessageCount() == 1);
-
- Notification lastNotification = _queueMBean.getLastNotification();
- assertNotNull(lastNotification);
-
- String notificationMsg = lastNotification.getMessage();
- assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name()));
- }
-
- /**
- * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
- *
- * Based on FT-402 subbmitted by client
- *
- * @throws Exception
- */
- public void testQueueDepthAlertNoSubscriber() throws Exception
- {
- setSession(new InternalTestProtocolSession(getVirtualHost()));
- AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
- getSession().addChannel(channel);
-
- setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
- false, false,
- getVirtualHost(), null));
- _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
- _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
- _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
-
- while (getQueue().getQueueDepth() < MAX_QUEUE_DEPTH)
- {
- sendMessages(channel, 1, MAX_MESSAGE_SIZE);
- }
-
- Notification lastNotification = _queueMBean.getLastNotification();
- assertNotNull(lastNotification);
-
- String notificationMsg = lastNotification.getMessage();
- assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
- }
-
- /**
- * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
- * message age
- *
- * Alternative test to FT-401 provided by client
- *
- * @throws Exception
- */
- public void testMessageAgeAlert() throws Exception
- {
- setSession(new InternalTestProtocolSession(getVirtualHost()));
- AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
- getSession().addChannel(channel);
-
- setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
- false, false,
- getVirtualHost(), null));
- _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
- _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
- _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
-
- sendMessages(channel, 1, MAX_MESSAGE_SIZE);
-
- // Ensure message sits on queue long enough to age.
- Thread.sleep(MAX_MESSAGE_AGE * 2);
-
- Notification lastNotification = _queueMBean.getLastNotification();
- assertNotNull("Last notification was null", lastNotification);
-
- String notificationMsg = lastNotification.getMessage();
- assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
- }
-
- /*
- This test sends some messages to the queue with subscribers needing message to be acknowledged.
- The messages will not be acknowledged and will be required twice. Why we are checking this is because
- the bug reported said that the queueDepth keeps increasing when messages are requeued.
- // TODO - queue depth now includes unacknowledged messages so does not go down when messages are delivered
-
- The QueueDepth should decrease when messages are delivered from the queue (QPID-408)
- */
- public void testQueueDepthAlertWithSubscribers() throws Exception
- {
- AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
- getSession().addChannel(channel);
-
- // Create queue
- setQueue(getNewQueue());
- Subscription subscription =
- SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), getSession(), new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager());
-
- getQueue().registerSubscription(
- subscription, false);
-
- _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
- _queueMBean.setMaximumMessageCount(9999l); // Set a high value, because this is not being tested
- _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
-
- // Send messages(no of message to be little more than what can cause a Queue_Depth alert)
- int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
- long totalSize = (messageCount * MAX_MESSAGE_SIZE);
- sendMessages(channel, messageCount, MAX_MESSAGE_SIZE);
-
- // Check queueDepth. There should be no messages on the queue and as the subscriber is listening
- // so there should be no Queue_Deoth alert raised
- assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
- Notification lastNotification = _queueMBean.getLastNotification();
-// assertNull(lastNotification);
-
- // Kill the subscriber and check for the queue depth values.
- // Messages are unacknowledged, so those should get requeued. All messages should be on the Queue
- getQueue().unregisterSubscription(subscription);
- channel.requeue();
-
- assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
-
- lastNotification = _queueMBean.getLastNotification();
- assertNotNull(lastNotification);
- String notificationMsg = lastNotification.getMessage();
- assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
-
- // Connect a consumer again and check QueueDepth values. The queue should get emptied.
- // Messages will get delivered but still are unacknowledged.
- Subscription subscription2 =
- SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), getSession(), new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager());
-
- getQueue().registerSubscription(
- subscription2, false);
-
- while (getQueue().getUndeliveredMessageCount()!= 0)
- {
- Thread.sleep(100);
- }
-// assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
-
- // Kill the subscriber again. Now those messages should get requeued again. Check if the queue depth
- // value is correct.
- getQueue().unregisterSubscription(subscription2);
- channel.requeue();
-
- assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
- getSession().closeSession();
-
- // Check the clear queue
- _queueMBean.clearQueue();
- assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
- }
-
- protected IncomingMessage message(final boolean immediate, long size) throws AMQException
- {
- MessagePublishInfo publish = new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- contentHeaderBody.setProperties(props);
- contentHeaderBody.setBodySize(size); // in bytes
- IncomingMessage message = new IncomingMessage(publish);
- message.setContentHeaderBody(contentHeaderBody);
-
- return message;
- }
-
- @Override
- protected void configure()
- {
- // Increase Alert Check period
- getConfiguration().setHousekeepingCheckPeriod(200);
- }
-
- private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
- {
- IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
- MessageMetaData[] metaData = new MessageMetaData[(int) messageCount];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message(false, size);
- ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
- qs.add(getQueue());
- metaData[i] = messages[i].headersReceived(System.currentTimeMillis());
- messages[i].setStoredMessage(getMessageStore().addMessage(metaData[i]));
-
- messages[i].enqueue(qs);
-
- }
-
- for (int i = 0; i < messageCount; i++)
- {
- ContentChunk contentChunk = new ContentChunk()
- {
- private byte[] _data = new byte[(int)size];
-
- public int getSize()
- {
- return (int) size;
- }
-
- public byte[] getData()
- {
- return _data;
- }
-
- public void reduceToFit()
- {
- }
- };
-
- messages[i].addContentBodyFrame(contentChunk);
- messages[i].getStoredMessage().addContent(0, ByteBuffer.wrap(contentChunk.getData()));
-
- getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage()));
- }
- }
-
- private AMQQueue getNewQueue() throws AMQException
- {
- return AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue" + Math.random()),
- false,
- new AMQShortString("AMQueueAlertTest"),
- false,
- false, getVirtualHost(), null);
- }
-}