summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java85
1 files changed, 48 insertions, 37 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index fa3c1737a7..098a30175f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.transport;
-import static org.apache.qpid.transport.Connection.State.CLOSED;
-import static org.apache.qpid.transport.Connection.State.CLOSING;
-import static org.apache.qpid.transport.Connection.State.NEW;
-import static org.apache.qpid.transport.Connection.State.OPEN;
-import static org.apache.qpid.transport.Connection.State.OPENING;
+import static org.apache.qpid.transport.Connection.State.*;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -37,17 +36,21 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import org.apache.qpid.util.Strings;
-
/**
* Connection
*
- * @author Rafael H. Schloming
- *
* @todo the channels map should probably be replaced with something
* more efficient, e.g. an array or a map implementation that can use
* short instead of Short
@@ -56,10 +59,8 @@ import org.apache.qpid.util.Strings;
public class Connection extends ConnectionInvoker
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
-
protected static final Logger log = Logger.get(Connection.class);
-
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
static class DefaultConnectionListener implements ConnectionListener
@@ -79,7 +80,6 @@ public class Connection extends ConnectionInvoker
private static final class DefaultSessionFactory implements SessionFactory
{
-
public Session newSession(final Connection conn, final Binary name, final long expiry)
{
return new Session(conn, name, expiry);
@@ -113,10 +113,8 @@ public class Connection extends ConnectionInvoker
private ConnectionSettings conSettings;
private SecurityLayer securityLayer;
private String _clientId;
+ private NetworkConnection network;
- private static final AtomicLong idGenerator = new AtomicLong(0);
- private final long _connectionId = idGenerator.incrementAndGet();
-
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -196,16 +194,20 @@ public class Connection extends ConnectionInvoker
public void connect(String host, int port, String vhost, String username, String password, boolean ssl)
{
- connect(host, port, vhost, username, password, ssl,"PLAIN");
+ connect(host, port, vhost, username, password, ssl, "PLAIN");
}
- public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs)
{
- connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP);
+ connect(host, port, vhost, username, password, ssl, saslMechs, "TCP");
}
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, String protocol)
+ {
+ connect(host, port, vhost, username, password, ssl, saslMechs, protocol, Collections.EMPTY_MAP);
+ }
- public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map<String,Object> clientProps)
+ public void connect(String host, int port, String vhost, String username, String password, boolean ssl, String saslMechs, String protocol, Map<String,Object> clientProps)
{
ConnectionSettings settings = new ConnectionSettings();
settings.setHost(host);
@@ -216,24 +218,26 @@ public class Connection extends ConnectionInvoker
settings.setUseSSL(ssl);
settings.setSaslMechs(saslMechs);
settings.setClientProperties(clientProps);
- connect(settings);
+ settings.setProtocol(protocol);
+ connect(settings, null);
}
- public void connect(ConnectionSettings settings)
+ public void connect(ConnectionSettings settings, SSLContextFactory sslFactory)
{
-
synchronized (lock)
{
conSettings = settings;
state = OPENING;
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
+
+ securityLayer = new SecurityLayer();
+ securityLayer.init(this);
- TransportBuilder transport = new TransportBuilder();
- transport.init(this);
- this.sender = transport.buildSenderPipe();
- transport.buildReceiverPipe(this);
- this.securityLayer = transport.getSecurityLayer();
+ OutgoingNetworkTransport transport = Transport.getOutgoingTransport();
+ Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ network = transport.connect(settings, receiver, sslFactory);
+ sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
send(new ProtocolHeader(1, 0, 10));
@@ -333,11 +337,6 @@ public class Connection extends ConnectionInvoker
_sessionFactory = sessionFactory;
}
- public long getConnectionId()
- {
- return _connectionId;
- }
-
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
@@ -346,18 +345,25 @@ public class Connection extends ConnectionInvoker
public void received(ProtocolEvent event)
{
log.debug("RECV: [%s] %s", this, event);
- event.delegate(this, delegate);
+ try
+ {
+ event.delegate(this, delegate);
+ }
+ catch (RuntimeException re)
+ {
+ closed();
+ throw re;
+ }
}
public void send(ProtocolEvent event)
{
log.debug("SEND: [%s] %s", this, event);
- Sender s = sender;
- if (s == null)
+ if (sender == null)
{
throw new ConnectionException("connection closed");
}
- s.send(event);
+ sender.send(event);
}
public void flush()
@@ -386,7 +392,8 @@ public class Connection extends ConnectionInvoker
else
{
throw new ProtocolViolationException(
- "Received frames for an already dettached session", null);
+ String.format("Received frames for a detached session on connection:%s channel:%d", this, method.getChannel()),
+ null);
}
}
@@ -448,7 +455,7 @@ public class Connection extends ConnectionInvoker
{
for (Session ssn : sessions.values())
{
- map(ssn);
+ map(ssn, ssn.getChannel());
ssn.attach();
ssn.resume();
}
@@ -660,4 +667,8 @@ public class Connection extends ConnectionInvoker
return securityLayer;
}
+ public Collection<Session> getChannels()
+ {
+ return channels.values();
+ }
}