summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
blob: 0ab2e07340aed393cb94d53f1959ff9ae0c22452 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client.protocol;

import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;

/**
 * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of
 * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or
 * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
 * differs from a 'rendezvous' in that sense.
 *
 * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response.
 * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register
 * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
 * have been completed.
 *
 * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to
 * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to
 * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The
 * channel id must be passed to the constructor.
 *
 * <p/>Errors from the producer are rethrown to the consumer.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent}
 * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody}
 * <tr><td> Block until a method is handled by the delegated to handler.
 * <tr><td> Propagate the most recent exception to the consumer.
 * </table>
 *
 * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
 *       methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
 *       seem to use it. So wrapping the listeners is possible.
 *
 * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
 *       overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
 *       behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
 *       method has been received.
 *
 * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
 *       for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
 *       when this happens. At the very least, restore the interrupted status flag.
 *
 * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
 *       check that SynchronousQueue has a non-blocking put method available.
 */
public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
    /** This flag is used to indicate that the blocked for method has been received. */
    private volatile boolean _ready = false;

    /** Used to protect the shared event and ready flag between the producer and consumer. */
    private final Object _lock = new Object();

    /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
    private volatile Exception _error;

    /** Holds the channel id for the channel upon which this listener is waiting for a response. */
    protected int _channelId;

    /** Holds the incoming method. */
    protected AMQMethodEvent _doneEvt = null;

    /**
     * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
     *
     * @param channelId The channel id to filter incoming methods with.
     */
    public BlockingMethodFrameListener(int channelId)
    {
        _channelId = channelId;
    }

    /**
     * Delegates any additional handling of the incoming methods to another handler.
     *
     * @param channelId The channel id of the incoming method.
     * @param frame     The method body.
     *
     * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
     */
    public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;

    /**
     * Informs this listener that an AMQP method has been received.
     *
     * @param evt The AMQP method.
     *
     * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
     */
    public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
    {
        AMQMethodBody method = evt.getMethod();

        /*try
        {*/
        boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);

        if (ready)
        {
            // we only update the flag from inside the synchronized block
            // so that the blockForFrame method cannot "miss" an update - it
            // will only ever read the flag from within the synchronized block
            synchronized (_lock)
            {
                _doneEvt = evt;
                _ready = ready;
                _lock.notify();
            }
        }

        return ready;

        /*}
        catch (AMQException e)
        {
            error(e);
            // we rethrow the error here, and the code in the frame dispatcher will go round
            // each listener informing them that an exception has been thrown
            throw e;
        }*/
    }

    /**
     * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout
     * has passed.
     *
     * @param timeout The timeout in milliseconds.
     *
     * @return The AMQP method that was received.
     *
     * @throws AMQException
     * @throws FailoverException
     */
    public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
    {
        synchronized (_lock)
        {
            while (!_ready)
            {
                try
                {
                    if (timeout == -1)
                    {
                        _lock.wait();
                    }
                    else
                    {

                        _lock.wait(timeout);
                        if (!_ready)
                        {
                            _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
                            _ready = true;
                        }
                    }
                }
                catch (InterruptedException e)
                {
                    // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
                    // if (!_ready && timeout != -1)
                    // {
                    // _error = new AMQException("Server did not respond timely");
                    // _ready = true;
                    // }
                }
            }
        }

        if (_error != null)
        {
            if (_error instanceof AMQException)
            {
                throw (AMQException) _error;
            }
            else if (_error instanceof FailoverException)
            {
                // This should ensure that FailoverException is not wrapped and can be caught.
                throw (FailoverException) _error; // needed to expose FailoverException.
            }
            else
            {
                throw new AMQException(null, "Woken up due to " + _error.getClass(), _error);
            }
        }

        return _doneEvt;
    }

    /**
     * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
     * class to avoid code repetition but again is only called by the MINA dispatcher thread.
     *
     * @param e
     */
    public void error(Exception e)
    {
        // set the error so that the thread that is blocking (against blockForFrame())
        // can pick up the exception and rethrow to the caller
        _error = e;

        synchronized (_lock)
        {
            _ready = true;
            _lock.notify();
        }
    }
}