summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-15 23:56:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-15 23:56:37 +0000
commitbbfc9d1b5b1af45f963e3ee907063a03c1a92491 (patch)
treea01a46d0eb93c17e1667dcf49a9feb94e5dd4d62
parentd7c0650850404073fa1ab7e2d78c8691be259666 (diff)
downloadqpid-python-bbfc9d1b5b1af45f963e3ee907063a03c1a92491.tar.gz
Merged r1558363 from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1558617 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java5
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes4
8 files changed, 44 insertions, 27 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 9e41a5234c..0bec85b2ec 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -24,7 +24,6 @@ import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
@@ -164,7 +163,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void readerIdle()
{
- //Todo
+ _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
}
public String getAddress()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 72d6a0832d..02d2d7828c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -549,16 +549,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
super.setLocalAddress(localAddress);
}
- public void setNetworkConnection(NetworkConnection network)
- {
- _networkConnection = network;
- }
-
- public NetworkConnection getNetworkConnection()
- {
- return _networkConnection;
- }
-
public void doHeartbeat()
{
super.doHeartBeat();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 129a811b61..a15fea1200 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -236,7 +236,6 @@ public class ServerConnectionDelegate extends ServerDelegate
}
final NetworkConnection networkConnection = sconn.getNetworkConnection();
-
if(ok.hasHeartbeat())
{
int heartbeat = ok.getHeartbeat();
@@ -352,4 +351,11 @@ public class ServerConnectionDelegate extends ServerDelegate
{
return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT);
}
+
+ @Override
+ protected int getHeartbeatMax()
+ {
+ int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY);
+ return delay == 0 ? super.getHeartbeatMax() : delay;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
index e9b946d5b7..51354a5941 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
@@ -179,12 +179,9 @@ public class ClientConnectionDelegate extends ClientDelegate
}
@Override
- public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat)
{
- // ClientDelegate simply responds to heartbeats with heartbeats
_heartbeatListener.heartbeatReceived();
- super.connectionHeartbeat(conn, hearbeat);
- _heartbeatListener.heartbeatSent();
}
@@ -192,4 +189,11 @@ public class ClientConnectionDelegate extends ClientDelegate
{
_heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
}
+
+ @Override
+ public void writerIdle(final Connection connection)
+ {
+ super.writerIdle(connection);
+ _heartbeatListener.heartbeatSent();
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 75eb0e19a7..d48cd1754c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -143,6 +143,8 @@ public class ClientDelegate extends ConnectionDelegate
actualHeartbeatInterval);
int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+ conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
+ conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index cdca726148..3547205df1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -70,6 +70,7 @@ public class Connection extends ConnectionInvoker
public static final int MIN_USABLE_CHANNEL_NUM = 0;
private long _lastSendTime;
private long _lastReadTime;
+ private NetworkConnection _networkConnection;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -229,12 +230,13 @@ public class Connection extends ConnectionInvoker
addConnectionListener((ConnectionListener)secureReceiver);
}
- NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+ _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity());
- setRemoteAddress(network.getRemoteAddress());
- setLocalAddress(network.getLocalAddress());
- final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+ setRemoteAddress(_networkConnection.getRemoteAddress());
+ setLocalAddress(_networkConnection.getLocalAddress());
+
+ final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
if(secureSender instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureSender);
@@ -785,14 +787,26 @@ public class Connection extends ConnectionInvoker
@Override
public void writerIdle()
{
+ getConnectionDelegate().writerIdle(Connection.this);
connectionHeartbeat();
}
@Override
public void readerIdle()
{
- // TODO
-
+ log.error("Closing connection as no heartbeat or other activity detected within specified interval");
+ _networkConnection.close();
}
}
+
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ _networkConnection = network;
+ }
+
+ public NetworkConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index fdd35d49ef..b3379890f3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -105,4 +105,9 @@ public abstract class ConnectionDelegate
ssn.closed();
}
}
+
+ public void writerIdle(final Connection connection)
+ {
+ connection.doHeartBeat();
+ }
}
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index 093821647d..8da2b5edd8 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -67,9 +67,5 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro
// QPID-3604: Immediate Prefetch no longer supported by 0-10
org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
-// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
-org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating
-org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
-
// Java 0-10 client does not support re-binding the queue to the same exchange
org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange