summaryrefslogtreecommitdiff
path: root/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java
blob: f193862e78a7c1b6c4b7405edb97e703cf78b826 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/*
 *
 * Copyright (c) 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.qpid.server.cluster;

import org.apache.log4j.Logger;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionRedirectBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * A 'client stub' for a remote cluster peer, using MINA for IO Layer
 *
 */
public class MinaBrokerProxy extends Broker implements MethodHandler
{
    private static final Logger _logger = Logger.getLogger(MinaBrokerProxy.class);
    private final ConnectionStatusMonitor _connectionMonitor = new ConnectionStatusMonitor();
    private final ClientHandlerRegistry _legacyHandler;
    private final MinaBinding _binding = new MinaBinding();
    private final MemberHandle _local;
    private IoSession _session;
    private MethodHandler _handler;
    private Iterable<AMQMethodBody> _replay;

    MinaBrokerProxy(String host, int port, MemberHandle local)
    {
        super(host, port);
        _local = local;
        _legacyHandler = new ClientHandlerRegistry(local);
    }

    private void init(IoSession session)
    {
        _session = session;
        _handler = new ClientAdapter(session, _legacyHandler);
    }

    private ConnectFuture connectImpl()
    {
        _logger.info("Connecting to cluster peer: " + getDetails());
        SocketConnector ioConnector = new SocketConnector();
        SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();

        SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
        scfg.setTcpNoDelay(true);
        scfg.setSendBufferSize(32768);
        scfg.setReceiveBufferSize(32768);
        InetSocketAddress address = new InetSocketAddress(getHost(), getPort());
        return ioConnector.connect(address, _binding);
    }

    //extablish connection without handling redirect
    boolean connect() throws IOException, InterruptedException
    {
        ConnectFuture future = connectImpl();
        // wait for connection to complete
        future.join();
        // we call getSession which throws an IOException if there has been an error connecting
        try
        {
            future.getSession();
        }
        catch (RuntimeIOException e)
        {
            _connectionMonitor.failed(e);
            _logger.error(new LogMessage("Could not connect to {0}: {1}", this, e), e);
            throw e;
        }
        return _connectionMonitor.waitUntilOpen();
    }

    void connectAsynch(Iterable<AMQMethodBody> msgs)
    {
        _replay = msgs;
        connectImpl();
    }

    void replay(Iterable<AMQMethodBody> msgs)
    {
        _replay = msgs;
        if(_connectionMonitor.isOpened())
        {
            replay();
        }
    }

    //establish connection, handling redirect if required...
    Broker connectToCluster() throws IOException, InterruptedException
    {
        connect();
        //wait until the connection is open or get a redirection
        if (_connectionMonitor.waitUntilOpen())
        {
            return this;
        }
        else
        {
            Broker broker = new MinaBrokerProxy(_connectionMonitor.getHost(), _connectionMonitor.getPort(), _local);
            broker.connect();
            return broker;
        }
    }

    public void send(AMQDataBlock data) throws AMQException
    {
        if (_session == null)
        {
            try
            {
                _connectionMonitor.waitUntilOpen();
            }
            catch (Exception e)
            {
                throw new AMQException("Failed to send " + data + ": " + e, e);
            }
        }
        _session.write(data);
    }

    private void replay()
    {
        if(_replay != null)
        {
            for(AMQMethodBody b : _replay)
            {
                _session.write(new AMQFrame(0, b));
            }
        }
    }

    public void handle(int channel, AMQMethodBody method) throws AMQException
    {
        _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel));
        if (!handleResponse(channel, method))
        {
            _logger.warn(new LogMessage("Unhandled method: {0} for channel {1}", method, channel));
        }
    }

    private void handleMethod(int channel, AMQMethodBody method) throws AMQException
    {
        if (method instanceof ConnectionRedirectBody)
        {
            //signal redirection to waiting thread
            ConnectionRedirectBody redirect = (ConnectionRedirectBody) method;
            String[] parts = redirect.host.split(":");
            _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1]));
        }
        else
        {
            _handler.handle(channel, method);
            if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this)
            {
                _handler = this;
                _logger.info(new LogMessage("Connection opened, handler switched"));
                //replay any messages:
                replay();
                //signal waiting thread:
                _connectionMonitor.opened();
            }
        }
    }

    private void handleFrame(AMQFrame frame) throws AMQException
    {
        AMQBody body = frame.bodyFrame;
        if (body instanceof AMQMethodBody)
        {
            handleMethod(frame.channel, (AMQMethodBody) body);
        }
        else
        {
            throw new AMQException("Client only expects method body, got: " + body);
        }
    }

    public String toString()
    {
        return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]";
    }

    private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList
    {
        public void sessionCreated(IoSession session) throws Exception
        {
            init(session);
            _logger.info(new LogMessage("{0}: created", MinaBrokerProxy.this));
            ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
            session.getFilterChain().addLast("protocolFilter", pcf);
            
            /* Find last protocol version in protocol version list. Make sure last protocol version
            listed in the build file (build-module.xml) is the latest version which will be used
            here. */
            int i = pv.length - 1;
            session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
        }

        public void sessionOpened(IoSession session) throws Exception
        {
            _logger.info(new LogMessage("{0}: opened", MinaBrokerProxy.this));
        }

        public void sessionClosed(IoSession session) throws Exception
        {
            _logger.info(new LogMessage("{0}: closed", MinaBrokerProxy.this));
        }

        public void exceptionCaught(IoSession session, Throwable throwable) throws Exception
        {
            _logger.error(new LogMessage("{0}: received {1}", MinaBrokerProxy.this, throwable), throwable);
            if (! (throwable instanceof IOException))
            {
                _session.close();
            }
            failed();
        }

        public void messageReceived(IoSession session, Object object) throws Exception
        {
            if (object instanceof AMQFrame)
            {
                handleFrame((AMQFrame) object);
            }
            else
            {
                throw new AMQException("Received message of unrecognised type: " + object);
            }
        }

        public void messageSent(IoSession session, Object object) throws Exception
        {
            _logger.debug(new LogMessage("{0}: sent {1}", MinaBrokerProxy.this, object));
        }
    }
}