diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/Connection.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Connection.java | 88 |
1 files changed, 74 insertions, 14 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e87851cf7d..cdca726148 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -21,12 +21,7 @@ package org.apache.qpid.transport; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.*; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; import org.apache.qpid.transport.util.Logger; @@ -73,6 +68,8 @@ public class Connection extends ConnectionInvoker //Usable channels are numbered 0 to <ChannelMax> - 1 public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; + private long _lastSendTime; + private long _lastReadTime; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -89,15 +86,15 @@ public class Connection extends ConnectionInvoker public static interface SessionFactory { - Session newSession(Connection conn, Binary name, long expiry); + Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay); } private static final class DefaultSessionFactory implements SessionFactory { - public Session newSession(final Connection conn, final Binary name, final long expiry) + public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay) { - return new Session(conn, name, expiry); + return new Session(conn, name, expiry, isNoReplay); } } @@ -232,9 +229,10 @@ public class Connection extends ConnectionInvoker addConnectionListener((ConnectionListener)secureReceiver); } - NetworkConnection network = transport.connect(settings, secureReceiver, null); - _remoteAddress = network.getRemoteAddress(); - _localAddress = network.getLocalAddress(); + NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity()); + + setRemoteAddress(network.getRemoteAddress()); + setLocalAddress(network.getLocalAddress()); final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender()); if(secureSender instanceof ConnectionListener) @@ -298,7 +296,12 @@ public class Connection extends ConnectionInvoker public Session createSession(long expiry) { - return createSession(UUID.randomUUID().toString(), expiry); + return createSession(expiry, false); + } + + public Session createSession(long expiry, boolean isNoReplay) + { + return createSession(UUID.randomUUID().toString(), expiry, isNoReplay); } public Session createSession(String name) @@ -311,6 +314,11 @@ public class Connection extends ConnectionInvoker return createSession(Strings.toUTF8(name), expiry); } + public Session createSession(String name, long expiry,boolean isNoReplay) + { + return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay); + } + public Session createSession(byte[] name, long expiry) { return createSession(new Binary(name), expiry); @@ -318,6 +326,11 @@ public class Connection extends ConnectionInvoker public Session createSession(Binary name, long expiry) { + return createSession(name, expiry, false); + } + + public Session createSession(Binary name, long expiry, boolean isNoReplay) + { synchronized (lock) { Waiter w = new Waiter(lock, timeout); @@ -331,7 +344,7 @@ public class Connection extends ConnectionInvoker throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); } - Session ssn = _sessionFactory.newSession(this, name, expiry); + Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay); registerSession(ssn); map(ssn); ssn.attach(); @@ -369,6 +382,7 @@ public class Connection extends ConnectionInvoker public void received(ProtocolEvent event) { + _lastReadTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("RECV: [%s] %s", this, event); @@ -378,6 +392,7 @@ public class Connection extends ConnectionInvoker public void send(ProtocolEvent event) { + _lastSendTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("SEND: [%s] %s", this, event); @@ -728,6 +743,17 @@ public class Connection extends ConnectionInvoker return _localAddress; } + protected void setRemoteAddress(SocketAddress remoteAddress) + { + _remoteAddress = remoteAddress; + } + + protected void setLocalAddress(SocketAddress localAddress) + { + _localAddress = localAddress; + } + + private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode) { SessionDetached sessionDetached = new SessionDetached(); @@ -735,4 +761,38 @@ public class Connection extends ConnectionInvoker sessionDetached.setCode(sessionDetachCode); invoke(sessionDetached); } + + + protected void doHeartBeat() + { + connectionHeartbeat(); + } + + private class ConnectionActivity implements TransportActivity + { + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastSendTime; + } + + @Override + public void writerIdle() + { + connectionHeartbeat(); + } + + @Override + public void readerIdle() + { + // TODO + + } + } } |