diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-15 23:56:37 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-15 23:56:37 +0000 |
commit | bbfc9d1b5b1af45f963e3ee907063a03c1a92491 (patch) | |
tree | a01a46d0eb93c17e1667dcf49a9feb94e5dd4d62 | |
parent | d7c0650850404073fa1ab7e2d78c8691be259666 (diff) | |
download | qpid-python-bbfc9d1b5b1af45f963e3ee907063a03c1a92491.tar.gz |
Merged r1558363 from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1558617 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 44 insertions, 27 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 9e41a5234c..0bec85b2ec 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -24,7 +24,6 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v0_10.ServerConnection; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; @@ -164,7 +163,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void readerIdle() { - //Todo + _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); } public String getAddress() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 72d6a0832d..02d2d7828c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -549,16 +549,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, super.setLocalAddress(localAddress); } - public void setNetworkConnection(NetworkConnection network) - { - _networkConnection = network; - } - - public NetworkConnection getNetworkConnection() - { - return _networkConnection; - } - public void doHeartbeat() { super.doHeartBeat(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 129a811b61..a15fea1200 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -236,7 +236,6 @@ public class ServerConnectionDelegate extends ServerDelegate } final NetworkConnection networkConnection = sconn.getNetworkConnection(); - if(ok.hasHeartbeat()) { int heartbeat = ok.getHeartbeat(); @@ -352,4 +351,11 @@ public class ServerConnectionDelegate extends ServerDelegate { return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT); } + + @Override + protected int getHeartbeatMax() + { + int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY); + return delay == 0 ? super.getHeartbeatMax() : delay; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index e9b946d5b7..51354a5941 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -179,12 +179,9 @@ public class ClientConnectionDelegate extends ClientDelegate } @Override - public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat) { - // ClientDelegate simply responds to heartbeats with heartbeats _heartbeatListener.heartbeatReceived(); - super.connectionHeartbeat(conn, hearbeat); - _heartbeatListener.heartbeatSent(); } @@ -192,4 +189,11 @@ public class ClientConnectionDelegate extends ClientDelegate { _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; } + + @Override + public void writerIdle(final Connection connection) + { + super.writerIdle(connection); + _heartbeatListener.heartbeatSent(); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 75eb0e19a7..d48cd1754c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -143,6 +143,8 @@ public class ClientDelegate extends ConnectionDelegate actualHeartbeatInterval); int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); + conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); + conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index cdca726148..3547205df1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -70,6 +70,7 @@ public class Connection extends ConnectionInvoker public static final int MIN_USABLE_CHANNEL_NUM = 0; private long _lastSendTime; private long _lastReadTime; + private NetworkConnection _networkConnection; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -229,12 +230,13 @@ public class Connection extends ConnectionInvoker addConnectionListener((ConnectionListener)secureReceiver); } - NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity()); + _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity()); - setRemoteAddress(network.getRemoteAddress()); - setLocalAddress(network.getLocalAddress()); - final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender()); + setRemoteAddress(_networkConnection.getRemoteAddress()); + setLocalAddress(_networkConnection.getLocalAddress()); + + final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender()); if(secureSender instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureSender); @@ -785,14 +787,26 @@ public class Connection extends ConnectionInvoker @Override public void writerIdle() { + getConnectionDelegate().writerIdle(Connection.this); connectionHeartbeat(); } @Override public void readerIdle() { - // TODO - + log.error("Closing connection as no heartbeat or other activity detected within specified interval"); + _networkConnection.close(); } } + + + public void setNetworkConnection(NetworkConnection network) + { + _networkConnection = network; + } + + public NetworkConnection getNetworkConnection() + { + return _networkConnection; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index fdd35d49ef..b3379890f3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -105,4 +105,9 @@ public abstract class ConnectionDelegate ssn.closed(); } } + + public void writerIdle(final Connection connection) + { + connection.doHeartBeat(); + } } diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 093821647d..8da2b5edd8 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -67,9 +67,5 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro // QPID-3604: Immediate Prefetch no longer supported by 0-10 org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener -// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based -org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating -org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide - // Java 0-10 client does not support re-binding the queue to the same exchange org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange |