summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
blob: 2bc609ebf29860bc530fa69d56473d7906e61168 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.client.protocol;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.util.BlockingWaiter;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;

/**
 * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of
 * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or
 * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
 * differs from a 'rendezvous' in that sense.
 *
 * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response.
 * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register
 * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
 * have been completed.
 *
 * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to
 * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to
 * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The
 * channel id must be passed to the constructor.
 *
 * <p/>Errors from the producer are rethrown to the consumer.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent}
 * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody}
 * <tr><td> Block until a method is handled by the delegated to handler.
 * <tr><td> Propagate the most recent exception to the consumer.
 * </table>
 *
 * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
 * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
 * seem to use it. So wrapping the listeners is possible.
 * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
 * check that SynchronousQueue has a non-blocking put method available.
 */
public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
{

    /** Holds the channel id for the channel upon which this listener is waiting for a response. */
    protected int _channelId;

    /**
     * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
     *
     * @param channelId The channel id to filter incoming methods with.
     */
    public BlockingMethodFrameListener(int channelId)
    {
        _channelId = channelId;
    }

    /**
     * Delegates any additional handling of the incoming methods to another handler.
     *
     * @param channelId The channel id of the incoming method.
     * @param frame     The method body.
     *
     * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
     */
    public abstract boolean processMethod(int channelId, AMQMethodBody frame);

    public boolean process(AMQMethodEvent evt)
    {
        AMQMethodBody method = evt.getMethod();

        return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
    }

    /**
     * Informs this listener that an AMQP method has been received.
     *
     * @param evt The AMQP method.
     *
     * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
     */
    public boolean methodReceived(AMQMethodEvent evt)
    {
        return received(evt);
    }

    /**
     * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout
     * has passed.
     *
     * @param timeout The timeout in milliseconds.
     *
     * @return The AMQP method that was received.
     *
     * @throws AMQException
     * @throws FailoverException
     */
    public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
    {
        try
        {
            return (AMQMethodEvent) block(timeout);
        }
        finally
        {
            //Prevent any more errors being notified to this waiter.
            close();
        }
    }

}