summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
blob: 4c2758241efe00fe25a114436ad2353b90cdfabe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.failover;

import org.apache.mina.common.WriteTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.FailoverBaseCase;
import org.apache.qpid.AMQConnectionClosedException;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Test case based on user reported error.
 *
 * Summary:
 * A user has reported message loss from their application. When the broker
 * is bounced (restarted) the 'lost' messages are delivered again by the broker.
 *
 * Note:
 * The client was using Spring so that may influence the situation.
 *
 * Issue:
 * The log files show 7 instances of the following which result in 7
 * missing messages.
 *
 * The client log files show:
 *
 * The broker log file shows:
 *
 *
 * The 7 missing messages have delivery tags 5-11, which shows that they are
 * sequentially the next messages from the broker.
 *
 * The only way for the 'without a handler' log to occur is if the consumer
 * has been removed from the look up table of the dispatcher.
 * And the only way for the 'null message' log to occur on the broker is
 * if the message does not exist in the unacked-map.
 *
 * The consumer is only removed from the list during session
 * closure and failover.
 *
 * If the session was closed then the broker would requeue the unacked
 * messages so the potential exists to have an empty map but the broker
 * will not send a message out after the unacked map has been cleared.
 *
 * When failover occurs the _consumer map is cleared and the consumers are
 * resubscribed. This is done without first stopping any existing
 * dispatcher so there exists the potential to receive a message after
 * the _consumer map has been cleared which is how the 'without a handler'
 * log statement occurs.
 *
 * Scenario:
 *
 * Looking over logs the sequence that best fits the events is as follows:
 * - Something causes Mina to be delayed, causing the WriteTimeoutException.
 * - This exception is received by AMQProtocolHandler#exceptionCaught
 * - As the WriteTimeoutException is an IOException this will cause
 * sessionClosed to be called to start failover.
 * + This is potentially the issue here. All IOExceptions are treated
 * as connection failure events.
 * - Failover Runs
 * + Failover assumes that the previous connection has been closed.
 * + Failover binds the existing objects (AMQConnection/Session) to the
 * new connection objects.
 * - Everything is reported as being successfully failed over.
 * However, what is neglected is that the original connection has not
 * been closed.
 * + So what occurs is that the broker sends a message to the consumer on
 * the original connection, as it was not notified of the client
 * failing over.
 * As the client failover reuses the original AMQSession and Dispatcher,
 * the new messages the broker sends to the old consumer arrive at the
 * client and are processed by the same AMQSession and Dispatcher.
 * However, as the failover process cleared the _consumer map and
 * resubscribed the consumers, the Dispatcher does not recognise the
 * delivery tag and so logs the 'without a handler' message.
 * - The Dispatcher then attempts to reject the message, however,
 * + The AMQSession/Dispatcher pair have been swapped to using a new Mina
 * ProtocolSession as part of the failover process so the reject is
 * sent down the second connection. The broker receives the Reject
 * request but, as the message was sent on a different connection, the
 * unacknowledged map is empty and a 'message is null' log message is
 * produced.
 *
 * Test Strategy:
 *
 * It should be easy to demonstrate if we can send an IOException to
 * AMQProtocolHandler#exceptionCaught and then try sending a message.
 *
 * The current unknowns here are the type of consumers that are in use.
 * If it was an exclusive queue (durable subscription), why did the
 * resubscribe not fail?
 *
 * If it was not exclusive, why did the messages not round-robin?
 */
public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener
{
    /** Counted down by {@link #failoverComplete()}; test() waits on it to detect failover. */
    private CountDownLatch _failoverOccurred = new CountDownLatch(1);
    AMQConnection _connection;
    Session _session;
    Queue _queue;
    MessageConsumer _consumer;

    /**
     * Runs the superclass set-up, then stops the broker listening on
     * {@link #getFailingPort()}. This test only connects to the "default"
     * connection URL, so that second broker is not needed.
     */
    public void setUp() throws Exception
    {
        super.setUp();
        stopBroker(getFailingPort());
    }

    /**
     * Verifies that messages held in the client prefetch are still delivered
     * after failover is triggered by an IOException on a connection that was
     * never actually closed.
     *
     * Steps:
     * - Create a queue consumer and send 10 numbered messages to the broker.
     * - Consume the first message; this pulls the rest into the prefetch.
     * - Hand a WriteTimeoutException (an IOException) to the protocol
     *   handler, which forces failover.
     * - Wait for the ConnectionListener to report failover completion.
     * - Expect the remaining 9 messages, in order, and then no more.
     *
     * Historical note: while the bug existed, none of the 9 messages arrived
     * because they stayed in the prefetch of the original session. That
     * session's IO layer was still open but no longer attached to the
     * connection object after failover. According to the original analysis,
     * closing the AMQConnection alone did not release them. The original
     * AMQProtocolSession had to be captured before the exception (via
     * _connection.getProtocolHandler().getProtocolSession()) and closed with
     * closeProtocolSession() before the messages became available again.
     */
    public void test() throws Exception
    {
        initialiseConnection();

        // Send 10 numbered messages.
        List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10);

        // Consume the first message; the remainder fill the prefetch.
        Message received = _consumer.receive(2000);

        assertNotNull("First message not received.", received);
        assertEquals("Incorrect message Received",
                     messages.remove(0).getIntProperty("count"),
                     received.getIntProperty("count"));

        // Create and close a consumer on a temporary queue. These synchronous
        // round-trips presumably ensure the auto-ack for the first message
        // reaches the broker before failover starts. Failover's first step is
        // to close the existing connection, which should flush that ACK.
        // However, when the server detects the socket close it propagates up
        // the Mina filter chain and can interrupt processing (QPID-2138).
        _session.createConsumer(_session.createTemporaryQueue()).close();

        // Inject an IOException into the protocol handler to trigger failover.
        _connection.getProtocolHandler().
                exception(new WriteTimeoutException("WriteTimeoutException to cause failover."));

        // Failover completion is signalled through the ConnectionListener.
        assertTrue("Failover did not occur",
                   _failoverOccurred.await(4000, TimeUnit.MILLISECONDS));

        // The bug is fixed if the 9 prefetched messages are still delivered.
        for (int count = 1; count < 10; count++)
        {
            received = _consumer.receive(2000);
            assertNotNull("Expected message not received:" + count, received);
            assertEquals(messages.remove(0).getIntProperty("count"),
                         received.getIntProperty("count"));
        }

        // Nothing beyond the 10 sent messages may arrive.
        received = _consumer.receive(1000);
        assertNull("Message receieved when there should be none:" + received,
                   received);
    }

    /**
     * Opens {@link #_connection} from the "default" connection factory and
     * registers this test as its ConnectionListener. It then creates
     * {@link #_session} (non-transacted, AUTO_ACKNOWLEDGE), {@link #_queue}
     * and {@link #_consumer}, and starts the connection.
     */
    private void initialiseConnection()
            throws Exception
    {
        // Use the default connection URL rather than the failover URL, which
        // also names a second broker that this test does not need.
        _connection = (AMQConnection) getConnectionFactory("default").createConnection("guest", "guest");
        // The default connection has no retries configured; allow one retry
        // so the client can fail over and the test can block on completion.
        _connection.getFailoverPolicy().getCurrentMethod().setRetries(1);
        _connection.setConnectionListener(this);

        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        _queue = _session.createQueue(getTestQueueName());

        _consumer = _session.createConsumer(_queue);

        _connection.start();
    }

    /**
     * Sends {@code count} messages to {@code destination}. Each message
     * carries an int property {@code "count"} set to its zero-based send index.
     *
     * Back-ported from QpidTestCase. Despite the name, the messages are plain
     * JMS {@link Message}s created with {@link Session#createMessage()}, not
     * BytesMessages. The name is kept for compatibility.
     *
     * @param session     session used to create the producer and the messages
     * @param destination destination the messages are sent to
     * @param count       number of messages to send
     * @return the sent messages, in send order
     * @throws Exception if creating or sending a message fails; the producer
     *                   is closed in either case
     */
    public List<Message> sendNumberedBytesMessage(Session session, Destination destination,
                                                  int count) throws Exception
    {
        List<Message> messages = new ArrayList<Message>(count);

        MessageProducer producer = session.createProducer(destination);
        try
        {
            for (int i = 0; i < count; i++)
            {
                Message next = session.createMessage();

                next.setIntProperty("count", i);

                producer.send(next);

                messages.add(next);
            }
        }
        finally
        {
            // Release the producer even if a send fails part-way through.
            producer.close();
        }
        return messages;
    }

    /** ConnectionListener callback; no-op because this test does not track bytes sent. */
    public void bytesSent(long count)
    {
    }

    /** ConnectionListener callback; no-op because this test does not track bytes received. */
    public void bytesReceived(long count)
    {
    }

    /** ConnectionListener callback; returns true so that failover is allowed to proceed. */
    public boolean preFailover(boolean redirect)
    {
        return true;
    }

    /**
     * ConnectionListener callback. Returns true so that the client presumably
     * proceeds with resubscribing consumers after failover.
     */
    public boolean preResubscribe()
    {
        return true;
    }

    /** ConnectionListener callback; releases the latch that test() waits on. */
    public void failoverComplete()
    {
        _failoverOccurred.countDown();
    }
}