diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java | 435 |
1 files changed, 435 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java new file mode 100644 index 0000000000..0f2c0d0226 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -0,0 +1,435 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExecutorThreadModel; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.SessionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.NetworkDriverConfiguration; +import org.apache.qpid.transport.OpenException; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver +{ + + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + + ProtocolEngine _protocolEngine; + private boolean _useNIO = false; + private int _processors = 4; + private boolean _executorPool = false; + private SSLContextFactory _sslFactory = null; + private IoConnector _socketConnector; + private IoAcceptor _acceptor; + private IoSession _ioSession; + private ProtocolEngineFactory _factory; + private boolean _protectIO; + private NetworkDriverConfiguration _config; + private Throwable _lastException; + private boolean _acceptingConnections = false; + + private WriteFuture _lastWriteFuture; + + private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); + + static + { + org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + + //override the MINA defaults to prevent use of the PooledByteBufferAllocator + org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + } + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO, + ProtocolEngine protocolEngine, IoSession session) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + _protocolEngine = protocolEngine; + _ioSession = session; + _ioSession.setAttachment(_protocolEngine); + } + + public MINANetworkDriver() + { + + } + + public MINANetworkDriver(IoConnector ioConnector) + { + _socketConnector = ioConnector; + } + + public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) + { + _socketConnector = ioConnector; + _protocolEngine = engine; + } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + _factory = factory; + _config = config; + + if (_useNIO) + { + _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors, + new NewThreadExecutor()); + } + else + { + _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); + } + + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)")); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + + if (config != null) + { + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + sc.setSendBufferSize(config.getSendBufferSize()); + sc.setTcpNoDelay(config.getTcpNoDelay()); + } + + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + + if (addresses != null && addresses.length > 0) + { + for (InetAddress addr : addresses) + { + try + { + _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to %1s:%2s", addr, port)); + } + } + } + else + { + try + { + _acceptor.bind(new InetSocketAddress(port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to *:%1s", port)); + } + } + _acceptingConnections = true; + } + + public SocketAddress getRemoteAddress() + { + return _ioSession.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _ioSession.getLocalAddress(); + } + + + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + SSLContextFactory sslFactory) throws OpenException + { + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + + if (_useNIO) + { + _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); + } + else + { + _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking + // connector + } + + SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); + String s = ""; + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + for(StackTraceElement elt : trace) + { + if(elt.getClassName().contains("Test")) + { + s = elt.getClassName(); + break; + } + } + cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s)); + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); + scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); + scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); + + // Don't have the connector's worker thread wait around for other + // connections (we only use + // one SocketConnector per connection at the moment anyway). This allows + // short-running + // clients (like unit tests) to complete quickly. + if (_socketConnector instanceof SocketConnector) + { + ((SocketConnector) _socketConnector).setWorkerTimeout(0); + } + + ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); + future.join(); + if (!future.isConnected()) + { + throw new OpenException("Could not open connection", _lastException); + } + _ioSession = future.getSession(); + _ioSession.setAttachment(engine); + engine.setNetworkDriver(this); + _protocolEngine = engine; + } + + public void setMaxReadIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); + } + + public void setMaxWriteIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); + } + + public void close() + { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } + if (_acceptor != null) + { + _acceptor.unbindAll(); + } + if (_ioSession != null) + { + _ioSession.close(); + } + } + + public void flush() + { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } + } + + public void send(ByteBuffer msg) + { + org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity()); + minaBuf.put(msg); + minaBuf.flip(); + _lastWriteFuture = _ioSession.write(minaBuf); + } + + public void setIdleTimeout(int i) + { + // MINA doesn't support setting SO_TIMEOUT + } + + public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception + { + if (_protocolEngine != null) + { + _protocolEngine.exception(throwable); + } + else + { + _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable); + } + _lastException = throwable; + } + + /** + * Invoked when a message is received on a particular protocol session. Note + * that a protocol session is directly tied to a particular physical + * connection. + * + * @param protocolSession + * the protocol session that received the message + * @param message + * the message itself (i.e. a decoded frame) + * + * @throws Exception + * if the message cannot be processed + */ + public void messageReceived(IoSession protocolSession, Object message) throws Exception + { + if (message instanceof org.apache.mina.common.ByteBuffer) + { + ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf()); + } + else + { + throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); + } + } + + public void sessionClosed(IoSession protocolSession) throws Exception + { + ((ProtocolEngine) protocolSession.getAttachment()).closed(); + } + + public void sessionCreated(IoSession protocolSession) throws Exception + { + // Configure the session with SSL if necessary + SessionUtil.initialize(protocolSession); + if (_executorPool) + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + else + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + // Do we want to have read/write buffer limits? + if (_protectIO) + { + //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); + + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); + writefilter.attach(chain); + + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + } + + if (_ioSession == null) + { + _ioSession = protocolSession; + } + + if (_acceptingConnections) + { + // Set up the protocol engine + ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); + MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); + protocolEngine.setNetworkDriver(newDriver); + } + } + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + if (IdleStatus.WRITER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).writerIdle(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).readerIdle(); + } + } + + private ProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + + public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) + { + _factory = engineFactory; + _acceptingConnections = acceptingConnections; + } + + public void setProtocolEngine(ProtocolEngine protocolEngine) + { + _protocolEngine = protocolEngine; + if (_ioSession != null) + { + _ioSession.setAttachment(protocolEngine); + } + } + +} |