diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-01-29 11:28:32 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-01-29 11:28:32 +0000 |
commit | af3fc3b6c71ff90d07f92104e65d5d6cc103840d (patch) | |
tree | 604bbaa687ca86b44fa3da82826cc82bc0d1d547 /qpid/java/common | |
parent | d05c56b43851ce4b8fda1bb51f971ed4870844b6 (diff) | |
download | qpid-python-af3fc3b6c71ff90d07f92104e65d5d6cc103840d.tar.gz |
Ensure selectors are closed, and all work which can be done on a read is done
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655596 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
2 files changed, 63 insertions, 65 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index eaccaa2098..151af64b67 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -21,8 +21,6 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.security.Principal; import java.util.ArrayList; @@ -42,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; @@ -78,6 +75,7 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> private SSLEngineResult _status; private volatile boolean _fullyWritten = true; private AtomicBoolean _stateChanged = new AtomicBoolean(); + private boolean _workDone; public NonBlockingSenderReceiver(final NonBlockingConnection connection, @@ -152,12 +150,13 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> public boolean doWork() { _stateChanged.set(false); - boolean closed = _closed.get(); if (!closed) { try { + _workDone = false; + long currentTime = System.currentTimeMillis(); int tick = _ticker.getTimeToNextTick(currentTime); if (tick <= 0) @@ -170,6 +169,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> _fullyWritten = doWrite(); _receiver.setTransportBlockedForWriting(!_fullyWritten); + if(_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0) + { + _stateChanged.set(true); + } } catch (IOException e) { @@ -267,7 +270,6 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> public void close() { LOGGER.debug("Closing " + _remoteSocketAddress); - _closed.set(true); _stateChanged.set(true); _connection.getSelector().wakeup(); @@ -310,11 +312,11 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> else if(_transportEncryption == TransportEncryption.TLS) { int remaining = 0; - do { if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { + _workDone = true; final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); _status = _sslEngine.wrap(bufArray, netBuffer); runSSLEngineTasks(_status); @@ -337,14 +339,12 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> } while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); - ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); long written = _socketChannel.write(encryptedBuffers); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Written " + written + " encrypted bytes"); } - ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); while(iter.hasNext()) { @@ -409,25 +409,27 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer); + LOGGER.debug("Read " + read + " encrypted bytes "); } + _netInputBuffer.flip(); int unwrapped = 0; + boolean tasksRun; do { ByteBuffer appInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); - runSSLEngineTasks(_status); + tasksRun = runSSLEngineTasks(_status); appInputBuffer.flip(); unwrapped = appInputBuffer.remaining(); _receiver.received(appInputBuffer); } - while(unwrapped > 0); + while(unwrapped > 0 || tasksRun); _netInputBuffer.compact(); @@ -476,7 +478,7 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> } } - private void runSSLEngineTasks(final SSLEngineResult status) + private boolean runSSLEngineTasks(final SSLEngineResult status) { if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) { @@ -485,7 +487,9 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> { task.run(); } + return true; } + return false; } private boolean looksLikeSSL(byte[] headerBytes) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java index 7d4d3be083..0ff908568f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java @@ -39,7 +39,7 @@ public class SelectorThread extends Thread private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); - private final Selector _selector; + private Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); @@ -62,86 +62,80 @@ public class SelectorThread extends Thread { long nextTimeout = 0; - while(!_closed.get()) - { - - try + try + { + try (Selector selector = Selector.open()) { + _selector = selector; + while (!_closed.get()) + { - _selector.select(nextTimeout); - - List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - + _selector.select(nextTimeout); + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - Set<SelectionKey> selectionKeys = _selector.selectedKeys(); - for (SelectionKey key : selectionKeys) - { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - key.channel().register(_selector, 0); + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); + key.channel().register(_selector, 0); - } - selectionKeys.clear(); + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); - while(_unregisteredConnections.peek() != null) - { - NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); - _unscheduledConnections.add(unregisteredConnection); + } + selectionKeys.clear(); + while (_unregisteredConnections.peek() != null) + { + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); - final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); - unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); - } + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) + | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); - long currentTime = System.currentTimeMillis(); - Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); - nextTimeout = Integer.MAX_VALUE; - while(iterator.hasNext()) - { - NonBlockingConnection connection = iterator.next(); + } - int period = connection.getTicker().getTimeToNextTick(currentTime); - if (period < 0 || connection.isStateChanged()) + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) { - toBeScheduled.add(connection); - iterator.remove(); + NonBlockingConnection connection = iterator.next(); + + int period = connection.getTicker().getTimeToNextTick(currentTime); + if (period < 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + iterator.remove(); + } + else + { + nextTimeout = Math.min(period, nextTimeout); + } } - else + + for (NonBlockingConnection connection : toBeScheduled) { - nextTimeout = Math.min(period, nextTimeout); + _scheduler.schedule(connection); } - } - for(NonBlockingConnection connection : toBeScheduled) - { - _scheduler.schedule(connection); } - } - catch (IOException e) - { - // Close ourselves? Inform accepting thread?? - e.printStackTrace(); - } - - } - try - { - _selector.close(); } catch (IOException e) { - // TODO + //TODO e.printStackTrace(); } + } public void addConnection(final NonBlockingConnection connection) |