diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-14 20:55:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-14 20:55:03 +0000 |
commit | 72a2924a6d8a74528450c1b659ed58d74b1b9f53 (patch) | |
tree | 976609d5121b84ee2c6a469a87d00dad5cee876f | |
parent | e9dee27706bc63d96dfa68803d985f300d4becb8 (diff) | |
download | qpid-python-72a2924a6d8a74528450c1b659ed58d74b1b9f53.tar.gz |
QPID-4925 : [Java Broker] Timeout connections which do not complete connection handshake
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1493240 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 42 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties index 8559862a45..a99bcc7352 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties @@ -22,4 +22,5 @@ # 1 - Protocol Version # 2 - Client Version OPEN = CON-1001 : Open[ : Client ID : {0}][ : Protocol Version : {1}][ : Client Version : {2}] -CLOSE = CON-1002 : Close
\ No newline at end of file +CLOSE = CON-1002 : Close +IDLE_CLOSE = CON-1003 : Closed due to inactivity diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 3274cffc08..bbf90fad86 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -790,6 +790,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _network.setMaxWriteIdle(delay); _network.setMaxReadIdle(BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay); } + else + { + _network.setMaxWriteIdle(0); + _network.setMaxReadIdle(0); + } } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index c0764272a4..3834d22180 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -32,6 +32,8 @@ import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLPeerUnverifiedException; import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; @@ -452,6 +454,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private class SelfDelegateProtocolEngine implements ServerProtocolEngine { private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); + private long _lastReadTime; public SocketAddress getRemoteAddress() { @@ -475,6 +478,8 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void received(ByteBuffer msg) { + + _lastReadTime = System.currentTimeMillis(); ByteBuffer msgheader = msg.duplicate(); if(_header.remaining() > msgheader.limit()) { @@ -623,7 +628,8 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void readerIdle() { - + CurrentActor.get().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); } public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) @@ -634,7 +640,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine @Override public long getLastReadTime() { - return 0; + return _lastReadTime; } @Override @@ -650,6 +656,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private final SSLEngine _engine; private final SSLReceiver _sslReceiver; private final SSLBufferingSender _sslSender; + private long _lastReadTime; private SslDelegateProtocolEngine() { @@ -678,6 +685,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine @Override public void received(ByteBuffer msg) { + _lastReadTime = System.currentTimeMillis(); _sslReceiver.received(msg); _sslSender.send(); _sslSender.flush(); @@ -746,7 +754,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine @Override public long getLastReadTime() { - return _decryptEngine.getLastReadTime(); + return _lastReadTime; } @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 9e25182b10..4170f36771 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -236,15 +236,24 @@ public class ServerConnectionDelegate extends ServerDelegate return; } + final NetworkConnection networkConnection = sconn.getNetworkConnection(); + if(ok.hasHeartbeat()) { - final int heartbeat = ok.getHeartbeat(); - if(heartbeat > 0) + int heartbeat = ok.getHeartbeat(); + if(heartbeat < 0) { - final NetworkConnection networkConnection = sconn.getNetworkConnection(); - networkConnection.setMaxReadIdle(2 * heartbeat); - networkConnection.setMaxWriteIdle(heartbeat); + heartbeat = 0; } + + networkConnection.setMaxReadIdle(2 * heartbeat); + networkConnection.setMaxWriteIdle(heartbeat); + + } + else + { + networkConnection.setMaxReadIdle(0); + networkConnection.setMaxWriteIdle(0); } setConnectionTuneOkChannelMax(sconn, okChannelMax); diff --git a/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java index 2449f457e5..6bae93a1b8 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java @@ -33,6 +33,9 @@ public class CommonProperties public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME = "qpid.io_network_transport_timeout"; public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000; + public static final String HANDSHAKE_TIMEOUT_PROP_NAME = "qpid.handshake_timeout"; + public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2; + private CommonProperties() { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 5742667dbe..18a8bf2779 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -50,6 +50,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class); private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private Socket _socket; private IoNetworkConnection _connection; @@ -224,7 +227,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { socket = _serverSocket.accept(); socket.setTcpNoDelay(_config.getTcpNoDelay()); - socket.setSoTimeout(_timeout); + socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); final Integer sendBufferSize = _config.getSendBufferSize(); final Integer receiveBufferSize = _config.getReceiveBufferSize(); @@ -237,6 +240,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, ticker); + + connection.setMaxReadIdle(HANSHAKE_TIMEOUT); + ticker.setConnection(connection); if(_sslContext != null && socket instanceof SSLSocket) |