summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
blob: d91b9b92635ebd023a69f94cb43d2077f0963afb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.prefetch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class PrefetchBehaviourTest extends QpidBrokerTestCase
{
    protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
    private Connection _normalConnection;
    private AtomicBoolean _exceptionCaught;
    private CountDownLatch _processingStarted;
    private CountDownLatch _processingCompleted;

    protected void setUp() throws Exception
    {
        super.setUp();
        _normalConnection = getConnection();
        _exceptionCaught = new AtomicBoolean();
        _processingStarted = new CountDownLatch(1);
        _processingCompleted = new CountDownLatch(1);
    }

    /**
     * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only
     * gets 1 of the messages sent, with the second consumer picking up the others while the
     * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin.
     */
    public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception
    {
        final long processingTime = 5000;
        
        //create a second connection with prefetch set to 1
        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
        Connection prefetch1Connection = getConnection();

        prefetch1Connection.start();
        _normalConnection.start();

        //create an asynchronous consumer with simulated slow processing
        final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = prefetch1session.createQueue(getTestQueueName());
        MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
        prefetch1consumer.setMessageListener(new MessageListener()
        {
            public void onMessage(Message message)
            {
                try
                {
                    _logger.debug("starting processing");
                    _processingStarted.countDown();
                    _logger.debug("processing started");

                    //simulate message processing
                    Thread.sleep(processingTime);

                    prefetch1session.commit();

                    _processingCompleted.countDown();
                }
                catch(Exception e)
                {
                    _logger.error("Exception caught in message listener");
                    _exceptionCaught.set(true);
                }
            }
        });

        //create producer and send 5 messages
        Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = producerSession.createProducer(queue);

        for (int i = 0; i < 5; i++)
        {
            producer.send(producerSession.createTextMessage("test"));
        }
        producerSession.commit();

        //wait for the first message to start being processed by the async consumer
        assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS));
        _logger.debug("proceeding with test");

        //try to consumer the other messages with another consumer while the async procesisng occurs
        Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer normalConsumer = normalSession.createConsumer(queue);        
        
        Message msg;
        // Check that other consumer gets the other 4 messages
        for (int i = 0; i < 4; i++)
        {
            msg = normalConsumer.receive(1500);
            assertNotNull("Consumer should receive 4 messages",msg);                
        }
        msg = normalConsumer.receive(250);
        assertNull("Consumer should not have received a 5th message",msg);

        //wait for the other consumer to finish to ensure it completes ok
        _logger.debug("waiting for async consumer to complete");
        assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
        assertFalse("Unexpected exception during async message processing",_exceptionCaught.get());
    }

    /**
     * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty.
     *
     */
    public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception
    {
        Queue queue = getTestQueue();

        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());

        Connection connection = getConnection();
        connection.start();
        // Create Consumer A
        Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumerA = consSessA.createConsumer(queue);

        // ensure message delivery to consumer A is started (required for 0-8..0-9-1)
        final Message msg = consumerA.receiveNoWait();
        assertNull(msg);

        Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        sendMessage(producerSession, queue, 3);

        // Create Consumer B
        MessageConsumer consumerB = null;
        if (isBroker010())
        {
            // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A
            consumerB = consSessA.createConsumer(queue);
        }
        else
        {
            // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session
            Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumerB = consSessB.createConsumer(queue);
        }

        // As message delivery to consumer A is already started, the first two messages should
        // now be with consumer A.  The last message will still be on the Broker as consumer A's
        // credit is exhausted and message delivery for consumer B is not yet running.

        // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A.
        // If we were to reverse the order, the SessionComplete will restore Consumer A's credit,
        // and the third message could be delivered to either Consumer A or Consumer B.

        // Check that consumer B gets the last (third) message.
        final Message msgConsumerB = consumerB.receive(1500);
        assertNotNull("Consumer B should have received a message", msgConsumerB);
        assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX));

        // Now check that consumer A has indeed got the first two messages.
        for (int i = 0; i < 2; i++)
        {
            final Message msgConsumerA = consumerA.receive(1500);
            assertNotNull("Consumer A should have received a message " + i, msgConsumerA);
            assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
        }
    }
}