summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
blob: a8bddcf6bfdced0ab0af6702442a56e43d3be26f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.queue;

import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.util.InternalBrokerBaseCase;

import javax.management.Notification;
import java.util.ArrayList;

/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends InternalBrokerBaseCase
{
    private final static long MAX_MESSAGE_COUNT = 50;
    private final static long MAX_MESSAGE_AGE = 250;   // 0.25 sec
    private final static long MAX_MESSAGE_SIZE = 2000;  // 2 KB
    private final static long MAX_QUEUE_DEPTH = 10000;  // 10 KB
    private AMQQueueMBean _queueMBean;
    private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;

    /**
     * Tests if the alert gets thrown when message count increases the threshold limit
     *
     * @throws Exception
     */
    public void testMessageCountAlert() throws Exception
    {
        setSession(new InternalTestProtocolSession(getVirtualHost()));
        AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
        getSession().addChannel(channel);

        setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
                              false, false,
                getVirtualHost(), null));
        _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();

        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);

        sendMessages(channel, MAX_MESSAGE_COUNT, 256l);
        assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);

        Notification lastNotification = _queueMBean.getLastNotification();
        assertNotNull(lastNotification);

        String notificationMsg = lastNotification.getMessage();
        assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name()));
    }

    /**
     * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent
     *
     * @throws Exception
     */
    public void testMessageSizeAlert() throws Exception
    {
        setSession(new InternalTestProtocolSession(getVirtualHost()));
        AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
        getSession().addChannel(channel);

        setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
                              false, false,
                getVirtualHost(), null));
        _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
        _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);

        sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2);
        assertTrue(_queueMBean.getMessageCount() == 1);

        Notification lastNotification = _queueMBean.getLastNotification();
        assertNotNull(lastNotification);

        String notificationMsg = lastNotification.getMessage();
        assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name()));
    }

    /**
     * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
     *
     * Based on FT-402 subbmitted by client
     *
     * @throws Exception
     */
    public void testQueueDepthAlertNoSubscriber() throws Exception
    {
        setSession(new InternalTestProtocolSession(getVirtualHost()));
        AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
        getSession().addChannel(channel);

        setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
                              false, false,
                getVirtualHost(), null));
        _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
        _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);

        while (getQueue().getQueueDepth() < MAX_QUEUE_DEPTH)
        {
            sendMessages(channel, 1, MAX_MESSAGE_SIZE);
        }

        Notification lastNotification = _queueMBean.getLastNotification();
        assertNotNull(lastNotification);

        String notificationMsg = lastNotification.getMessage();
        assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
    }

    /**
     * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
     * message age
     *
     * Alternative test to FT-401 provided by client
     *
     * @throws Exception
     */
    public void testMessageAgeAlert() throws Exception
    {
        setSession(new InternalTestProtocolSession(getVirtualHost()));
        AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
        getSession().addChannel(channel);

        setQueue(AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
                              false, false,
                getVirtualHost(), null));
        _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
        _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);

        sendMessages(channel, 1, MAX_MESSAGE_SIZE);

        // Ensure message sits on queue long enough to age.
        Thread.sleep(MAX_MESSAGE_AGE * 2);

        Notification lastNotification = _queueMBean.getLastNotification();
        assertNotNull("Last notification was null", lastNotification);

        String notificationMsg = lastNotification.getMessage();
        assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
    }

    /*
     This test sends some messages to the queue with subscribers needing message to be acknowledged.
     The messages will not be acknowledged and will be required twice. Why we are checking this is because
     the bug reported said that the queueDepth keeps increasing when messages are requeued.
     // TODO - queue depth now includes unacknowledged messages so does not go down when messages are delivered

     The QueueDepth should decrease when messages are delivered from the queue (QPID-408)
    */
    public void testQueueDepthAlertWithSubscribers() throws Exception
    {
        AMQChannel channel = new AMQChannel(getSession(), 2, getMessageStore());
        getSession().addChannel(channel);

        // Create queue
        setQueue(getNewQueue());
        Subscription subscription =
                SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), getSession(), new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager());

        getQueue().registerSubscription(
                subscription, false);

        _queueMBean = (AMQQueueMBean) getQueue().getManagedObject();
        _queueMBean.setMaximumMessageCount(9999l);   // Set a high value, because this is not being tested
        _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);

        // Send messages(no of message to be little more than what can cause a Queue_Depth alert)
        int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
        long totalSize = (messageCount * MAX_MESSAGE_SIZE);
        sendMessages(channel, messageCount, MAX_MESSAGE_SIZE);

        // Check queueDepth. There should be no messages on the queue and as the subscriber is listening
        // so there should be no Queue_Deoth alert raised
        assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
        Notification lastNotification = _queueMBean.getLastNotification();
//        assertNull(lastNotification);

        // Kill the subscriber and check for the queue depth values.
        // Messages are unacknowledged, so those should get requeued. All messages should be on the Queue
        getQueue().unregisterSubscription(subscription);
        channel.requeue();

        assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));

        lastNotification = _queueMBean.getLastNotification();
        assertNotNull(lastNotification);
        String notificationMsg = lastNotification.getMessage();
        assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));

        // Connect a consumer again and check QueueDepth values. The queue should get emptied.
        // Messages will get delivered but still are unacknowledged.
        Subscription subscription2 =
                SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), getSession(), new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager());

        getQueue().registerSubscription(
                subscription2, false);

        while (getQueue().getUndeliveredMessageCount()!= 0)
        {
            Thread.sleep(100);
        }
//        assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));

        // Kill the subscriber again. Now those messages should get requeued again. Check if the queue depth
        // value is correct.
        getQueue().unregisterSubscription(subscription2);
        channel.requeue();

        assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth()));
        getSession().closeSession();

        // Check the clear queue
        _queueMBean.clearQueue();
        assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
    }

    protected IncomingMessage message(final boolean immediate, long size) throws AMQException
    {
        MessagePublishInfo publish = new MessagePublishInfo()
        {

            public AMQShortString getExchange()
            {
                return null;
            }

            public void setExchange(AMQShortString exchange)
            {
                //To change body of implemented methods use File | Settings | File Templates.
            }

            public boolean isImmediate()
            {
                return immediate;
            }

            public boolean isMandatory()
            {
                return false;
            }

            public AMQShortString getRoutingKey()
            {
                return null;
            }
        };

        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
        contentHeaderBody.setProperties(props);
        contentHeaderBody.bodySize = size;   // in bytes
        IncomingMessage message = new IncomingMessage(publish);
        message.setContentHeaderBody(contentHeaderBody);

        return message;
    }

    @Override
    protected void configure()
    {
        // Increase Alert Check period
        getConfiguration().setHousekeepingExpiredMessageCheckPeriod(200);
    }

    private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
    {
        IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
        MessageMetaData[] metaData = new MessageMetaData[(int) messageCount];
        for (int i = 0; i < messages.length; i++)
        {
            messages[i] = message(false, size);
            ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
            qs.add(getQueue());
            metaData[i] = messages[i].headersReceived();
            messages[i].setStoredMessage(getMessageStore().addMessage(metaData[i]));

            messages[i].enqueue(qs);

        }

        for (int i = 0; i < messageCount; i++)
        {
            messages[i].addContentBodyFrame(new ContentChunk(){

                ByteBuffer _data = ByteBuffer.allocate((int)size);

                {
                    _data.limit((int)size);
                }

                public int getSize()
                {
                    return (int) size;
                }

                public ByteBuffer getData()
                {
                    return _data;
                }

                public void reduceToFit()
                {

                }
            });

            getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage()));

        }
    }

    private AMQQueue getNewQueue() throws AMQException
    {
        return AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue" + Math.random()),
                            false,
                            new AMQShortString("AMQueueAlertTest"),
                            false,
                            false, getVirtualHost(), null);
    }
}