summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-29 11:28:32 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-29 11:28:32 +0000
commitaf3fc3b6c71ff90d07f92104e65d5d6cc103840d (patch)
tree604bbaa687ca86b44fa3da82826cc82bc0d1d547 /qpid/java/common
parentd05c56b43851ce4b8fda1bb51f971ed4870844b6 (diff)
downloadqpid-python-af3fc3b6c71ff90d07f92104e65d5d6cc103840d.tar.gz
Ensure selectors are closed, and all work which can be done on a read is done
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655596 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java28
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java100
2 files changed, 63 insertions, 65 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index eaccaa2098..151af64b67 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -21,8 +21,6 @@ package org.apache.qpid.transport.network.io;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.util.ArrayList;
@@ -42,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
@@ -78,6 +75,7 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
private SSLEngineResult _status;
private volatile boolean _fullyWritten = true;
private AtomicBoolean _stateChanged = new AtomicBoolean();
+ private boolean _workDone;
public NonBlockingSenderReceiver(final NonBlockingConnection connection,
@@ -152,12 +150,13 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
public boolean doWork()
{
_stateChanged.set(false);
-
boolean closed = _closed.get();
if (!closed)
{
try
{
+ _workDone = false;
+
long currentTime = System.currentTimeMillis();
int tick = _ticker.getTimeToNextTick(currentTime);
if (tick <= 0)
@@ -170,6 +169,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
_fullyWritten = doWrite();
_receiver.setTransportBlockedForWriting(!_fullyWritten);
+ if(_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)
+ {
+ _stateChanged.set(true);
+ }
}
catch (IOException e)
{
@@ -267,7 +270,6 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
public void close()
{
LOGGER.debug("Closing " + _remoteSocketAddress);
-
_closed.set(true);
_stateChanged.set(true);
_connection.getSelector().wakeup();
@@ -310,11 +312,11 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
else if(_transportEncryption == TransportEncryption.TLS)
{
int remaining = 0;
-
do
{
if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
{
+ _workDone = true;
final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
_status = _sslEngine.wrap(bufArray, netBuffer);
runSSLEngineTasks(_status);
@@ -337,14 +339,12 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
}
while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
-
ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
long written = _socketChannel.write(encryptedBuffers);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Written " + written + " encrypted bytes");
}
-
ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
while(iter.hasNext())
{
@@ -409,25 +409,27 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+ LOGGER.debug("Read " + read + " encrypted bytes ");
}
+
_netInputBuffer.flip();
int unwrapped = 0;
+ boolean tasksRun;
do
{
ByteBuffer appInputBuffer =
ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
_status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
- runSSLEngineTasks(_status);
+ tasksRun = runSSLEngineTasks(_status);
appInputBuffer.flip();
unwrapped = appInputBuffer.remaining();
_receiver.received(appInputBuffer);
}
- while(unwrapped > 0);
+ while(unwrapped > 0 || tasksRun);
_netInputBuffer.compact();
@@ -476,7 +478,7 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
}
}
- private void runSSLEngineTasks(final SSLEngineResult status)
+ private boolean runSSLEngineTasks(final SSLEngineResult status)
{
if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
{
@@ -485,7 +487,9 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
{
task.run();
}
+ return true;
}
+ return false;
}
private boolean looksLikeSSL(byte[] headerBytes)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
index 7d4d3be083..0ff908568f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
@@ -39,7 +39,7 @@ public class SelectorThread extends Thread
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
- private final Selector _selector;
+ private Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
@@ -62,86 +62,80 @@ public class SelectorThread extends Thread
{
long nextTimeout = 0;
- while(!_closed.get())
- {
-
- try
+ try
+ {
+ try (Selector selector = Selector.open())
{
+ _selector = selector;
+ while (!_closed.get())
+ {
- _selector.select(nextTimeout);
-
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
+ _selector.select(nextTimeout);
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
- {
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
- key.channel().register(_selector, 0);
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
+ key.channel().register(_selector, 0);
- }
- selectionKeys.clear();
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
- while(_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
+ }
+ selectionKeys.clear();
+ while (_unregisteredConnections.peek() != null)
+ {
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
- unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
- }
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
- nextTimeout = Integer.MAX_VALUE;
- while(iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
+ }
- int period = connection.getTicker().getTimeToNextTick(currentTime);
- if (period < 0 || connection.isStateChanged())
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
{
- toBeScheduled.add(connection);
- iterator.remove();
+ NonBlockingConnection connection = iterator.next();
+
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
+ if (period < 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ iterator.remove();
+ }
+ else
+ {
+ nextTimeout = Math.min(period, nextTimeout);
+ }
}
- else
+
+ for (NonBlockingConnection connection : toBeScheduled)
{
- nextTimeout = Math.min(period, nextTimeout);
+ _scheduler.schedule(connection);
}
- }
- for(NonBlockingConnection connection : toBeScheduled)
- {
- _scheduler.schedule(connection);
}
-
}
- catch (IOException e)
- {
- // Close ourselves? Inform accepting thread??
- e.printStackTrace();
- }
-
- }
- try
- {
- _selector.close();
}
catch (IOException e)
{
- // TODO
+ //TODO
e.printStackTrace();
}
+
}
public void addConnection(final NonBlockingConnection connection)