summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java5
3 files changed, 27 insertions, 6 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 75eb0e19a7..d48cd1754c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -143,6 +143,8 @@ public class ClientDelegate extends ConnectionDelegate
actualHeartbeatInterval);
int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+ conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
+ conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index cdca726148..3547205df1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -70,6 +70,7 @@ public class Connection extends ConnectionInvoker
public static final int MIN_USABLE_CHANNEL_NUM = 0;
private long _lastSendTime;
private long _lastReadTime;
+ private NetworkConnection _networkConnection;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -229,12 +230,13 @@ public class Connection extends ConnectionInvoker
addConnectionListener((ConnectionListener)secureReceiver);
}
- NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+ _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity());
- setRemoteAddress(network.getRemoteAddress());
- setLocalAddress(network.getLocalAddress());
- final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+ setRemoteAddress(_networkConnection.getRemoteAddress());
+ setLocalAddress(_networkConnection.getLocalAddress());
+
+ final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
if(secureSender instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureSender);
@@ -785,14 +787,26 @@ public class Connection extends ConnectionInvoker
@Override
public void writerIdle()
{
+ getConnectionDelegate().writerIdle(Connection.this);
connectionHeartbeat();
}
@Override
public void readerIdle()
{
- // TODO
-
+ log.error("Closing connection as no heartbeat or other activity detected within specified interval");
+ _networkConnection.close();
}
}
+
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ _networkConnection = network;
+ }
+
+ public NetworkConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index fdd35d49ef..b3379890f3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -105,4 +105,9 @@ public abstract class ConnectionDelegate
ssn.closed();
}
}
+
+ public void writerIdle(final Connection connection)
+ {
+ connection.doHeartBeat();
+ }
}