summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-01-28 17:08:03 +0000
committerKeith Wall <kwall@apache.org>2015-01-28 17:08:03 +0000
commit0900a59cebb72b3446ce8eb1b06e36935cc815ac (patch)
treea045b623dfa5f5b575c38ed96603210ded575d26 /qpid/java/common
parente1eb5a92b2765ba9ad7178d2a1c8564d44f6d608 (diff)
downloadqpid-python-0900a59cebb72b3446ce8eb1b06e36935cc815ac.tar.gz
WIP: First phase of moving towards threadpool for IO threads. Introduced SelectorThread and scheduler that currently runs the IO activity in the same thread. SSLTests failing
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java57
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java122
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java172
4 files changed, 332 insertions, 59 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index 92cea345ca..68670d1a9d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -40,16 +40,19 @@ import org.apache.qpid.transport.network.TransportEncryption;
public class NonBlockingConnection implements NetworkConnection
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
- private final SocketChannel _socket;
+ private final SocketChannel _socketChannel;
private final long _timeout;
private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
+ private final Ticker _ticker;
+ private final SelectorThread _selector;
private int _maxReadIdle;
private int _maxWriteIdle;
private Principal _principal;
private boolean _principalChecked;
private final Object _lock = new Object();
+ private boolean _stateChanged;
- public NonBlockingConnection(SocketChannel socket,
+ public NonBlockingConnection(SocketChannel socketChannel,
ServerProtocolEngine delegate,
int sendBufferSize,
int receiveBufferSize,
@@ -59,18 +62,30 @@ public class NonBlockingConnection implements NetworkConnection
final SSLContext sslContext,
final boolean wantClientAuth,
final boolean needClientAuth,
- final Runnable onTransportEncryptionAction)
+ final Runnable onTransportEncryptionAction, final SelectorThread selectorThread)
{
- _socket = socket;
+ _socketChannel = socketChannel;
_timeout = timeout;
+ _ticker = ticker;
+ _selector = selectorThread;
- _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
+ _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this,
+ delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
}
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public SocketChannel getSocketChannel()
+ {
+ return _socketChannel;
+ }
+
public void start()
{
- _nonBlockingSenderReceiver.initiate();
}
public Sender<ByteBuffer> getSender()
@@ -85,12 +100,12 @@ public class NonBlockingConnection implements NetworkConnection
public SocketAddress getRemoteAddress()
{
- return _socket.socket().getRemoteSocketAddress();
+ return _socketChannel.socket().getRemoteSocketAddress();
}
public SocketAddress getLocalAddress()
{
- return _socket.socket().getLocalSocketAddress();
+ return _socketChannel.socket().getLocalSocketAddress();
}
public void setMaxWriteIdle(int sec)
@@ -131,4 +146,30 @@ public class NonBlockingConnection implements NetworkConnection
{
return _maxWriteIdle;
}
+
+ public boolean canRead()
+ {
+ return _nonBlockingSenderReceiver.canRead();
+ }
+
+ public boolean waitingForWrite()
+ {
+ return _nonBlockingSenderReceiver.waitingForWrite();
+ }
+
+ public boolean isStateChanged()
+ {
+
+ return _nonBlockingSenderReceiver.isStateChanged();
+ }
+
+ public boolean doWork()
+ {
+ return _nonBlockingSenderReceiver.doWork();
+ }
+
+ public SelectorThread getSelector()
+ {
+ return _selector;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index 80ba7a0221..0b06a95717 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -38,7 +38,6 @@ import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportEncryption;
public class NonBlockingNetworkTransport implements IncomingNetworkTransport
@@ -50,8 +49,9 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private AcceptingThread _acceptor;
+ private SelectorThread _selector = new SelectorThread();
- protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
+ protected NonBlockingConnection createNetworkConnection(final SocketChannel socketChannel,
final ServerProtocolEngine engine,
final Integer sendBufferSize,
final Integer receiveBufferSize,
@@ -63,7 +63,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
final boolean needClientAuth,
final Runnable onTransportEncryptionAction)
{
- return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
+ return new NonBlockingConnection(socketChannel, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction, _selector);
}
public void close()
@@ -84,11 +84,16 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
_acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet);
_acceptor.setDaemon(false);
_acceptor.start();
+
+
+ _selector.start();
}
catch (IOException e)
{
throw new TransportException("Failed to start AMQP on port : " + config, e);
}
+
+
}
public int getAcceptingPort()
@@ -159,30 +164,31 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
while (!_closed)
{
- SocketChannel socket = null;
+ SocketChannel socketChannel = null;
try
{
- socket = _serverSocket.accept();
+ socketChannel = _serverSocket.accept();
final ServerProtocolEngine engine =
- (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
if(engine != null)
{
- socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
- socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
- NetworkConnection connection =
- createNetworkConnection(socket,
+ NonBlockingConnection connection =
+ createNetworkConnection(socketChannel,
engine,
sendBufferSize,
receiveBufferSize,
@@ -208,23 +214,26 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
ticker.setConnection(connection);
connection.start();
+
+ _selector.addConnection(connection);
+
}
else
{
- socket.close();
+ socketChannel.close();
}
}
catch(RuntimeException e)
{
LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket.socket());
+ closeSocketIfNecessary(socketChannel.socket());
}
catch(IOException e)
{
if(!_closed)
{
LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket.socket());
+ closeSocketIfNecessary(socketChannel.socket());
try
{
//Delay to avoid tight spinning the loop during issues such as too many open files
@@ -265,4 +274,5 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
}
}
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index acc2b89881..eaccaa2098 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -50,18 +50,16 @@ import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
+public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
private final SocketChannel _socketChannel;
- private final Selector _selector;
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
- private final Thread _ioThread;
private final String _remoteSocketAddress;
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ServerProtocolEngine _receiver;
@@ -70,6 +68,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
private final Set<TransportEncryption> _encryptionSet;
private final SSLContext _sslContext;
private final Runnable _onTransportEncryptionAction;
+ private final NonBlockingConnection _connection;
private ByteBuffer _netInputBuffer;
private SSLEngine _sslEngine;
@@ -77,9 +76,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
private TransportEncryption _transportEncryption;
private SSLEngineResult _status;
+ private volatile boolean _fullyWritten = true;
+ private AtomicBoolean _stateChanged = new AtomicBoolean();
- public NonBlockingSenderReceiver(final SocketChannel socketChannel,
+ public NonBlockingSenderReceiver(final NonBlockingConnection connection,
ServerProtocolEngine receiver,
int receiveBufSize,
Ticker ticker,
@@ -89,7 +90,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
final boolean needClientAuth,
final Runnable onTransportEncryptionAction)
{
- _socketChannel = socketChannel;
+ _connection = connection;
+ _socketChannel = connection.getSocketChannel();
_receiver = receiver;
_receiveBufSize = receiveBufSize;
_ticker = ticker;
@@ -124,33 +126,14 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
try
{
- _remoteSocketAddress = socketChannel.getRemoteAddress().toString();
+ _remoteSocketAddress = _socketChannel.getRemoteAddress().toString();
_socketChannel.configureBlocking(false);
- _selector = Selector.open();
- _socketChannel.register(_selector, SelectionKey.OP_READ);
}
catch (IOException e)
{
throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
}
- try
- {
- //Create but deliberately don't start the thread.
- _ioThread = Threading.getThreadFactory().createThread(this);
- }
- catch(Exception e)
- {
- throw new SenderException("Error creating NonBlockingSenderReceiver thread for " + _remoteSocketAddress, e);
- }
-
- _ioThread.setDaemon(true);
- _ioThread.setName(String.format("NonBlockingSenderReceiver-%s", _remoteSocketAddress));
-
- }
- public void initiate()
- {
- _ioThread.start();
}
@Override
@@ -162,11 +145,62 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
// append to list and do selector wakeup
_buffers.add(msg);
- _selector.wakeup();
+ _stateChanged.set(true);
}
- @Override
- public void run()
+
+ public boolean doWork()
+ {
+ _stateChanged.set(false);
+
+ boolean closed = _closed.get();
+ if (!closed)
+ {
+ try
+ {
+ long currentTime = System.currentTimeMillis();
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if (tick <= 0)
+ {
+ _ticker.tick(currentTime);
+ }
+
+ _receiver.setTransportBlockedForWriting(!doWrite());
+ doRead();
+ _fullyWritten = doWrite();
+ _receiver.setTransportBlockedForWriting(!_fullyWritten);
+
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+ close();
+ }
+ }
+ else
+ {
+ try
+ {
+ while(!doWrite())
+ {
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+
+ }
+
+ _receiver.closed();
+
+ }
+
+ return closed;
+
+ }
+
+
+/* public void run()
{
LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started");
@@ -189,11 +223,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
_receiver.setTransportBlockedForWriting(!doWrite());
doRead();
- boolean fullyWritten = doWrite();
- _receiver.setTransportBlockedForWriting(!fullyWritten);
+ _fullyWritten = doWrite();
+ _receiver.setTransportBlockedForWriting(!_fullyWritten);
_socketChannel.register(_selector,
- fullyWritten
+ _fullyWritten
? SelectionKey.OP_READ
: (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
@@ -221,22 +255,22 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress);
}
- }
+ }*/
@Override
public void flush()
{
- _selector.wakeup();
+ _connection.getSelector().wakeup();
}
@Override
public void close()
{
- LOGGER.debug("Closing " + _remoteSocketAddress);
+ LOGGER.debug("Closing " + _remoteSocketAddress);
_closed.set(true);
- _selector.wakeup();
-
+ _stateChanged.set(true);
+ _connection.getSelector().wakeup();
}
private boolean doWrite() throws IOException
@@ -497,4 +531,20 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
return null;
}
+
+ public boolean canRead()
+ {
+ return true;
+ }
+
+ public boolean waitingForWrite()
+ {
+ return !_fullyWritten;
+ }
+
+ public boolean isStateChanged()
+ {
+ return _stateChanged.get();
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
new file mode 100644
index 0000000000..65707eb4c2
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+* Created by keith on 28/01/2015.
+*/
+public class SelectorThread extends Thread
+{
+
+ private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+ private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+ private final Selector _selector;
+ private final AtomicBoolean _closed = new AtomicBoolean();
+ private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
+
+ SelectorThread()
+ {
+
+
+ try
+ {
+ _selector = Selector.open();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Failed to create selector");
+ }
+ }
+
+ @Override
+ public void run()
+ {
+
+ long nextTimeout = 0;
+ while(!_closed.get())
+ {
+
+
+ try
+ {
+
+ _selector.select(nextTimeout);
+
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+
+
+
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+
+ key.channel().register(_selector, 0);
+
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
+
+ }
+ selectionKeys.clear();
+
+ while(_unregisteredConnections.peek() != null)
+ {
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
+
+
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+
+ }
+
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ nextTimeout = Integer.MAX_VALUE;
+ while(iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
+
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
+ if (period < 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ iterator.remove();
+ }
+ else
+ {
+ nextTimeout = Math.min(period, nextTimeout);
+ }
+ }
+
+ for(NonBlockingConnection connection : toBeScheduled)
+ {
+ _scheduler.schedule(connection);
+ }
+
+ }
+ catch (IOException e)
+ {
+ // Close ourselves? Inform accepting thread??
+ e.printStackTrace();
+ }
+
+ }
+
+
+ }
+
+ public void addConnection(final NonBlockingConnection connection)
+ {
+ _unregisteredConnections.add(connection);
+ _selector.wakeup();
+
+ }
+
+ public void wakeup()
+ {
+ _selector.wakeup();
+ }
+
+ private class NetworkConnectionScheduler
+ {
+ public void schedule(final NonBlockingConnection connection)
+ {
+
+
+ boolean closed = connection.doWork();
+
+ if (!closed)
+ {
+ if (connection.isStateChanged())
+ {
+ schedule(connection);
+ }
+ else
+ {
+ SelectorThread.this.addConnection(connection);
+ }
+ }
+ }
+ }
+
+}