From bbfc9d1b5b1af45f963e3ee907063a03c1a92491 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 15 Jan 2014 23:56:37 +0000 Subject: Merged r1558363 from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1558617 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/v0_10/ProtocolEngine_0_10.java | 4 ++-- .../server/protocol/v0_10/ServerConnection.java | 10 --------- .../protocol/v0_10/ServerConnectionDelegate.java | 8 ++++++- .../client/transport/ClientConnectionDelegate.java | 12 ++++++---- .../org/apache/qpid/transport/ClientDelegate.java | 2 ++ .../java/org/apache/qpid/transport/Connection.java | 26 +++++++++++++++++----- .../apache/qpid/transport/ConnectionDelegate.java | 5 +++++ qpid/java/test-profiles/Java010Excludes | 4 ---- 8 files changed, 44 insertions(+), 27 deletions(-) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 9e41a5234c..0bec85b2ec 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -24,7 +24,6 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v0_10.ServerConnection; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; @@ -164,7 +163,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void readerIdle() { - //Todo + _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); } public String getAddress() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 72d6a0832d..02d2d7828c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -549,16 +549,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.setLocalAddress(localAddress); } - public void setNetworkConnection(NetworkConnection network) - { - _networkConnection = network; - } - - public NetworkConnection getNetworkConnection() - { - return _networkConnection; - } - public void doHeartbeat() { super.doHeartBeat(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 129a811b61..a15fea1200 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -236,7 +236,6 @@ public class ServerConnectionDelegate extends ServerDelegate } final NetworkConnection networkConnection = sconn.getNetworkConnection(); - if(ok.hasHeartbeat()) { int heartbeat = ok.getHeartbeat(); @@ -352,4 +351,11 @@ public class ServerConnectionDelegate extends ServerDelegate { return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT); } + + @Override + protected int getHeartbeatMax() + { + int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY); + return delay == 0 ? super.getHeartbeatMax() : delay; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index e9b946d5b7..51354a5941 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -179,12 +179,9 @@ public class ClientConnectionDelegate extends ClientDelegate } @Override - public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat) { - // ClientDelegate simply responds to heartbeats with heartbeats _heartbeatListener.heartbeatReceived(); - super.connectionHeartbeat(conn, hearbeat); - _heartbeatListener.heartbeatSent(); } @@ -192,4 +189,11 @@ public class ClientConnectionDelegate extends ClientDelegate { _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; } + + @Override + public void writerIdle(final Connection connection) + { + super.writerIdle(connection); + _heartbeatListener.heartbeatSent(); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 75eb0e19a7..d48cd1754c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -143,6 +143,8 @@ public class ClientDelegate extends ConnectionDelegate actualHeartbeatInterval); int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); + conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); + conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index cdca726148..3547205df1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -70,6 +70,7 @@ public class Connection extends ConnectionInvoker public static final int MIN_USABLE_CHANNEL_NUM = 0; private long _lastSendTime; private long _lastReadTime; + private NetworkConnection _networkConnection; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -229,12 +230,13 @@ public class Connection extends ConnectionInvoker addConnectionListener((ConnectionListener)secureReceiver); } - NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity()); + _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity()); - setRemoteAddress(network.getRemoteAddress()); - setLocalAddress(network.getLocalAddress()); - final Sender secureSender = securityLayer.sender(network.getSender()); + setRemoteAddress(_networkConnection.getRemoteAddress()); + setLocalAddress(_networkConnection.getLocalAddress()); + + final Sender secureSender = securityLayer.sender(_networkConnection.getSender()); if(secureSender instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureSender); @@ -785,14 +787,26 @@ public class Connection extends ConnectionInvoker @Override public void writerIdle() { + getConnectionDelegate().writerIdle(Connection.this); connectionHeartbeat(); } @Override public void readerIdle() { - // TODO - + log.error("Closing connection as no heartbeat or other activity detected within specified interval"); + _networkConnection.close(); } } + + + public void setNetworkConnection(NetworkConnection network) + { + _networkConnection = network; + } + + public NetworkConnection getNetworkConnection() + { + return _networkConnection; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index fdd35d49ef..b3379890f3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -105,4 +105,9 @@ public abstract class ConnectionDelegate ssn.closed(); } } + + public void writerIdle(final Connection connection) + { + connection.doHeartBeat(); + } } diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 093821647d..8da2b5edd8 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -67,9 +67,5 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro // QPID-3604: Immediate Prefetch no longer supported by 0-10 org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener -// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based -org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating -org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide - // Java 0-10 client does not support re-binding the queue to the same exchange org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange -- cgit v1.2.1