summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
blob: 4ac0d2c4d65633832130b8e08563ae7e38388ecf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.queue;

import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.client.AMQConnection;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Test DeepQueueConsumeWithSelector
 * Summary:
 * Prior to M4 the broker had a different queue model which pre-processed the
 * messages on the queue for any connecting subscription that had a selector.
 *
 * If the queue had a lot of data then this could take a long time to process,
 * to such an extent that the subscription creation might time out. During this
 * pre-process phase the virtualhost would become unresponsive.
 *
 * Our solution was to allow the timeout to be adjusted QPID-1119, which allowed
 * the subscription to connect but did not address the unresponsiveness.
 *
 * The new queue model introduced in M4 resolved this.
 *
 * This test is to validate that the new queueing model does indeed remove the
 * long pre-processing phase and allow immediate subscription so that there is
 * no unresponsive period.
 *
 * Test Strategy:
 *
 * Add MESSAGE_COUNT messages (currently 10,000) to the queue, each with a
 * numeric header property that later subscribers can use in a selector.
 *
 * Connect the subscriber and time how long it takes to connect.
 *
 * Finally consume all the messages from the queue to clean up.
 */
public class DeepQueueConsumeWithSelector extends QpidBrokerTestCase implements MessageListener
{

    /** Number of messages put on the queue, and the number the listener must receive for the test to pass. */
    private static final int MESSAGE_COUNT = 10000;

    /**
     * Batch size handed to sendMessage (presumably the commit interval - confirm against
     * QpidBrokerTestCase); also the interval at which send/receive progress is printed.
     */
    private static final int BATCH_SIZE = MESSAGE_COUNT / 10;

    /** Counted down once per message delivered to {@link #onMessage(Message)}. */
    private final CountDownLatch _receivedLatch = new CountDownLatch(MESSAGE_COUNT);

    /**
     * Value (in milliseconds) used for the amqj.default_syncwrite_timeout system property and as
     * the maximum time {@link #test()} waits for all messages to be received.
     */
    protected long SYNC_WRITE_TIMEOUT = 120000L;


    /**
     * Applies the sync-write timeout system property and then runs the superclass set up.
     *
     * @throws Exception if the superclass set up fails
     */
    public void setUp() throws Exception
    {
        //Set the syncWrite timeout to be just larger than the delay on the commitTran.
        setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT));

        super.setUp();
    }

    /**
     * Fills the queue with MESSAGE_COUNT messages, then attaches a consumer with a selector,
     * printing how long the subscription took, and finally waits for every message to be
     * delivered to this listener.
     *
     * @throws Exception on any connection, lookup or messaging failure, or if the wait is interrupted
     */
    public void test() throws Exception
    {
        // Create Connection
        Connection connection = getConnection();
        // NOTE(review): the third argument looks like the AMQConnection-specific prefetch - confirm.
        Session session = ((AMQConnection)connection).createSession(true, Session.SESSION_TRANSACTED, 100000);

        Queue queue = (Queue) getInitialContext().lookup("queue");

        // Validate that the destination exists
        session.createConsumer(queue).close();

        // Send Messages
        sendMessage(session, queue, MESSAGE_COUNT, BATCH_SIZE);

        session.close();

        session = ((AMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Setup Selector to perform a few calculations which will slow it down
        // NOTE(review): the single-quoted INDEX terms look like string literals rather than
        // property references in JMS selector syntax - confirm this is what was intended.
        String selector = "((\"" + INDEX + "\" % 1) = 0) AND ('" + INDEX + "' IS NOT NULL) AND ('" + INDEX + "' <> -1)";

        // Setup timing
        long start = System.nanoTime();

        System.err.println("Create Consumer");
        // Connect Consumer
        MessageConsumer consumer = session.createConsumer(queue, selector);
        consumer.setMessageListener(this);

        // Validate timing details (value printed is in nanoseconds)
        long end = System.nanoTime();

        System.err.println("Subscription time took:" + (end - start));

        // Consume Messages
        connection.start();

        // Wait BEFORE building the failure message: method arguments are evaluated eagerly, so
        // reading the latch count inline in the assertTrue call would capture the count from
        // before the wait instead of the number of messages still outstanding at timeout.
        boolean allReceived = _receivedLatch.await(SYNC_WRITE_TIMEOUT, TimeUnit.MILLISECONDS);

        assertTrue("Messages took to long to be received :" + _receivedLatch.getCount(), allReceived);
    }

    /**
     * Delegates message creation to the superclass and prints a progress line every
     * BATCH_SIZE messages.
     *
     * @param session  the session used to create the message
     * @param msgCount the index of the message being created
     * @return the message produced by the superclass
     * @throws JMSException if the superclass fails to create the message
     */
    @Override
    public Message createNextMessage(Session session, int msgCount) throws JMSException
    {
        Message message = super.createNextMessage(session,msgCount);

        if ((msgCount % BATCH_SIZE) == 0 )
        {
            System.err.println("Sent:"+msgCount);
        }

        return message;
    }

    /**
     * Records the arrival of a message and prints a progress line every BATCH_SIZE messages,
     * based on the message's INDEX property.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message)
    {
        // Count the delivery first so a problem reading the property cannot stall the test.
        _receivedLatch.countDown();

        try
        {
            int msgCount = message.getIntProperty(INDEX);

            if ((msgCount % BATCH_SIZE) == 0 )
            {
                System.err.println("Received:"+msgCount);
            }
        }
        catch (JMSException e)
        {
            // Progress output is best-effort: report the problem instead of silently printing
            // a misleading "Received:0", and carry on receiving.
            System.err.println("Unable to read " + INDEX + " property from received message: " + e);
        }
    }
}