summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
blob: 208658a5ffc54a602bf9a2f54749b77d9e73e5e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;

/**
 * BlockingWaiter is a 'rendezvous' which delegates handling of
 * incoming Objects to a listener implemented as a sub-class of this and hands off the process or
 * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
 * differs from a 'rendezvous' in that sense.
 *
 * <p/>BlockingWaiters are used to coordinate when waiting for an an event that expect a response.
 * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register
 * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
 * have been completed.
 *
 * <p/>The {@link #process} must return <tt>true</tt> on any incoming method that it handles. This indicates to
 * this listeners that the object just processed ends the waiting process.
 *
 * <p/>Errors from the producer are rethrown to the consumer.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations </td>
 * <tr><td> Accept generic objects as events for processing via {@link #process}. <td>
 * <tr><td> Delegate handling and undserstanding of the object to a concrete implementation. <td>
 * <tr><td> Block until {@link #process} determines that waiting is no longer required <td>
 * <tr><td> Propagate the most recent exception to the consumer.<td>
 * </table>
 *
 * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
 * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
 * when this happens. At the very least, restore the interrupted status flag.
 * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
 * check that SynchronousQueue has a non-blocking put method available.
 */
public abstract class BlockingWaiter<T>
{
    /** This flag is used to indicate that the blocked for method has been received. */
    private volatile boolean _ready = false;

    /** This flag is used to indicate that the received error has been processed. */
    private volatile boolean _errorAck = false;

    /** Used to protect the shared event and ready flag between the producer and consumer. */
    private final ReentrantLock _lock = new ReentrantLock();

    /** Used to signal that a method has been received */
    private final Condition _receivedCondition = _lock.newCondition();

    /** Used to signal that a error has been processed */
    private final Condition _errorConditionAck = _lock.newCondition();

    /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
    private volatile Exception _error;

    /** Holds the incomming Object. */
    protected Object _doneObject = null;
    private AtomicBoolean _waiting = new AtomicBoolean(false);
    private boolean _closed = false;

    /**
     * Delegates processing of the incomming object to the handler.
     *
     * @param object The object to process.
     *
     * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue.
     */
    public abstract boolean process(T object);

    /**
     * An Object has been received and should be processed to see if our wait condition has been reached.
     *
     * @param object The object received.
     *
     * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue.
     */
    public boolean received(T object)
    {

        boolean ready = process(object);

        if (ready)
        {
            // we only update the flag from inside the synchronized block
            // so that the blockForFrame method cannot "miss" an update - it
            // will only ever read the flag from within the synchronized block
            _lock.lock();
            try
            {
                _doneObject = object;
                _ready = ready;
                _receivedCondition.signal();
            }
            finally
            {
                _lock.unlock();
            }
        }

        return ready;
    }

    /**
     * Blocks until an object is received that is handled by process, or the specified timeout
     * has passed.
     *
     * Once closed any attempt to wait will throw an exception.
     *
     * @param timeout The timeout in milliseconds.
     *
     * @return The object that resolved the blocking.
     *
     * @throws AMQException
     * @throws FailoverException
     */
    public Object block(long timeout) throws AMQException, FailoverException
    {
        long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);

        _lock.lock();

        try
        {
            if (_closed)
            {
                throw throwClosedException();
            }

            if (_error == null)
            {
                _waiting.set(true);

                while (!_ready)
                {
                    try
                    {
                        if (timeout == -1)
                        {
                            _receivedCondition.await();
                        }
                        else
                        {
                            nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout);

                            if (nanoTimeout <= 0 && !_ready && _error == null)
                            {
                                _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
                                _ready = true;
                            }
                        }
                    }
                    catch (InterruptedException e)
                    {
                        System.err.println(e.getMessage());
                        // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
                        // if (!_ready && timeout != -1)
                        // {
                        // _error = new AMQException("Server did not respond timely");
                        // _ready = true;
                        // }
                    }
                }
            }

            if (_error != null)
            {
                if (_error instanceof AMQException)
                {
                    throw (AMQException) _error;
                }
                else if (_error instanceof FailoverException)
                {
                    // This should ensure that FailoverException is not wrapped and can be caught.
                    throw (FailoverException) _error; // needed to expose FailoverException.
                }
                else
                {
                    throw new AMQException("Woken up due to " + _error.getClass(), _error);
                }
            }

        }
        finally
        {
            _waiting.set(false);

            //Release Error handling thread
            if (_error != null)
            {
                _errorAck = true;
                _errorConditionAck.signal();

                _error = null;
            }
            _lock.unlock();
        }

        return _doneObject;
    }

    /**
     * This is a callback, called when an error has occured that should interupt any waiter.
     * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
     *
     * Once closed any notification of an exception will be ignored.
     *
     * @param e The exception being propogated.
     */
    public void error(Exception e)
    {
        // set the error so that the thread that is blocking (against blockForFrame())
        // can pick up the exception and rethrow to the caller

        _lock.lock();

        try
        {
            if (_closed)
            {
                return;
            }

            if (_error == null)
            {
                _error = e;
            }
            else
            {
                System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
            }

            if (_waiting.get())
            {

                _ready = true;
                _receivedCondition.signal();

                while (!_errorAck)
                {
                    try
                    {
                        _errorConditionAck.await();
                    }
                    catch (InterruptedException e1)
                    {
                        System.err.println(e.getMessage());
                    }
                }
                _errorAck = false;
            }
        }
        finally
        {
            _lock.unlock();
        }
    }

    /**
     * Close this Waiter so that no more errors are processed.
     * This is a preventative method to ensure that a second error thread does not get stuck in the error method after
     * the await has returned. This has not happend but in practise but if two errors occur on the Connection at
     * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a
     * waiter.
     *
     * Once closed any attempt to wait will throw an exception.
     * Any notification of an exception will be ignored.
     */
    public void close()
    {
        _lock.lock();
        try
        {
            //if we have already closed then our job is done.
            if (_closed)
            {
                return;
            }

            //Close Waiter so no more exceptions are processed
            _closed = true;

            //Wake up any await() threads

            //If we are waiting then use the error() to wake them up.
            if (_waiting.get())
            {
                error(throwClosedException());
            }
            //If they are not waiting then there is nothing to do.

            // Wake up any error handling threads

            if (!_errorAck)
            {
                _errorAck = true;
                _errorConditionAck.signal();

                _error = null;
            }
        }
        finally
        {
            _lock.unlock();
        }
    }

    /**
     * Helper method to generate the a closed Exception.
     *
     * todo: This should be converted to something more friendly. 
     *
     * @return AMQException to throw to waiters when the Waiter is closed.
     */
    private AMQException throwClosedException()
    {
        return new AMQException(null, "Waiter was closed.", null);
    }

}