summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-14 20:55:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-14 20:55:03 +0000
commit72a2924a6d8a74528450c1b659ed58d74b1b9f53 (patch)
tree976609d5121b84ee2c6a469a87d00dad5cee876f
parente9dee27706bc63d96dfa68803d985f300d4becb8 (diff)
downloadqpid-python-72a2924a6d8a74528450c1b659ed58d74b1b9f53.tar.gz
QPID-4925 : [Java Broker] Timeout connections which do not complete connection handshake
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1493240 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java5
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java19
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java8
6 files changed, 42 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
index 8559862a45..a99bcc7352 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Connection_logmessages.properties
@@ -22,4 +22,5 @@
# 1 - Protocol Version
# 2 - Client Version
OPEN = CON-1001 : Open[ : Client ID : {0}][ : Protocol Version : {1}][ : Client Version : {2}]
-CLOSE = CON-1002 : Close \ No newline at end of file
+CLOSE = CON-1002 : Close
+IDLE_CLOSE = CON-1003 : Closed due to inactivity
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 3274cffc08..bbf90fad86 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -790,6 +790,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
}
+ else
+ {
+ _network.setMaxWriteIdle(0);
+ _network.setMaxReadIdle(0);
+ }
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index c0764272a4..3834d22180 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -32,6 +32,8 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
@@ -452,6 +454,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
+ private long _lastReadTime;
public SocketAddress getRemoteAddress()
{
@@ -475,6 +478,8 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public void received(ByteBuffer msg)
{
+
+ _lastReadTime = System.currentTimeMillis();
ByteBuffer msgheader = msg.duplicate();
if(_header.remaining() > msgheader.limit())
{
@@ -623,7 +628,8 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public void readerIdle()
{
-
+ CurrentActor.get().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
}
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
@@ -634,7 +640,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
@Override
public long getLastReadTime()
{
- return 0;
+ return _lastReadTime;
}
@Override
@@ -650,6 +656,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private final SSLEngine _engine;
private final SSLReceiver _sslReceiver;
private final SSLBufferingSender _sslSender;
+ private long _lastReadTime;
private SslDelegateProtocolEngine()
{
@@ -678,6 +685,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
@Override
public void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
_sslReceiver.received(msg);
_sslSender.send();
_sslSender.flush();
@@ -746,7 +754,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
@Override
public long getLastReadTime()
{
- return _decryptEngine.getLastReadTime();
+ return _lastReadTime;
}
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 9e25182b10..4170f36771 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -236,15 +236,24 @@ public class ServerConnectionDelegate extends ServerDelegate
return;
}
+ final NetworkConnection networkConnection = sconn.getNetworkConnection();
+
if(ok.hasHeartbeat())
{
- final int heartbeat = ok.getHeartbeat();
- if(heartbeat > 0)
+ int heartbeat = ok.getHeartbeat();
+ if(heartbeat < 0)
{
- final NetworkConnection networkConnection = sconn.getNetworkConnection();
- networkConnection.setMaxReadIdle(2 * heartbeat);
- networkConnection.setMaxWriteIdle(heartbeat);
+ heartbeat = 0;
}
+
+ networkConnection.setMaxReadIdle(2 * heartbeat);
+ networkConnection.setMaxWriteIdle(heartbeat);
+
+ }
+ else
+ {
+ networkConnection.setMaxReadIdle(0);
+ networkConnection.setMaxWriteIdle(0);
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
index 2449f457e5..6bae93a1b8 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
@@ -33,6 +33,9 @@ public class CommonProperties
public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME = "qpid.io_network_transport_timeout";
public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000;
+ public static final String HANDSHAKE_TIMEOUT_PROP_NAME = "qpid.handshake_timeout";
+ public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2;
+
private CommonProperties()
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 5742667dbe..18a8bf2779 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -50,6 +50,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+
private Socket _socket;
private IoNetworkConnection _connection;
@@ -224,7 +227,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
socket = _serverSocket.accept();
socket.setTcpNoDelay(_config.getTcpNoDelay());
- socket.setSoTimeout(_timeout);
+ socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -237,6 +240,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
ticker);
+
+ connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+
ticker.setConnection(connection);
if(_sslContext != null && socket instanceof SSLSocket)