summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-12-12 13:28:28 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-12-12 13:28:28 +0000
commit18e69f08d213313070f69074c699d0f70ba267cf (patch)
tree41053d2f3b8eb1ee398f334348b0e9db9e6cf30a /qpid/java/common
parentd9514d45f11c92aef06b8b880e291f76fdbff2a2 (diff)
downloadqpid-python-18e69f08d213313070f69074c699d0f70ba267cf.tar.gz
Notify engine when transport is blocked
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644870 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java9
3 files changed, 11 insertions, 11 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index cfa4f48c19..92cea345ca 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -31,7 +31,7 @@ import javax.net.ssl.SSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
@@ -50,7 +50,7 @@ public class NonBlockingConnection implements NetworkConnection
private final Object _lock = new Object();
public NonBlockingConnection(SocketChannel socket,
- Receiver<ByteBuffer> delegate,
+ ServerProtocolEngine delegate,
int sendBufferSize,
int receiveBufferSize,
long timeout,
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index 372e1bc5fd..80ba7a0221 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
@@ -34,10 +33,9 @@ import javax.net.ssl.SSLContext;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -54,7 +52,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
private AcceptingThread _acceptor;
protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
- final Receiver<ByteBuffer> engine,
+ final ServerProtocolEngine engine,
final Integer sendBufferSize,
final Integer receiveBufferSize,
final int timeout,
@@ -166,7 +164,8 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
socket = _serverSocket.accept();
- final ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
if(engine != null)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index c056fc0015..8f96baff94 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -41,8 +41,8 @@ import javax.net.ssl.SSLPeerUnverifiedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
@@ -63,7 +63,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
private final Thread _ioThread;
private final String _remoteSocketAddress;
private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final Receiver<ByteBuffer> _receiver;
+ private final ServerProtocolEngine _receiver;
private final int _receiveBufSize;
private final Ticker _ticker;
private final Set<TransportEncryption> _encryptionSet;
@@ -79,7 +79,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
public NonBlockingSenderReceiver(final SocketChannel socketChannel,
- Receiver<ByteBuffer> receiver,
+ ServerProtocolEngine receiver,
int receiveBufSize,
Ticker ticker,
final Set<TransportEncryption> encryptionSet,
@@ -202,9 +202,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
LOGGER.debug("Number Ready " + numberReady);
- doWrite();
+ _receiver.setTransportBlockedForWriting(!doWrite());
doRead();
boolean fullyWritten = doWrite();
+ _receiver.setTransportBlockedForWriting(!fullyWritten);
_socketChannel.register(_selector,
fullyWritten