summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java55
1 files changed, 53 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index d7d26cc772..7d263b517c 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -44,6 +44,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private ServerConnection _connection;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
+ private long _lastReadTime;
+ private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network,
@@ -71,13 +73,61 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_network = network;
- _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
_connection.setPeerPrincipal(_network.getPeerPrincipal());
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
}
+ private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ {
+ return new Sender<ByteBuffer>()
+ {
+ @Override
+ public void setIdleTimeout(int i)
+ {
+ sender.setIdleTimeout(i);
+
+ }
+
+ @Override
+ public void send(ByteBuffer msg)
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ sender.send(msg);
+
+ }
+
+ @Override
+ public void flush()
+ {
+ sender.flush();
+
+ }
+
+ @Override
+ public void close()
+ {
+ sender.close();
+
+ }
+ };
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -90,6 +140,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void received(final ByteBuffer buf)
{
+ _lastReadTime = System.currentTimeMillis();
super.received(buf);
_connection.receivedComplete();
}
@@ -106,7 +157,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void writerIdle()
{
- //Todo
+ _connection.doHeartbeat();
}
public void readerIdle()