summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/transport/Connection.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/Connection.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java88
1 files changed, 74 insertions, 14 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index e87851cf7d..cdca726148 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -21,12 +21,7 @@
package org.apache.qpid.transport;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.*;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
@@ -73,6 +68,8 @@ public class Connection extends ConnectionInvoker
//Usable channels are numbered 0 to <ChannelMax> - 1
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
+ private long _lastSendTime;
+ private long _lastReadTime;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -89,15 +86,15 @@ public class Connection extends ConnectionInvoker
public static interface SessionFactory
{
- Session newSession(Connection conn, Binary name, long expiry);
+ Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay);
}
private static final class DefaultSessionFactory implements SessionFactory
{
- public Session newSession(final Connection conn, final Binary name, final long expiry)
+ public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay)
{
- return new Session(conn, name, expiry);
+ return new Session(conn, name, expiry, isNoReplay);
}
}
@@ -232,9 +229,10 @@ public class Connection extends ConnectionInvoker
addConnectionListener((ConnectionListener)secureReceiver);
}
- NetworkConnection network = transport.connect(settings, secureReceiver, null);
- _remoteAddress = network.getRemoteAddress();
- _localAddress = network.getLocalAddress();
+ NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+
+ setRemoteAddress(network.getRemoteAddress());
+ setLocalAddress(network.getLocalAddress());
final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
if(secureSender instanceof ConnectionListener)
@@ -298,7 +296,12 @@ public class Connection extends ConnectionInvoker
public Session createSession(long expiry)
{
- return createSession(UUID.randomUUID().toString(), expiry);
+ return createSession(expiry, false);
+ }
+
+ public Session createSession(long expiry, boolean isNoReplay)
+ {
+ return createSession(UUID.randomUUID().toString(), expiry, isNoReplay);
}
public Session createSession(String name)
@@ -311,6 +314,11 @@ public class Connection extends ConnectionInvoker
return createSession(Strings.toUTF8(name), expiry);
}
+ public Session createSession(String name, long expiry,boolean isNoReplay)
+ {
+ return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay);
+ }
+
public Session createSession(byte[] name, long expiry)
{
return createSession(new Binary(name), expiry);
@@ -318,6 +326,11 @@ public class Connection extends ConnectionInvoker
public Session createSession(Binary name, long expiry)
{
+ return createSession(name, expiry, false);
+ }
+
+ public Session createSession(Binary name, long expiry, boolean isNoReplay)
+ {
synchronized (lock)
{
Waiter w = new Waiter(lock, timeout);
@@ -331,7 +344,7 @@ public class Connection extends ConnectionInvoker
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
- Session ssn = _sessionFactory.newSession(this, name, expiry);
+ Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay);
registerSession(ssn);
map(ssn);
ssn.attach();
@@ -369,6 +382,7 @@ public class Connection extends ConnectionInvoker
public void received(ProtocolEvent event)
{
+ _lastReadTime = System.currentTimeMillis();
if(log.isDebugEnabled())
{
log.debug("RECV: [%s] %s", this, event);
@@ -378,6 +392,7 @@ public class Connection extends ConnectionInvoker
public void send(ProtocolEvent event)
{
+ _lastSendTime = System.currentTimeMillis();
if(log.isDebugEnabled())
{
log.debug("SEND: [%s] %s", this, event);
@@ -728,6 +743,17 @@ public class Connection extends ConnectionInvoker
return _localAddress;
}
+ protected void setRemoteAddress(SocketAddress remoteAddress)
+ {
+ _remoteAddress = remoteAddress;
+ }
+
+ protected void setLocalAddress(SocketAddress localAddress)
+ {
+ _localAddress = localAddress;
+ }
+
+
private void invokeSessionDetached(int channel, SessionDetachCode sessionDetachCode)
{
SessionDetached sessionDetached = new SessionDetached();
@@ -735,4 +761,38 @@ public class Connection extends ConnectionInvoker
sessionDetached.setCode(sessionDetachCode);
invoke(sessionDetached);
}
+
+
+ protected void doHeartBeat()
+ {
+ connectionHeartbeat();
+ }
+
+ private class ConnectionActivity implements TransportActivity
+ {
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastSendTime;
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ connectionHeartbeat();
+ }
+
+ @Override
+ public void readerIdle()
+ {
+ // TODO
+
+ }
+ }
}