diff options
author | Keith Wall <kwall@apache.org> | 2015-01-28 17:08:03 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-01-28 17:08:03 +0000 |
commit | 0900a59cebb72b3446ce8eb1b06e36935cc815ac (patch) | |
tree | a045b623dfa5f5b575c38ed96603210ded575d26 /qpid/java/common | |
parent | e1eb5a92b2765ba9ad7178d2a1c8564d44f6d608 (diff) | |
download | qpid-python-0900a59cebb72b3446ce8eb1b06e36935cc815ac.tar.gz |
WIP: First phase of moving towards threadpool for IO threads. Introduced SelectorThread and scheduler that currently runs the IO activity in the same thread. SSLTests failing
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
4 files changed, 332 insertions, 59 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java index 92cea345ca..68670d1a9d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -40,16 +40,19 @@ import org.apache.qpid.transport.network.TransportEncryption; public class NonBlockingConnection implements NetworkConnection { private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); - private final SocketChannel _socket; + private final SocketChannel _socketChannel; private final long _timeout; private final NonBlockingSenderReceiver _nonBlockingSenderReceiver; + private final Ticker _ticker; + private final SelectorThread _selector; private int _maxReadIdle; private int _maxWriteIdle; private Principal _principal; private boolean _principalChecked; private final Object _lock = new Object(); + private boolean _stateChanged; - public NonBlockingConnection(SocketChannel socket, + public NonBlockingConnection(SocketChannel socketChannel, ServerProtocolEngine delegate, int sendBufferSize, int receiveBufferSize, @@ -59,18 +62,30 @@ public class NonBlockingConnection implements NetworkConnection final SSLContext sslContext, final boolean wantClientAuth, final boolean needClientAuth, - final Runnable onTransportEncryptionAction) + final Runnable onTransportEncryptionAction, final SelectorThread selectorThread) { - _socket = socket; + _socketChannel = socketChannel; _timeout = timeout; + _ticker = ticker; + _selector = selectorThread; - _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); + _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this, + delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); } + public Ticker getTicker() + { + return _ticker; + } + + public SocketChannel getSocketChannel() + { + return _socketChannel; + } + public void start() { - _nonBlockingSenderReceiver.initiate(); } public Sender<ByteBuffer> getSender() @@ -85,12 +100,12 @@ public class NonBlockingConnection implements NetworkConnection public SocketAddress getRemoteAddress() { - return _socket.socket().getRemoteSocketAddress(); + return _socketChannel.socket().getRemoteSocketAddress(); } public SocketAddress getLocalAddress() { - return _socket.socket().getLocalSocketAddress(); + return _socketChannel.socket().getLocalSocketAddress(); } public void setMaxWriteIdle(int sec) @@ -131,4 +146,30 @@ public class NonBlockingConnection implements NetworkConnection { return _maxWriteIdle; } + + public boolean canRead() + { + return _nonBlockingSenderReceiver.canRead(); + } + + public boolean waitingForWrite() + { + return _nonBlockingSenderReceiver.waitingForWrite(); + } + + public boolean isStateChanged() + { + + return _nonBlockingSenderReceiver.isStateChanged(); + } + + public boolean doWork() + { + return _nonBlockingSenderReceiver.doWork(); + } + + public SelectorThread getSelector() + { + return _selector; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java index 80ba7a0221..0b06a95717 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -38,7 +38,6 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportEncryption; public class NonBlockingNetworkTransport implements IncomingNetworkTransport @@ -50,8 +49,9 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); private AcceptingThread _acceptor; + private SelectorThread _selector = new SelectorThread(); - protected NonBlockingConnection createNetworkConnection(final SocketChannel socket, + protected NonBlockingConnection createNetworkConnection(final SocketChannel socketChannel, final ServerProtocolEngine engine, final Integer sendBufferSize, final Integer receiveBufferSize, @@ -63,7 +63,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport final boolean needClientAuth, final Runnable onTransportEncryptionAction) { - return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); + return new NonBlockingConnection(socketChannel, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction, _selector); } public void close() @@ -84,11 +84,16 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet); _acceptor.setDaemon(false); _acceptor.start(); + + + _selector.start(); } catch (IOException e) { throw new TransportException("Failed to start AMQP on port : " + config, e); } + + } public int getAcceptingPort() @@ -159,30 +164,31 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport { while (!_closed) { - SocketChannel socket = null; + SocketChannel socketChannel = null; try { - socket = _serverSocket.accept(); + socketChannel = _serverSocket.accept(); final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); if(engine != null) { - socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); final Integer sendBufferSize = _config.getSendBufferSize(); final Integer receiveBufferSize = _config.getReceiveBufferSize(); - socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - NetworkConnection connection = - createNetworkConnection(socket, + NonBlockingConnection connection = + createNetworkConnection(socketChannel, engine, sendBufferSize, receiveBufferSize, @@ -208,23 +214,26 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport ticker.setConnection(connection); connection.start(); + + _selector.addConnection(connection); + } else { - socket.close(); + socketChannel.close(); } } catch(RuntimeException e) { LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket.socket()); + closeSocketIfNecessary(socketChannel.socket()); } catch(IOException e) { if(!_closed) { LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket.socket()); + closeSocketIfNecessary(socketChannel.socket()); try { //Delay to avoid tight spinning the loop during issues such as too many open files @@ -265,4 +274,5 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport } } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index acc2b89881..eaccaa2098 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -50,18 +50,16 @@ import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportEncryption; import org.apache.qpid.transport.network.security.ssl.SSLUtil; -public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> +public class NonBlockingSenderReceiver implements Sender<ByteBuffer> { private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; private final SocketChannel _socketChannel; - private final Selector _selector; private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); - private final Thread _ioThread; private final String _remoteSocketAddress; private final AtomicBoolean _closed = new AtomicBoolean(false); private final ServerProtocolEngine _receiver; @@ -70,6 +68,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private final Set<TransportEncryption> _encryptionSet; private final SSLContext _sslContext; private final Runnable _onTransportEncryptionAction; + private final NonBlockingConnection _connection; private ByteBuffer _netInputBuffer; private SSLEngine _sslEngine; @@ -77,9 +76,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private TransportEncryption _transportEncryption; private SSLEngineResult _status; + private volatile boolean _fullyWritten = true; + private AtomicBoolean _stateChanged = new AtomicBoolean(); - public NonBlockingSenderReceiver(final SocketChannel socketChannel, + public NonBlockingSenderReceiver(final NonBlockingConnection connection, ServerProtocolEngine receiver, int receiveBufSize, Ticker ticker, @@ -89,7 +90,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> final boolean needClientAuth, final Runnable onTransportEncryptionAction) { - _socketChannel = socketChannel; + _connection = connection; + _socketChannel = connection.getSocketChannel(); _receiver = receiver; _receiveBufSize = receiveBufSize; _ticker = ticker; @@ -124,33 +126,14 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> try { - _remoteSocketAddress = socketChannel.getRemoteAddress().toString(); + _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); _socketChannel.configureBlocking(false); - _selector = Selector.open(); - _socketChannel.register(_selector, SelectionKey.OP_READ); } catch (IOException e) { throw new SenderException("Unable to prepare the channel for non-blocking IO", e); } - try - { - //Create but deliberately don't start the thread. - _ioThread = Threading.getThreadFactory().createThread(this); - } - catch(Exception e) - { - throw new SenderException("Error creating NonBlockingSenderReceiver thread for " + _remoteSocketAddress, e); - } - - _ioThread.setDaemon(true); - _ioThread.setName(String.format("NonBlockingSenderReceiver-%s", _remoteSocketAddress)); - - } - public void initiate() - { - _ioThread.start(); } @Override @@ -162,11 +145,62 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } // append to list and do selector wakeup _buffers.add(msg); - _selector.wakeup(); + _stateChanged.set(true); } - @Override - public void run() + + public boolean doWork() + { + _stateChanged.set(false); + + boolean closed = _closed.get(); + if (!closed) + { + try + { + long currentTime = System.currentTimeMillis(); + int tick = _ticker.getTimeToNextTick(currentTime); + if (tick <= 0) + { + _ticker.tick(currentTime); + } + + _receiver.setTransportBlockedForWriting(!doWrite()); + doRead(); + _fullyWritten = doWrite(); + _receiver.setTransportBlockedForWriting(!_fullyWritten); + + } + catch (IOException e) + { + LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); + close(); + } + } + else + { + try + { + while(!doWrite()) + { + } + } + catch (IOException e) + { + LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); + + } + + _receiver.closed(); + + } + + return closed; + + } + + +/* public void run() { LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started"); @@ -189,11 +223,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> _receiver.setTransportBlockedForWriting(!doWrite()); doRead(); - boolean fullyWritten = doWrite(); - _receiver.setTransportBlockedForWriting(!fullyWritten); + _fullyWritten = doWrite(); + _receiver.setTransportBlockedForWriting(!_fullyWritten); _socketChannel.register(_selector, - fullyWritten + _fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); @@ -221,22 +255,22 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> { LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress); } - } + }*/ @Override public void flush() { - _selector.wakeup(); + _connection.getSelector().wakeup(); } @Override public void close() { - LOGGER.debug("Closing " + _remoteSocketAddress); + LOGGER.debug("Closing " + _remoteSocketAddress); _closed.set(true); - _selector.wakeup(); - + _stateChanged.set(true); + _connection.getSelector().wakeup(); } private boolean doWrite() throws IOException @@ -497,4 +531,20 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> return null; } + + public boolean canRead() + { + return true; + } + + public boolean waitingForWrite() + { + return !_fullyWritten; + } + + public boolean isStateChanged() + { + return _stateChanged.get(); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java new file mode 100644 index 0000000000..65707eb4c2 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** +* Created by keith on 28/01/2015. +*/ +public class SelectorThread extends Thread +{ + + private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); + private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); + private final Selector _selector; + private final AtomicBoolean _closed = new AtomicBoolean(); + private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + + SelectorThread() + { + + + try + { + _selector = Selector.open(); + } + catch (IOException e) + { + throw new RuntimeException("Failed to create selector"); + } + } + + @Override + public void run() + { + + long nextTimeout = 0; + while(!_closed.get()) + { + + + try + { + + _selector.select(nextTimeout); + + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); + + + + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + + key.channel().register(_selector, 0); + + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); + + } + selectionKeys.clear(); + + while(_unregisteredConnections.peek() != null) + { + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); + + + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + + } + + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + nextTimeout = Integer.MAX_VALUE; + while(iterator.hasNext()) + { + NonBlockingConnection connection = iterator.next(); + + int period = connection.getTicker().getTimeToNextTick(currentTime); + if (period < 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + iterator.remove(); + } + else + { + nextTimeout = Math.min(period, nextTimeout); + } + } + + for(NonBlockingConnection connection : toBeScheduled) + { + _scheduler.schedule(connection); + } + + } + catch (IOException e) + { + // Close ourselves? Inform accepting thread?? + e.printStackTrace(); + } + + } + + + } + + public void addConnection(final NonBlockingConnection connection) + { + _unregisteredConnections.add(connection); + _selector.wakeup(); + + } + + public void wakeup() + { + _selector.wakeup(); + } + + private class NetworkConnectionScheduler + { + public void schedule(final NonBlockingConnection connection) + { + + + boolean closed = connection.doWork(); + + if (!closed) + { + if (connection.isStateChanged()) + { + schedule(connection); + } + else + { + SelectorThread.this.addConnection(connection); + } + } + } + } + +} |