/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.transport; import static org.apache.qpid.transport.Connection.State.CLOSED; import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; import org.apache.qpid.util.Strings; /** * Connection * * @author Rafael H. Schloming * * @todo the channels map should probably be replaced with something * more efficient, e.g. an array or a map implementation that can use * short instead of Short */ public class Connection extends ConnectionInvoker implements Receiver, Sender { protected static final Logger log = Logger.get(Connection.class); //Usable channels are numbered 0 to - 1 public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } static class DefaultConnectionListener implements ConnectionListener { public void opened(Connection conn) {} public void exception(Connection conn, ConnectionException exception) { log.error(exception, "connection exception"); } public void closed(Connection conn) {} } public static interface SessionFactory { Session newSession(Connection conn, Binary name, long expiry); } private static final class DefaultSessionFactory implements SessionFactory { public Session newSession(final Connection conn, final Binary name, final long expiry) { return new Session(conn, name, expiry); } } private static final SessionFactory DEFAULT_SESSION_FACTORY = new DefaultSessionFactory(); private SessionFactory _sessionFactory = DEFAULT_SESSION_FACTORY; private ConnectionDelegate delegate; private Sender sender; final private Map sessions = new HashMap(); final private Map channels = new HashMap(); private State state = NEW; final private Object lock = new Object(); private long timeout = 60000; private List listeners = new ArrayList(); private ConnectionException error = null; private int channelMax = 1; private String locale; private SaslServer saslServer; private SaslClient saslClient; private int idleTimeout = 0; private Map _serverProperties; private String userID; private ConnectionSettings conSettings; private SecurityLayer securityLayer; private String _clientId; private final AtomicBoolean connectionLost = new AtomicBoolean(false); public Connection() {} public void setConnectionDelegate(ConnectionDelegate delegate) { this.delegate = delegate; } public void addConnectionListener(ConnectionListener listener) { listeners.add(listener); } public Sender getSender() { return sender; } public void setSender(Sender sender) { this.sender = sender; sender.setIdleTimeout(idleTimeout); } protected void setState(State state) { synchronized (lock) { this.state = state; lock.notifyAll(); } } public String getClientId() { return _clientId; } public void setClientId(String id) { _clientId = id; } void setLocale(String locale) { this.locale = locale; } String getLocale() { return locale; } void setSaslServer(SaslServer saslServer) { this.saslServer = saslServer; } SaslServer getSaslServer() { return saslServer; } void setSaslClient(SaslClient saslClient) { this.saslClient = saslClient; } public SaslClient getSaslClient() { return saslClient; } public void connect(String host, int port, String vhost, String username, String password) { connect(host, port, vhost, username, password, false); } public void connect(String host, int port, String vhost, String username, String password, boolean ssl) { connect(host, port, vhost, username, password, ssl,"PLAIN"); } public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) { connect(host, port, vhost, username, password, ssl,saslMechs, Collections.EMPTY_MAP); } public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs,Map clientProps) { ConnectionSettings settings = new ConnectionSettings(); settings.setHost(host); settings.setPort(port); settings.setVhost(vhost); settings.setUsername(username); settings.setPassword(password); settings.setUseSSL(ssl); settings.setSaslMechs(saslMechs); settings.setClientProperties(clientProps); connect(settings); } public void connect(ConnectionSettings settings) { synchronized (lock) { conSettings = settings; state = OPENING; userID = settings.getUsername(); securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings()); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); Receiver secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this))); if(secureReceiver instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureReceiver); } NetworkConnection network = transport.connect(settings, secureReceiver, null); final Sender secureSender = securityLayer.sender(network.getSender()); if(secureSender instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureSender); } sender = new Disassembler(secureSender, settings.getMaxFrameSize()); send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); while (w.hasTime() && state == OPENING && error == null) { w.await(); } if (error != null) { ConnectionException t = error; error = null; try { close(); } catch (ConnectionException ce) { if (!(t instanceof ProtocolVersionException)) { throw ce; } } t.rethrow(); } switch (state) { case OPENING: close(); throw new ConnectionException("connect() timed out"); case OPEN: case RESUMING: connectionLost.set(false); break; case CLOSED: throw new ConnectionException("connect() aborted"); default: throw new IllegalStateException(String.valueOf(state)); } } for (ConnectionListener listener: listeners) { listener.opened(this); } } public Session createSession() { return createSession(0); } public Session createSession(long expiry) { return createSession(UUID.randomUUID().toString(), expiry); } public Session createSession(String name) { return createSession(name, 0); } public Session createSession(String name, long expiry) { return createSession(Strings.toUTF8(name), expiry); } public Session createSession(byte[] name, long expiry) { return createSession(new Binary(name), expiry); } public Session createSession(Binary name, long expiry) { synchronized (lock) { Waiter w = new Waiter(lock, timeout); while (w.hasTime() && state != OPEN && error == null) { w.await(); } if (state != OPEN) { throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); } Session ssn = _sessionFactory.newSession(this, name, expiry); registerSession(ssn); map(ssn); ssn.attach(); return ssn; } } public void registerSession(Session ssn) { synchronized (lock) { sessions.put(ssn.getName(),ssn); } } public void removeSession(Session ssn) { synchronized (lock) { sessions.remove(ssn.getName()); } } public void setSessionFactory(SessionFactory sessionFactory) { assert sessionFactory != null; _sessionFactory = sessionFactory; } public ConnectionDelegate getConnectionDelegate() { return delegate; } public void received(ProtocolEvent event) { if(log.isDebugEnabled()) { log.debug("RECV: [%s] %s", this, event); } event.delegate(this, delegate); } public void send(ProtocolEvent event) { if(log.isDebugEnabled()) { log.debug("SEND: [%s] %s", this, event); } Sender s = sender; if (s == null) { throw new ConnectionException("connection closed"); } s.send(event); } public void flush() { if(log.isDebugEnabled()) { log.debug("FLUSH: [%s]", this); } final Sender theSender = sender; if(theSender != null) { theSender.flush(); } } protected void invoke(Method method) { method.setChannel(0); send(method); if (!method.isBatch()) { flush(); } } public void dispatch(Method method) { Session ssn = getSession(method.getChannel()); if(ssn != null) { ssn.received(method); } else { throw new ProtocolViolationException( "Received frames for an already detached session", null); } } public int getChannelMax() { return channelMax; } void setChannelMax(int max) { channelMax = max; } private int map(Session ssn) { synchronized (lock) { //For a negotiated channelMax N, there are channels 0 to N-1 available. for (int i = 0; i < getChannelMax(); i++) { if (!channels.containsKey(i)) { map(ssn, i); return i; } } throw new RuntimeException("no more channels available"); } } void map(Session ssn, int channel) { synchronized (lock) { channels.put(channel, ssn); ssn.setChannel(channel); } } void unmap(Session ssn) { synchronized (lock) { channels.remove(ssn.getChannel()); } } public Session getSession(int channel) { synchronized (lock) { return channels.get(channel); } } public void resume() { synchronized (lock) { for (Session ssn : sessions.values()) { map(ssn); ssn.resume(); } setState(OPEN); } } public void exception(ConnectionException e) { connectionLost.set(true); synchronized (lock) { switch (state) { case OPENING: case CLOSING: error = e; lock.notifyAll(); return; } } for (ConnectionListener listener: listeners) { listener.exception(this, e); } } public void exception(Throwable t) { exception(new ConnectionException(t)); } void closeCode(ConnectionClose close) { synchronized (lock) { ConnectionCloseCode code = close.getReplyCode(); if (code != ConnectionCloseCode.NORMAL) { exception(new ConnectionException(close)); } } } public void closed() { if (state == OPEN) { exception(new ConnectionException("connection aborted")); } log.debug("connection closed: %s", this); synchronized (lock) { List values = new ArrayList(channels.values()); for (Session ssn : values) { ssn.closed(); } try { sender.close(); } catch(Exception e) { // ignore. } sender = null; setState(CLOSED); } for (ConnectionListener listener: listeners) { listener.closed(this); } } public void close() { close(ConnectionCloseCode.NORMAL, null); } public void mgmtClose() { close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface."); } public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options) { synchronized (lock) { switch (state) { case OPEN: state = CLOSING; connectionClose(replyCode, replyText, _options); Waiter w = new Waiter(lock, timeout); while (w.hasTime() && state == CLOSING && error == null) { w.await(); } if (error != null) { close(replyCode, replyText, _options); throw new ConnectionException(error); } switch (state) { case CLOSING: close(replyCode, replyText, _options); throw new ConnectionException("close() timed out"); case CLOSED: break; default: throw new IllegalStateException(String.valueOf(state)); } break; case CLOSED: break; default: if (sender != null) { sender.close(); w = new Waiter(lock, timeout); while (w.hasTime() && sender != null && error == null) { w.await(); } if (error != null) { throw new ConnectionException(error); } if (sender != null) { throw new ConnectionException("close() timed out"); } } break; } } } public void setIdleTimeout(int i) { idleTimeout = i; if (sender != null) { sender.setIdleTimeout(i); } } public int getIdleTimeout() { return idleTimeout; } public String getUserID() { return userID; } public void setUserID(String id) { userID = id; } public void setServerProperties(final Map serverProperties) { _serverProperties = serverProperties == null ? Collections.EMPTY_MAP : serverProperties; } public Map getServerProperties() { return _serverProperties; } public String toString() { return String.format("conn:%x", System.identityHashCode(this)); } public ConnectionSettings getConnectionSettings() { return conSettings; } public SecurityLayer getSecurityLayer() { return securityLayer; } public boolean isConnectionResuming() { return connectionLost.get(); } protected Collection getChannels() { return channels.values(); } public boolean hasSessionWithName(final byte[] name) { return sessions.containsKey(new Binary(name)); } public void notifyFailoverRequired() { List values = new ArrayList(channels.values()); for (Session ssn : values) { ssn.notifyFailoverRequired(); } } }