summary | refs | log | tree | commit | diff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
blob: a7e3ebdd1477314fbcdc58bbfca76805557b2883 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.qpid.client.protocol;

import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;

public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
    private volatile boolean _ready = false;

    public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException;

    private final Object _lock = new Object();

    /**
     * This is set if there is an exception thrown from processCommandFrame and the
     * exception is rethrown to the caller of blockForFrame()
     */
    private volatile Exception _error;

    protected int _channelId;

    protected AMQMethodEvent _doneEvt = null;

    public BlockingMethodFrameListener(int channelId)
    {
        _channelId = channelId;
    }

    /**
     * This method is called by the MINA dispatching thread. Note that it could
     * be called before blockForFrame() has been called.
     * @param evt the frame event
     * @return true if the listener has dealt with this frame
     * @throws AMQException
     */
    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
    {
        AMQMethodBody method = evt.getMethod();

        try
        {
            boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
            if (ready)
            {
                // we only update the flag from inside the synchronized block
                // so that the blockForFrame method cannot "miss" an update - it
                // will only ever read the flag from within the synchronized block
                synchronized (_lock)
                {
                    _doneEvt = evt;
                    _ready = ready;
                    _lock.notify();
                }
            }
            return ready;
        }
        catch (AMQException e)
        {
            error(e);
            // we rethrow the error here, and the code in the frame dispatcher will go round
            // each listener informing them that an exception has been thrown
            throw e;
        }
    }

    /**
     * This method is called by the thread that wants to wait for a frame.
     */
    public AMQMethodEvent blockForFrame() throws AMQException
    {
        synchronized (_lock)
        {
            while (!_ready)
            {
                try
                {
                    _lock.wait();
                }
                catch (InterruptedException e)
                {
                    // IGNORE
                }
            }
        }
        if (_error != null)
        {
            if (_error instanceof AMQException)
            {
                throw (AMQException)_error;
            }
            else
            {
                throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
            }
        }

        return _doneEvt;
    }

    /**
     * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
     * class to avoid code repetition but again is only called by the MINA dispatcher thread.
     * @param e
     */
    public void error(Exception e)
    {
        // set the error so that the thread that is blocking (against blockForFrame())
        // can pick up the exception and rethrow to the caller
        _error = e;
        synchronized (_lock)
        {
            _ready = true;
            _lock.notify();
        }
    }
}