summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-11-28 21:21:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-11-28 21:21:05 +0000
commitf58f4189e28b940bac1134325796f0c5c2751068 (patch)
tree1b0e6e73dacbdb51e13c24fc5b73b6cfbf4a6d39
parent282ceb6824629a16efdf3d5e1c447b05be04ca09 (diff)
downloadqpid-python-f58f4189e28b940bac1134325796f0c5c2751068.tar.gz
QPID-2796 : Restore heartbeating functionality to the Java Broker, and the 0-8,0-9,0-9-1 Java Client
merge from trunk r1413376,1413539,1413549,1413567 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1414927 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java24
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java36
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java55
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java37
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java125
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java49
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java29
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java33
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java87
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java31
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java80
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java12
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java6
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java257
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java116
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes4
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes3
33 files changed, 994 insertions, 180 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 976d7fd28a..72c21d357e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -150,6 +151,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private boolean _blocking;
private final Lock _receivedLock;
+ private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
+
public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
@@ -541,7 +544,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
_sender.send(buf);
- _lastIoTime = System.currentTimeMillis();
+ final long time = System.currentTimeMillis();
+ _lastIoTime = time;
+ _lastWriteTime.set(time);
+
if(!_deferFlush)
{
_sender.flush();
@@ -1058,12 +1064,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void readerIdle()
{
- // Nothing
+ // TODO - enforce disconnect on lack of inbound data
}
public synchronized void writerIdle()
{
- _sender.send(asByteBuffer(HeartbeatBody.FRAME));
+ writeFrame(HeartbeatBody.FRAME);
}
public void exception(Throwable throwable)
@@ -1461,4 +1467,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
return _receivedLock;
}
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReceivedTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime.get();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index c62f1b56f0..c8126b3677 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -217,6 +217,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _delegate.getLastReadTime();
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _delegate.getLastWriteTime();
+ }
+
private static interface DelegateCreator
{
@@ -409,6 +421,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
+ @Override
+ public long getLastReadTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return 0;
+ }
+
public long getConnectionId()
{
return _id;
@@ -566,5 +590,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
}
+
+ @Override
+ public long getLastReadTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return 0;
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index d7d26cc772..7d263b517c 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -44,6 +44,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private ServerConnection _connection;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
+ private long _lastReadTime;
+ private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network,
@@ -71,13 +73,61 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_network = network;
- _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
_connection.setPeerPrincipal(_network.getPeerPrincipal());
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
}
+ private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ {
+ return new Sender<ByteBuffer>()
+ {
+ @Override
+ public void setIdleTimeout(int i)
+ {
+ sender.setIdleTimeout(i);
+
+ }
+
+ @Override
+ public void send(ByteBuffer msg)
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ sender.send(msg);
+
+ }
+
+ @Override
+ public void flush()
+ {
+ sender.flush();
+
+ }
+
+ @Override
+ public void close()
+ {
+ sender.close();
+
+ }
+ };
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -90,6 +140,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void received(final ByteBuffer buf)
{
+ _lastReadTime = System.currentTimeMillis();
super.received(buf);
_connection.receivedComplete();
}
@@ -106,7 +157,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void writerIdle()
{
- //Todo
+ _connection.doHeartbeat();
}
public void readerIdle()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index b5110a21d8..715a512b47 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -53,6 +53,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
//private NetworkConnection _networkDriver;
private long _readBytes;
private long _writtenBytes;
+ private long _lastReadTime;
+ private long _lastWriteTime;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
@@ -182,6 +184,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
public synchronized void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
if(RAW_LOGGER.isLoggable(Level.FINE))
{
ByteBuffer dup = msg.duplicate();
@@ -324,6 +327,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
synchronized(_sendLock)
{
+ _lastWriteTime = System.currentTimeMillis();
if(FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -378,4 +382,13 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
return _connectionId;
}
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index 71d6df27e0..634c5e6255 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -51,6 +51,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
private long _readBytes;
private long _writtenBytes;
+
+ private long _lastReadTime;
+ private long _lastWriteTime;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
@@ -222,6 +225,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
public synchronized void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
if(RAW_LOGGER.isLoggable(Level.FINE))
{
ByteBuffer dup = msg.duplicate();
@@ -364,7 +368,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
synchronized(_sendLock)
{
-
+ _lastWriteTime = System.currentTimeMillis();
if(FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -425,4 +429,13 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
return _connectionId;
}
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index ce3ede2dba..58de6a0cdf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -48,6 +48,7 @@ import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.network.NetworkConnection;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -68,6 +69,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Principal _peerPrincipal;
+ private NetworkConnection _networkConnection;
public ServerConnection(final long connectionId)
{
@@ -490,4 +492,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
super.setLocalAddress(localAddress);
}
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ _networkConnection = network;
+ }
+
+ public NetworkConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
+
+ public void doHeartbeat()
+ {
+ super.doHeartBeat();
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 70f5afe5ac..f48121f9f0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -43,6 +43,8 @@ import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,14 +228,18 @@ public class ServerConnectionDelegate extends ServerDelegate
return;
}
- setConnectionTuneOkChannelMax(sconn, okChannelMax);
- }
+ if(ok.hasHeartbeat())
+ {
+ final int heartbeat = ok.getHeartbeat();
+ if(heartbeat > 0)
+ {
+ final NetworkConnection networkConnection = sconn.getNetworkConnection();
+ networkConnection.setMaxReadIdle(2 * heartbeat);
+ networkConnection.setMaxWriteIdle(heartbeat);
+ }
+ }
- @Override
- protected int getHeartbeatMax()
- {
- //TODO: implement broker support for actually sending heartbeats
- return 0;
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
@Override
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6f2631ac05..a0e659c359 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1560,4 +1560,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
+ localAddress + " to " + remoteAddress);
}
}
+
+ void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _delegate.setHeartbeatListener(listener);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index b6f25a2cef..a8fdaeb65c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -78,4 +78,6 @@ public interface AMQConnectionDelegate
* @return true if the feature is supported by the server
*/
boolean isSupportedServerFeature(final String featureName);
+
+ void setHeartbeatListener(HeartbeatListener listener);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 5dd6e55e64..69e79d42a0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -422,6 +422,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return featureSupported;
}
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener);
+ }
+
private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail)
{
ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
@@ -484,10 +490,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
_logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
}
- else
+ else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null)
{
heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
}
+ else
+ {
+ heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT);
+ }
return heartbeat;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 740a81b939..67d7c2a78c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -122,7 +122,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()));
+
+ NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
+ _conn.getProtocolHandler());
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
@@ -376,4 +378,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
// we just hardcode JMS selectors as supported.
return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
}
+
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _conn.getProtocolHandler().setHeartbeatListener(listener);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
new file mode 100644
index 0000000000..32a7cb0b73
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+public interface HeartbeatListener
+{
+ void heartbeatReceived();
+
+ void heartbeatSent();
+
+ static final HeartbeatListener DEFAULT = new HeartbeatListener()
+ {
+ public void heartbeatReceived()
+ {
+ }
+
+ public void heartbeatSent()
+ {
+ }
+ };
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index af89000c5c..816caac824 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,6 +179,9 @@ public class AMQProtocolHandler implements ProtocolEngine
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private long _lastReadTime = System.currentTimeMillis();
+ private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -300,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -309,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -442,6 +445,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public void received(ByteBuffer msg)
{
_readBytes += msg.remaining();
+ _lastReadTime = System.currentTimeMillis();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -470,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -560,6 +562,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
+ _lastWriteTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
if(flush)
@@ -882,6 +885,18 @@ public class AMQProtocolHandler implements ProtocolEngine
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
protected Sender<ByteBuffer> getSender()
{
return _sender;
@@ -894,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -909,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index cf521c8892..aed10cf15f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -267,7 +267,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
{
-
+ _protocolHandler.heartbeatBodyReceived();
}
/**
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
deleted file mode 100644
index d387a8ba93..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-class HeartbeatDiagnostics
-{
- private static final Diagnostics _impl = init();
-
- private HeartbeatDiagnostics()
- {
- }
-
- private static Diagnostics init()
- {
- return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off();
- }
-
- static void sent()
- {
- _impl.sent();
- }
-
- static void timeout()
- {
- _impl.timeout();
- }
-
- static void received(boolean heartbeat)
- {
- _impl.received(heartbeat);
- }
-
- static void init(int delay, int timeout)
- {
- _impl.init(delay, timeout);
- }
-
- private static interface Diagnostics
- {
- void sent();
- void timeout();
- void received(boolean heartbeat);
- void init(int delay, int timeout);
- }
-
- private static class On implements Diagnostics
- {
- private final String[] messages = new String[50];
- private int i;
-
- private void save(String msg)
- {
- messages[i++] = msg;
- if(i >= messages.length){
- i = 0;//i.e. a circular buffer
- }
- }
-
- public void sent()
- {
- save(System.currentTimeMillis() + ": sent heartbeat");
- }
-
- public void timeout()
- {
- for(int i = 0; i < messages.length; i++)
- {
- if(messages[i] != null)
- {
- System.out.println(messages[i]);
- }
- }
- System.out.println(System.currentTimeMillis() + ": timed out");
- }
-
- public void received(boolean heartbeat)
- {
- save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data"));
- }
-
- public void init(int delay, int timeout)
- {
- System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout);
- }
- }
-
- private static class Off implements Diagnostics
- {
- public void sent()
- {
-
- }
- public void timeout()
- {
-
- }
- public void received(boolean heartbeat)
- {
-
- }
-
- public void init(int delay, int timeout)
- {
-
- }
- }
-}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
index 3c9a6e1500..4789dd0ed7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate extends ClientDelegate
}
private final ConnectionURL _connectionURL;
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate extends ClientDelegate
return null;
}
+
+ @Override
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ {
+ // ClientDelegate simply responds to heartbeats with heartbeats
+ _heartbeatListener.heartbeatReceived();
+ super.connectionHeartbeat(conn, hearbeat);
+ _heartbeatListener.heartbeatSent();
+ }
+
+
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 7ca588946b..6774d0a45a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -23,6 +23,7 @@ package org.apache.qpid.protocol;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -31,7 +32,7 @@ import java.nio.ByteBuffer;
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
*/
-public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, TransportActivity
{
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
@@ -56,6 +57,6 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
void readerIdle();
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index f95a0d215b..5ae2f1ceb2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -21,12 +21,7 @@
package org.apache.qpid.transport;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.*;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
@@ -73,6 +68,9 @@ public class Connection extends ConnectionInvoker
//Usable channels are numbered 0 to <ChannelMax> - 1
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
+ private long _lastSendTime;
+ private long _lastReadTime;
+
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -231,7 +229,8 @@ public class Connection extends ConnectionInvoker
addConnectionListener((ConnectionListener)secureReceiver);
}
- NetworkConnection network = transport.connect(settings, secureReceiver);
+ NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+
setRemoteAddress(network.getRemoteAddress());
setLocalAddress(network.getLocalAddress());
@@ -368,6 +367,7 @@ public class Connection extends ConnectionInvoker
public void received(ProtocolEvent event)
{
+ _lastReadTime = System.currentTimeMillis();
if(log.isDebugEnabled())
{
log.debug("RECV: [%s] %s", this, event);
@@ -377,6 +377,7 @@ public class Connection extends ConnectionInvoker
public void send(ProtocolEvent event)
{
+ _lastSendTime = System.currentTimeMillis();
if(log.isDebugEnabled())
{
log.debug("SEND: [%s] %s", this, event);
@@ -745,4 +746,38 @@ public class Connection extends ConnectionInvoker
sessionDetached.setCode(sessionDetachCode);
invoke(sessionDetached);
}
+
+
+ protected void doHeartBeat()
+ {
+ connectionHeartbeat();
+ }
+
+ private class ConnectionActivity implements TransportActivity
+ {
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastSendTime;
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ connectionHeartbeat();
+ }
+
+ @Override
+ public void readerIdle()
+ {
+ // TODO
+
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
index 4d4274278f..8437ef1a94 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
@@ -27,5 +27,7 @@ import javax.net.ssl.SSLContext;
public interface IncomingNetworkTransport extends NetworkTransport
{
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext);
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 12c42d6643..050d194c47 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -50,4 +50,8 @@ public interface NetworkConnection
void setPeerPrincipal(Principal principal);
Principal getPeerPrincipal();
+
+ int getMaxReadIdle();
+
+ int getMaxWriteIdle();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
index 92774f5842..45231aa05d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
@@ -29,5 +29,7 @@ public interface OutgoingNetworkTransport extends NetworkTransport
{
public NetworkConnection getConnection();
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate);
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity);
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java
new file mode 100644
index 0000000000..210b014a57
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network;
+
+public interface Ticker
+{
+ int getTimeToNextTick(long currentTime);
+
+ int tick(long currentTime);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java
new file mode 100644
index 0000000000..2ee336d9b2
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network;
+
+public interface TransportActivity
+{
+ long getLastReadTime();
+
+ long getLastWriteTime();
+
+ void writerIdle();
+
+ void readerIdle();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
new file mode 100644
index 0000000000..54a2a360bb
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportActivity;
+
+class IdleTimeoutTicker implements Ticker
+{
+ private final TransportActivity _transport;
+ private final int _defaultTimeout;
+ private NetworkConnection _connection;
+
+ public IdleTimeoutTicker(TransportActivity transport, int defaultTimeout)
+ {
+ _transport = transport;
+ _defaultTimeout = defaultTimeout;
+ }
+
+ @Override
+ public int getTimeToNextTick(long currentTime)
+ {
+ long nextTime = -1;
+ final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
+
+ if(maxReadIdle > 0)
+ {
+ nextTime = _transport.getLastReadTime() + maxReadIdle;
+ }
+
+ long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
+
+ if(maxWriteIdle > 0)
+ {
+ long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
+ if(nextTime == -1l || writeTime < nextTime)
+ {
+ nextTime = writeTime;
+ }
+ }
+ return nextTime == -1 ? _defaultTimeout : (int) (nextTime - currentTime);
+ }
+
+ @Override
+ public int tick(long currentTime)
+ {
+ // writer Idle
+ long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
+ if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime)
+ {
+ _transport.writerIdle();
+ }
+ // reader Idle
+ final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
+ if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime)
+ {
+
+ _transport.readerIdle();
+ }
+ return getTimeToNextTick(currentTime);
+ }
+
+ public void setConnection(NetworkConnection connection)
+ {
+ _connection = connection;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 2658296c5f..f5c09ac2cc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,14 +40,23 @@ public class IoNetworkConnection implements NetworkConnection
private final IoSender _ioSender;
private final IoReceiver _ioReceiver;
private Principal _principal;
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
- int sendBufferSize, int receiveBufferSize, long timeout)
+ int sendBufferSize, int receiveBufferSize, long timeout)
+ {
+ this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null);
+ }
+
+ public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
{
_socket = socket;
_timeout = timeout;
_ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+ _ioReceiver.setTicker(ticker);
_ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
@@ -88,14 +99,12 @@ public class IoNetworkConnection implements NetworkConnection
public void setMaxWriteIdle(int sec)
{
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
+ _maxWriteIdle = sec;
}
public void setMaxReadIdle(int sec)
{
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
+ _maxReadIdle = sec;
}
@Override
@@ -109,4 +118,16 @@ public class IoNetworkConnection implements NetworkConnection
{
return _principal;
}
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 4cb526ef73..3f1cd3519b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -41,9 +41,8 @@ import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.*;
+
import org.slf4j.LoggerFactory;
public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
@@ -56,7 +55,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
private IoNetworkConnection _connection;
private AcceptingThread _acceptor;
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate)
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity)
{
int sendBufferSize = settings.getWriteBufferSize();
int receiveBufferSize = settings.getReadBufferSize();
@@ -91,7 +92,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
try
{
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT);
+ IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+ _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+ ticker.setConnection(_connection);
_connection.start();
}
catch(Exception e)
@@ -128,7 +131,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
return _connection;
}
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
{
try
{
@@ -149,6 +154,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
private ProtocolEngineFactory _factory;
private SSLContext _sslContext;
private ServerSocket _serverSocket;
+ private int _timeout;
private AcceptingThread(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
@@ -157,6 +163,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
_config = config;
_factory = factory;
_sslContext = sslContext;
+ _timeout = TIMEOUT;
InetSocketAddress address = config.getAddress();
@@ -217,6 +224,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
socket = _serverSocket.accept();
socket.setTcpNoDelay(_config.getTcpNoDelay());
+ socket.setSoTimeout(_timeout);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -226,7 +234,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
ProtocolEngine engine = _factory.newProtocolEngine();
- NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT);
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+ NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+ ticker);
+ ticker.setConnection(connection);
if(_sslContext != null)
{
@@ -293,6 +304,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
}
}
}
+
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index a36e5fedee..06a43e21c6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
import javax.net.ssl.SSLSocket;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
private static final boolean shutdownBroken;
+
+ private Ticker _ticker;
static
{
String osName = System.getProperty("os.name");
@@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable
{
final int threshold = bufferSize / 2;
- // I set the read buffer size simillar to SO_RCVBUF
+ // I set the read buffer size similar to SO_RCVBUF
// Haven't tested with a lower value to see if it's better or worse
byte[] buffer = new byte[bufferSize];
try
@@ -144,17 +148,64 @@ final class IoReceiver implements Runnable, Closeable
InputStream in = socket.getInputStream();
int read = 0;
int offset = 0;
- while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ long currentTime;
+ while(read != -1)
{
- if (read > 0)
+ try
+ {
+ while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ {
+ if (read > 0)
+ {
+ ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+ receiver.received(b);
+ offset+=read;
+ if (offset > threshold)
+ {
+ offset = 0;
+ buffer = new byte[bufferSize];
+ }
+ }
+ currentTime = System.currentTimeMillis();
+
+ if(_ticker != null)
+ {
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
+ try
+ {
+ if(!socket.isClosed())
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick);
+ }
+ }
+ catch(SocketException e)
+ {
+ // ignore - closed socket
+ }
+ }
+ }
+ }
+ catch (SocketTimeoutException e)
{
- ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
- receiver.received(b);
- offset+=read;
- if (offset > threshold)
+ currentTime = System.currentTimeMillis();
+ if(_ticker != null)
{
- offset = 0;
- buffer = new byte[bufferSize];
+ final int tick = _ticker.tick(currentTime);
+ if(!socket.isClosed())
+ {
+ try
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick );
+ }
+ catch(SocketException ex)
+ {
+ // ignore - closed socket
+ }
+ }
}
}
}
@@ -195,4 +246,15 @@ final class IoReceiver implements Runnable, Closeable
return !brokenClose && !sslSocketClosed;
}
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public void setTicker(Ticker ticker)
+ {
+ _ticker = ticker;
+ }
+
+
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
index 893f66c5ff..a19c2e7e43 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
@@ -83,6 +83,18 @@ public class TestNetworkConnection implements NetworkConnection
return null;
}
+ @Override
+ public int getMaxReadIdle()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return 0;
+ }
+
public void setMaxWriteIdle(int idleTime)
{
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
index b4c0981131..bf9a5843d6 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
@@ -127,7 +127,9 @@ public class TransportTest extends QpidTestCase
throw new UnsupportedOperationException();
}
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate)
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity)
{
throw new UnsupportedOperationException();
}
@@ -147,7 +149,7 @@ public class TransportTest extends QpidTestCase
}
public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory, SSLContext sslContext)
+ ProtocolEngineFactory factory, SSLContext sslContext)
{
throw new UnsupportedOperationException();
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
new file mode 100644
index 0000000000..5cdd7a8597
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import junit.framework.TestCase;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class IdleTimeoutTickerTest extends TestCase implements TransportActivity, NetworkConnection
+{
+ private IdleTimeoutTicker _ticker;
+ private static final int DEFAULT_TIMEOUT = 567890;
+ private long _lastReadTime;
+ private long _lastWriteTime;
+ private long _currentTime;
+ private int _maxWriteIdle;
+ private int _maxReadIdle;
+ private boolean _readerIdle;
+ private boolean _writerIdle;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _ticker = new IdleTimeoutTicker(this, DEFAULT_TIMEOUT);
+ _ticker.setConnection(this);
+ _readerIdle = false;
+ _writerIdle = false;
+ _lastReadTime = 0l;
+ _lastWriteTime = 0l;
+ _maxReadIdle = 0;
+ _maxWriteIdle = 0;
+ }
+
+ public void testNoIdle() throws Exception
+ {
+ _maxReadIdle = 4;
+ _maxWriteIdle = 2;
+ _lastReadTime = 0;
+ _lastWriteTime = 1500;
+ _currentTime = 3000;
+ // Current time = 3s,
+ // last read = 0s, max read idle = 4s, should check in 1s
+ // last write = 1.5s, max write idle = 2s, should check in 0.5s
+ long nextTime = _ticker.tick(_currentTime);
+ assertEquals("Incorrect next tick calculation", 500l, nextTime);
+ assertFalse("Incorrectly caused reader idle", _readerIdle);
+ assertFalse("Incorrectly caused writer idle", _writerIdle);
+
+
+ // Current time = 3.4s,
+ // last read = 0s, max read idle = 4s, should check in 0.6s
+ // last write = 3.1s, max write idle = 2s, should check in 1.7s
+ _lastWriteTime = 3100;
+ _currentTime = 3400;
+ nextTime = _ticker.tick(_currentTime);
+ assertEquals("Incorrect next tick calculation", 600l, nextTime);
+ assertFalse("Incorrectly caused reader idle", _readerIdle);
+ assertFalse("Incorrectly caused writer idle", _writerIdle);
+
+ _maxReadIdle = 0;
+ nextTime = _ticker.tick(_currentTime);
+ assertEquals("Incorrect next tick calculation", 1700l, nextTime);
+ assertFalse("Incorrectly caused reader idle", _readerIdle);
+ assertFalse("Incorrectly caused writer idle", _writerIdle);
+
+ _maxWriteIdle = 0;
+ nextTime = _ticker.tick(_currentTime);
+ assertEquals("Incorrect next tick calculation", DEFAULT_TIMEOUT, nextTime);
+ assertFalse("Incorrectly caused reader idle", _readerIdle);
+ assertFalse("Incorrectly caused writer idle", _writerIdle);
+
+ }
+
+ public void testReaderIdle() throws Exception
+ {
+ _maxReadIdle = 4;
+ _maxWriteIdle = 0;
+ _lastReadTime = 0;
+ _lastWriteTime = 2500;
+ _currentTime = 4000;
+ // Current time = 4s,
+ // last read = 0s, max read idle = 4s, reader idle
+ long nextTime = _ticker.tick(_currentTime);
+
+ assertTrue(_readerIdle);
+ assertFalse(_writerIdle);
+
+ _readerIdle = false;
+
+ // last write = 2.5s, max write idle = 2s, should check in 0.5s
+ _maxWriteIdle = 2;
+ nextTime = _ticker.tick(_currentTime);
+ assertTrue(_readerIdle);
+ assertFalse(_writerIdle);
+
+ _readerIdle = false;
+ // last write = 1.5s, max write idle = 2s, should check in 0.5s
+
+ _lastWriteTime = 1500;
+ nextTime = _ticker.tick(_currentTime);
+
+ assertTrue(_readerIdle);
+ assertTrue(_writerIdle);
+
+ }
+
+ public void testWriterIdle() throws Exception
+ {
+ _maxReadIdle = 0;
+ _maxWriteIdle = 2;
+ _lastReadTime = 0;
+ _lastWriteTime = 1500;
+ _currentTime = 4000;
+ // Current time = 4s,
+ // last write = 1.5s, max write idle = 2s, writer idle
+ long nextTime = _ticker.tick(_currentTime);
+
+ assertTrue(_writerIdle);
+ assertFalse(_readerIdle);
+ assertEquals(2000l,nextTime);
+
+ _writerIdle = false;
+ _lastWriteTime = 1500;
+ _maxReadIdle = 5;
+
+ nextTime = _ticker.tick(_currentTime);
+
+ assertTrue(_writerIdle);
+ assertFalse(_readerIdle);
+ assertEquals(1000l,nextTime);
+
+ }
+
+ //-------------------------------------------------------------------------
+ // Implement TransportActivity methods
+ //-------------------------------------------------------------------------
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _writerIdle = true;
+ _lastWriteTime = _currentTime;
+ }
+
+ @Override
+ public void readerIdle()
+ {
+ _readerIdle = true;
+ }
+
+ //-------------------------------------------------------------------------
+ // Implement NetworkConnection methods
+ // Only actually use those relating to idle timeouts
+ //-------------------------------------------------------------------------
+
+ @Override
+ public Sender<ByteBuffer> getSender()
+ {
+ return null;
+ }
+
+ @Override
+ public void start()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return null;
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return null;
+ }
+
+ @Override
+ public void setMaxWriteIdle(int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ @Override
+ public void setMaxReadIdle(int sec)
+ {
+ _maxReadIdle = sec;
+ }
+
+ @Override
+ public void setPeerPrincipal(Principal principal)
+ {
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ return null;
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
new file mode 100644
index 0000000000..0e01bda8d0
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class HeartbeatTest extends QpidBrokerTestCase
+{
+ public void testHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay", "1");
+ AMQConnection conn = (AMQConnection) getConnection();
+ TestListener listener = new TestListener();
+ conn.setHeartbeatListener(listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2);
+
+ conn.close();
+ }
+
+ public void testNoHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay", "0");
+ AMQConnection conn = (AMQConnection) getConnection();
+ TestListener listener = new TestListener();
+ conn.setHeartbeatListener(listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived);
+ assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent);
+
+ conn.close();
+ }
+
+ public void testReadOnlyConnectionHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay","1");
+ AMQConnection receiveConn = (AMQConnection) getConnection();
+ AMQConnection sendConn = (AMQConnection) getConnection();
+ Destination destination = getTestQueue();
+ TestListener receiveListener = new TestListener();
+ TestListener sendListener = new TestListener();
+
+
+ Session receiveSession = receiveConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Session senderSession = sendConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer consumer = receiveSession.createConsumer(destination);
+ MessageProducer producer = senderSession.createProducer(destination);
+
+ receiveConn.setHeartbeatListener(receiveListener);
+ sendConn.setHeartbeatListener(sendListener);
+ receiveConn.start();
+
+ for(int i = 0; i < 5; i++)
+ {
+ producer.send(senderSession.createTextMessage("Msg " + i));
+ Thread.sleep(500);
+ assertNotNull("Expected to received message", consumer.receive(500));
+ }
+
+
+
+ assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2);
+ assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent);
+
+ assertTrue("Too few heartbeats received at the sender "+sendListener._heartbeatsReceived+" (expected at least 2)", sendListener._heartbeatsReceived>=2);
+ assertEquals("Unexpected received at the receiver: ",0,receiveListener._heartbeatsReceived);
+
+ receiveConn.close();
+ sendConn.close();
+ }
+
+ private class TestListener implements HeartbeatListener
+ {
+ int _heartbeatsReceived;
+ int _heartbeatsSent;
+ @Override
+ public void heartbeatReceived()
+ {
+ _heartbeatsReceived++;
+ }
+
+ @Override
+ public void heartbeatSent()
+ {
+ _heartbeatsSent++;
+ }
+ }
+}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index fa3a2bc262..bfcc06a1bc 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -181,3 +181,7 @@ org.apache.qpid.client.failover.MultipleBrokersFailoverTest#*
// Uses Java broker specific configuration
org.apache.qpid.client.ssl.SSLTest#testClientCertMissingWhilstWanting
+
+// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
+org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats
+
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index ca2383a8f3..3f12076dbe 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -64,3 +64,6 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro
// QPID-3604: Immediate Prefetch no longer supported by 0-10
org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
+
+// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
+org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats