summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java32
1 files changed, 27 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index af89000c5c..816caac824 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,6 +179,9 @@ public class AMQProtocolHandler implements ProtocolEngine
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private long _lastReadTime = System.currentTimeMillis();
+ private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -300,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -309,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -442,6 +445,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public void received(ByteBuffer msg)
{
_readBytes += msg.remaining();
+ _lastReadTime = System.currentTimeMillis();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -470,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -560,6 +562,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
+ _lastWriteTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
if(flush)
@@ -882,6 +885,18 @@ public class AMQProtocolHandler implements ProtocolEngine
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
protected Sender<ByteBuffer> getSender()
{
return _sender;
@@ -894,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -909,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}