diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-12-12 13:28:28 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-12-12 13:28:28 +0000 |
commit | 18e69f08d213313070f69074c699d0f70ba267cf (patch) | |
tree | 41053d2f3b8eb1ee398f334348b0e9db9e6cf30a /qpid/java/common | |
parent | d9514d45f11c92aef06b8b880e291f76fdbff2a2 (diff) | |
download | qpid-python-18e69f08d213313070f69074c699d0f70ba267cf.tar.gz |
Notify engine when transport is blocked
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644870 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
3 files changed, 11 insertions, 11 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java index cfa4f48c19..92cea345ca 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -31,7 +31,7 @@ import javax.net.ssl.SSLContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.transport.Receiver; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; @@ -50,7 +50,7 @@ public class NonBlockingConnection implements NetworkConnection private final Object _lock = new Object(); public NonBlockingConnection(SocketChannel socket, - Receiver<ByteBuffer> delegate, + ServerProtocolEngine delegate, int sendBufferSize, int receiveBufferSize, long timeout, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java index 372e1bc5fd..80ba7a0221 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; @@ -34,10 +33,9 @@ import javax.net.ssl.SSLContext; import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.CommonProperties; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkConnection; @@ -54,7 +52,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport private AcceptingThread _acceptor; protected NonBlockingConnection createNetworkConnection(final SocketChannel socket, - final Receiver<ByteBuffer> engine, + final ServerProtocolEngine engine, final Integer sendBufferSize, final Integer receiveBufferSize, final int timeout, @@ -166,7 +164,8 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport { socket = _serverSocket.accept(); - final ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); if(engine != null) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index c056fc0015..8f96baff94 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -41,8 +41,8 @@ import javax.net.ssl.SSLPeerUnverifiedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; @@ -63,7 +63,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private final Thread _ioThread; private final String _remoteSocketAddress; private final AtomicBoolean _closed = new AtomicBoolean(false); - private final Receiver<ByteBuffer> _receiver; + private final ServerProtocolEngine _receiver; private final int _receiveBufSize; private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; @@ -79,7 +79,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> public NonBlockingSenderReceiver(final SocketChannel socketChannel, - Receiver<ByteBuffer> receiver, + ServerProtocolEngine receiver, int receiveBufSize, Ticker ticker, final Set<TransportEncryption> encryptionSet, @@ -202,9 +202,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> LOGGER.debug("Number Ready " + numberReady); - doWrite(); + _receiver.setTransportBlockedForWriting(!doWrite()); doRead(); boolean fullyWritten = doWrite(); + _receiver.setTransportBlockedForWriting(!fullyWritten); _socketChannel.register(_selector, fullyWritten |