diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java | 85 |
1 files changed, 48 insertions, 37 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index fa3c1737a7..098a30175f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.transport; -import static org.apache.qpid.transport.Connection.State.CLOSED; -import static org.apache.qpid.transport.Connection.State.CLOSING; -import static org.apache.qpid.transport.Connection.State.NEW; -import static org.apache.qpid.transport.Connection.State.OPEN; -import static org.apache.qpid.transport.Connection.State.OPENING; +import static org.apache.qpid.transport.Connection.State.*; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -37,17 +36,21 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; import org.apache.qpid.util.Strings; - /** * Connection * - * @author Rafael H. Schloming - * * @todo the channels map should probably be replaced with something * more efficient, e.g. an array or a map implementation that can use * short instead of Short @@ -56,10 +59,8 @@ import org.apache.qpid.util.Strings; public class Connection extends ConnectionInvoker implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { - protected static final Logger log = Logger.get(Connection.class); - public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } static class DefaultConnectionListener implements ConnectionListener @@ -79,7 +80,6 @@ public class Connection extends ConnectionInvoker private static final class DefaultSessionFactory implements SessionFactory { - public Session newSession(final Connection conn, final Binary name, final long expiry) { return new Session(conn, name, expiry); @@ -113,10 +113,8 @@ public class Connection extends ConnectionInvoker private ConnectionSettings conSettings; private SecurityLayer securityLayer; private String _clientId; + private NetworkConnection network; - private static final AtomicLong idGenerator = new AtomicLong(0); - private final long _connectionId = idGenerator.incrementAndGet(); - public Connection() {} public void setConnectionDelegate(ConnectionDelegate delegate) @@ -196,16 +194,20 @@ public class Connection extends ConnectionInvoker public void connect(String host, int port, String vhost, String username, String password, boolean ssl) { - connect(host, port, vhost, username, password, ssl,"PLAIN"); + connect(host, port, vhost, username, password, ssl, "PLAIN"); } - public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs) { - connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP); + connect(host, port, vhost, username, password, ssl, saslMechs, "TCP"); } + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, String protocol) + { + connect(host, port, vhost, username, password, ssl, saslMechs, protocol, Collections.EMPTY_MAP); + } - public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map<String,Object> clientProps) + public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, String protocol, Map<String,Object> clientProps) { ConnectionSettings settings = new ConnectionSettings(); settings.setHost(host); @@ -216,24 +218,26 @@ public class Connection extends ConnectionInvoker settings.setUseSSL(ssl); settings.setSaslMechs(saslMechs); settings.setClientProperties(clientProps); - connect(settings); + settings.setProtocol(protocol); + connect(settings, null); } - public void connect(ConnectionSettings settings) + public void connect(ConnectionSettings settings, SSLContextFactory sslFactory) { - synchronized (lock) { conSettings = settings; state = OPENING; userID = settings.getUsername(); delegate = new ClientDelegate(settings); + + securityLayer = new SecurityLayer(); + securityLayer.init(this); - TransportBuilder transport = new TransportBuilder(); - transport.init(this); - this.sender = transport.buildSenderPipe(); - transport.buildReceiverPipe(this); - this.securityLayer = transport.getSecurityLayer(); + OutgoingNetworkTransport transport = Transport.getOutgoingTransport(); + Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this))); + network = transport.connect(settings, receiver, sslFactory); + sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize()); send(new ProtocolHeader(1, 0, 10)); @@ -333,11 +337,6 @@ public class Connection extends ConnectionInvoker _sessionFactory = sessionFactory; } - public long getConnectionId() - { - return _connectionId; - } - public ConnectionDelegate getConnectionDelegate() { return delegate; @@ -346,18 +345,25 @@ public class Connection extends ConnectionInvoker public void received(ProtocolEvent event) { log.debug("RECV: [%s] %s", this, event); - event.delegate(this, delegate); + try + { + event.delegate(this, delegate); + } + catch (RuntimeException re) + { + closed(); + throw re; + } } public void send(ProtocolEvent event) { log.debug("SEND: [%s] %s", this, event); - Sender s = sender; - if (s == null) + if (sender == null) { throw new ConnectionException("connection closed"); } - s.send(event); + sender.send(event); } public void flush() @@ -386,7 +392,8 @@ public class Connection extends ConnectionInvoker else { throw new ProtocolViolationException( - "Received frames for an already dettached session", null); + String.format("Received frames for a detached session on connection:%s channel:%d", this, method.getChannel()), + null); } } @@ -448,7 +455,7 @@ public class Connection extends ConnectionInvoker { for (Session ssn : sessions.values()) { - map(ssn); + map(ssn, ssn.getChannel()); ssn.attach(); ssn.resume(); } @@ -660,4 +667,8 @@ public class Connection extends ConnectionInvoker return securityLayer; } + public Collection<Session> getChannels() + { + return channels.values(); + } } |