diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-05-11 14:27:01 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-05-11 14:27:01 +0000 |
commit | 90593b58d192ce16ab9945a279ddf12a09f8e0e2 (patch) | |
tree | 94b940b35178ec8d1cec63552a32dab7aa3b0deb /java | |
parent | 779e1b19631d1eb62e9b7f89340f76736b462f7e (diff) | |
download | qpid-python-90593b58d192ce16ab9945a279ddf12a09f8e0e2.tar.gz |
QPID-4831 : [Java Broker] Allow SSL and non-SSL connections on the same port
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1481331 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
19 files changed, 888 insertions, 215 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 451754e6d8..a539743081 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -88,13 +88,6 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable } } } - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.connectionRegistered(connnection); - } - } } public void deregisterConnection(AMQConnectionModel connnection) @@ -111,14 +104,6 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable } } } - - synchronized (_listeners) - { - for(RegistryChangeListener listener : _listeners) - { - listener.connectionUnregistered(connnection); - } - } } public void addRegistryChangeListener(RegistryChangeListener listener) diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java index 1fc22c736c..ddfbf51322 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java @@ -99,9 +99,11 @@ public class AmqpPortAdapter extends PortAdapter _transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( - _broker, supported, defaultSupportedProtocolReply); + _broker, transports.contains(Transport.TCP) ? sslContext : null, + settings.wantClientAuth(), settings.needClientAuth(), + supported, defaultSupportedProtocolReply, this, transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL); - _transport.accept(settings, protocolEngineFactory, sslContext); + _transport.accept(settings, protocolEngineFactory, transports.contains(Transport.TCP) ? null : sslContext); for(Transport transport : getTransports()) { CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort())); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 36fafba1cd..b52da3039d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.stats.StatisticsGatherer; import java.util.List; @@ -82,4 +84,9 @@ public interface AMQConnectionModel extends StatisticsGatherer long getSessionCountLimit(); long getLastIoTime(); + + Port getPort(); + + Transport getTransport(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index a84a4783ac..4574f87b54 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -62,6 +62,8 @@ import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.queue.QueueEntry; @@ -86,6 +88,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; + private final Port _port; private AMQShortString _contextKey; @@ -153,11 +156,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Lock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); private final Broker _broker; + private final Transport _transport; - public AMQProtocolEngine(Broker broker, NetworkConnection network, final long connectionId) + public AMQProtocolEngine(Broker broker, + NetworkConnection network, + final long connectionId, + Port port, + Transport transport) { _broker = broker; + _port = port; + _transport = transport; _maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT); _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); @@ -1142,6 +1152,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _lastIoTime; } + @Override + public Port getPort() + { + return _port; + } + + @Override + public Transport getTransport() + { + return _transport; + } + public long getLastReceivedTime() { return _lastReceivedTime; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index d9e5e1c473..267857a34a 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -24,21 +24,35 @@ package org.apache.qpid.server.protocol; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.Principal; import java.util.Set; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLPeerUnverifiedException; import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender; +import org.apache.qpid.transport.network.security.ssl.SSLReceiver; public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); private final long _id; + private final SSLContext _sslContext; + private final boolean _wantClientAuth; + private final boolean _needClientAuth; + private final Port _port; + private final Transport _transport; private Set<AmqpProtocolVersion> _supported; private String _fqdn; @@ -50,19 +64,10 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(final Broker broker, + SSLContext sslContext, boolean wantClientAuth, boolean needClientAuth, final Set<AmqpProtocolVersion> supported, final AmqpProtocolVersion defaultSupportedReply, - final long id, - final NetworkConnection network) - { - this(broker, supported, defaultSupportedReply, id); - setNetworkConnection(network); - } - - public MultiVersionProtocolEngine(final Broker broker, - final Set<AmqpProtocolVersion> supported, - final AmqpProtocolVersion defaultSupportedReply, - final long id) + Port port, Transport transport, final long id) { if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply)) { @@ -74,6 +79,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _broker = broker; _supported = supported; _defaultSupportedReply = defaultSupportedReply; + _sslContext = sslContext; + _wantClientAuth = wantClientAuth; + _needClientAuth = needClientAuth; + _port = port; + _transport = transport; } @@ -252,7 +262,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_broker, _network, _id); + return new AMQProtocolEngine(_broker, _network, _id, _port, _transport); } }; @@ -272,7 +282,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_broker, _network, _id); + return new AMQProtocolEngine(_broker, _network, _id, _port, _transport); } }; @@ -292,7 +302,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_broker, _network, _id); + return new AMQProtocolEngine(_broker, _network, _id, _port, _transport); } }; @@ -321,7 +331,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine conn.setConnectionDelegate(connDelegate); conn.setRemoteAddress(_network.getRemoteAddress()); conn.setLocalAddress(_network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, _network); + return new ProtocolEngine_0_10( conn, _network, _port, _transport); } }; @@ -341,7 +351,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new ProtocolEngine_1_0_0(_network, _broker, _id); + return new ProtocolEngine_1_0_0(_network, _broker, _id, _port, _transport); } }; @@ -361,7 +371,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id); + return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id, _port, _transport); } }; @@ -518,6 +528,14 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } } + if(newDelegate == null && looksLikeSSL(headerBytes)) + { + if(_sslContext != null) + { + newDelegate = new SslDelegateProtocolEngine(); + } + } + // If no delegate is found then send back a supported protocol version id if(newDelegate == null) { @@ -625,4 +643,218 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return 0; } } + + private class SslDelegateProtocolEngine implements ServerProtocolEngine + { + private final MultiVersionProtocolEngine _decryptEngine; + private final SSLEngine _engine; + private final SSLReceiver _sslReceiver; + private final SSLBufferingSender _sslSender; + + private SslDelegateProtocolEngine() + { + + _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported, + _defaultSupportedReply, _port, Transport.SSL, _id); + + _engine = _sslContext.createSSLEngine(); + _engine.setUseClientMode(false); + + if(_needClientAuth) + { + _engine.setNeedClientAuth(_needClientAuth); + } + else if(_wantClientAuth) + { + _engine.setWantClientAuth(_wantClientAuth); + } + + SSLStatus sslStatus = new SSLStatus(); + _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus); + _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus); + _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender)); + } + + @Override + public void received(ByteBuffer msg) + { + _sslReceiver.received(msg); + _sslSender.send(); + _sslSender.flush(); + } + + @Override + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + //TODO - Implement + } + + @Override + public SocketAddress getRemoteAddress() + { + return _decryptEngine.getRemoteAddress(); + } + + @Override + public SocketAddress getLocalAddress() + { + return _decryptEngine.getLocalAddress(); + } + + @Override + public long getWrittenBytes() + { + return _decryptEngine.getWrittenBytes(); + } + + @Override + public long getReadBytes() + { + return _decryptEngine.getReadBytes(); + } + + @Override + public void closed() + { + _decryptEngine.closed(); + } + + @Override + public void writerIdle() + { + _decryptEngine.writerIdle(); + } + + @Override + public void readerIdle() + { + _decryptEngine.readerIdle(); + } + + @Override + public void exception(Throwable t) + { + _decryptEngine.exception(t); + } + + @Override + public long getConnectionId() + { + return _decryptEngine.getConnectionId(); + } + + @Override + public long getLastReadTime() + { + return _decryptEngine.getLastReadTime(); + } + + @Override + public long getLastWriteTime() + { + return _decryptEngine.getLastWriteTime(); + } + } + + private boolean looksLikeSSL(byte[] headerBytes) + { + return headerBytes[0] == 22 && // SSL Handshake + (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[2] == 0 || // SSL 3.0 + headerBytes[2] == 1 || // TLS 1.0 + headerBytes[2] == 2 || // TLS 1.1 + headerBytes[2] == 3)) && // TLS1.2 + (headerBytes[5] == 1); // client_hello + } + + private static class SSLNetworkConnection implements NetworkConnection + { + private final NetworkConnection _network; + private final SSLBufferingSender _sslSender; + private final SSLEngine _engine; + + public SSLNetworkConnection(SSLEngine engine, NetworkConnection network, + SSLBufferingSender sslSender) + { + _engine = engine; + _network = network; + _sslSender = sslSender; + + } + + @Override + public Sender<ByteBuffer> getSender() + { + return _sslSender; + } + + @Override + public void start() + { + _network.start(); + } + + @Override + public void close() + { + _sslSender.close(); + + _network.close(); + } + + @Override + public SocketAddress getRemoteAddress() + { + return _network.getRemoteAddress(); + } + + @Override + public SocketAddress getLocalAddress() + { + return _network.getLocalAddress(); + } + + @Override + public void setMaxWriteIdle(int sec) + { + _network.setMaxWriteIdle(sec); + } + + @Override + public void setMaxReadIdle(int sec) + { + _network.setMaxReadIdle(sec); + } + + @Override + public void setPeerPrincipal(Principal principal) + { + _network.setPeerPrincipal(principal); + } + + @Override + public Principal getPeerPrincipal() + { + try + { + return _engine.getSession().getPeerPrincipal(); + } + catch (SSLPeerUnverifiedException e) + { + return null; + } + } + + @Override + public int getMaxReadIdle() + { + return _network.getMaxReadIdle(); + } + + @Override + public int getMaxWriteIdle() + { + return _network.getMaxWriteIdle(); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 9f078c8999..4b76546da1 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.protocol; +import javax.net.ssl.SSLContext; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { @@ -34,9 +37,20 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory private final Broker _broker; private final Set<AmqpProtocolVersion> _supported; private final AmqpProtocolVersion _defaultSupportedReply; + private final SSLContext _sslContext; + private final boolean _wantClientAuth; + private final boolean _needClientAuth; + private final Port _port; + private final Transport _transport; public MultiVersionProtocolEngineFactory(Broker broker, - final Set<AmqpProtocolVersion> supportedVersions, final AmqpProtocolVersion defaultSupportedReply) + SSLContext sslContext, + boolean wantClientAuth, + boolean needClientAuth, + final Set<AmqpProtocolVersion> supportedVersions, + final AmqpProtocolVersion defaultSupportedReply, + Port port, + Transport transport) { if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply)) { @@ -45,13 +59,20 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } _broker = broker; + _sslContext = sslContext; _supported = supportedVersions; _defaultSupportedReply = defaultSupportedReply; + _wantClientAuth = wantClientAuth; + _needClientAuth = needClientAuth; + _port = port; + _transport = transport; } public ServerProtocolEngine newProtocolEngine() { - return new MultiVersionProtocolEngine(_broker, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement()); + return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth, + _supported, _defaultSupportedReply, _port, _transport, + ID_GENERATOR.getAndIncrement() + ); } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index d5f7fe486c..8275ab690c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; @@ -47,11 +49,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime; public ProtocolEngine_0_10(ServerConnection conn, - NetworkConnection network) + NetworkConnection network, Port port, Transport transport) { super(new Assembler(conn)); _connection = conn; - + _connection.setPort(port); + _connection.setTransport(transport); if(network != null) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java index ed9cd324b4..c2210be935 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -40,6 +40,8 @@ import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; @@ -50,6 +52,8 @@ import org.apache.qpid.transport.network.NetworkConnection; public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler { static final AtomicLong _connectionIdSource = new AtomicLong(0L); + private final Port _port; + private final Transport _transport; //private NetworkConnection _networkDriver; private long _readBytes; @@ -100,9 +104,15 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa - public ProtocolEngine_1_0_0(final NetworkConnection networkDriver, final Broker broker, long id) + public ProtocolEngine_1_0_0(final NetworkConnection networkDriver, + final Broker broker, + long id, + Port port, + Transport transport) { _broker = broker; + _port = port; + _transport = transport; _connectionId = id; if(networkDriver != null) { @@ -153,7 +163,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa _conn = new ConnectionEndpoint(container, asSaslServerProvider(_broker.getSubjectCreator( getLocalAddress()))); _conn.setRemoteAddress(_network.getRemoteAddress()); - _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId)); + _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport)); _conn.setFrameOutputHandler(this); _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java index 124eb779d5..8e64ca74f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -41,6 +41,8 @@ import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; @@ -50,50 +52,52 @@ import org.apache.qpid.transport.network.NetworkConnection; public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler { - private long _readBytes; - private long _writtenBytes; - - private long _lastReadTime; - private long _lastWriteTime; - private final Broker _broker; - private long _createTime = System.currentTimeMillis(); - private ConnectionEndpoint _conn; - private long _connectionId; - - private static final ByteBuffer HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 3, - (byte) 1, - (byte) 0, - (byte) 0 - }); - - private static final ByteBuffer PROTOCOL_HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }); - - - private FrameWriter _frameWriter; - private ProtocolHandler _frameHandler; - private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); - private Object _sendLock = new Object(); - private byte _major; - private byte _minor; - private byte _revision; + private final Port _port; + private final Transport _transport; + private long _readBytes; + private long _writtenBytes; + + private long _lastReadTime; + private long _lastWriteTime; + private final Broker _broker; + private long _createTime = System.currentTimeMillis(); + private ConnectionEndpoint _conn; + private long _connectionId; + + private static final ByteBuffer HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 3, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + private static final ByteBuffer PROTOCOL_HEADER = + ByteBuffer.wrap(new byte[] + { + (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }); + + + private FrameWriter _frameWriter; + private ProtocolHandler _frameHandler; + private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); + private Object _sendLock = new Object(); + private byte _major; + private byte _minor; + private byte _revision; private PrintWriter _out; private NetworkConnection _network; private Sender<ByteBuffer> _sender; @@ -111,14 +115,16 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut FRAME } - private State _state = State.A; + private State _state = State.A; public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker, - long id) + long id, Port port, Transport transport) { _connectionId = id; _broker = broker; + _port = port; + _transport = transport; if(networkDriver != null) { setNetworkConnection(networkDriver, networkDriver.getSender()); @@ -167,7 +173,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut SubjectCreator subjectCreator = _broker.getSubjectCreator(getLocalAddress()); _conn = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator)); _conn.setRemoteAddress(getRemoteAddress()); - _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId)); + _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport)); _conn.setFrameOutputHandler(this); _conn.setSaslFrameOutput(this); @@ -333,102 +339,102 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } - public void exception(Throwable t) - { - t.printStackTrace(); - } + public void exception(Throwable t) + { + t.printStackTrace(); + } - public void closed() - { - // todo + public void closed() + { + // todo _conn.inputClosed(); - if(_conn != null && _conn.getConnectionEventListener() != null) + if (_conn != null && _conn.getConnectionEventListener() != null) { - ((Connection_1_0)_conn.getConnectionEventListener()).closed(); + ((Connection_1_0) _conn.getConnectionEventListener()).closed(); } - } + } - public long getCreateTime() - { - return _createTime; - } + public long getCreateTime() + { + return _createTime; + } - public boolean canSend() - { - return true; - } + public boolean canSend() + { + return true; + } - public void send(final AMQFrame amqFrame) - { - send(amqFrame, null); - } + public void send(final AMQFrame amqFrame) + { + send(amqFrame, null); + } - private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); + private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); - public void send(final AMQFrame amqFrame, ByteBuffer buf) - { + public void send(final AMQFrame amqFrame, ByteBuffer buf) + { - synchronized(_sendLock) - { - _lastWriteTime = System.currentTimeMillis(); - if(FRAME_LOGGER.isLoggable(Level.FINE)) - { - FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); - } + synchronized (_sendLock) + { + _lastWriteTime = System.currentTimeMillis(); + if (FRAME_LOGGER.isLoggable(Level.FINE)) + { + FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); + } - _frameWriter.setValue(amqFrame); + _frameWriter.setValue(amqFrame); - ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); + ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); - int size = _frameWriter.writeToBuffer(dup); - if(size > _conn.getMaxFrameSize()) - { - throw new OversizeFrameException(amqFrame,size); - } + int size = _frameWriter.writeToBuffer(dup); + if (size > _conn.getMaxFrameSize()) + { + throw new OversizeFrameException(amqFrame, size); + } - dup.flip(); - _writtenBytes += dup.limit(); + dup.flip(); + _writtenBytes += dup.limit(); - if(RAW_LOGGER.isLoggable(Level.FINE)) - { - ByteBuffer dup2 = dup.duplicate(); - byte[] data = new byte[dup2.remaining()]; - dup2.get(data); - Binary bin = new Binary(data); - RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); - } + if (RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup2 = dup.duplicate(); + byte[] data = new byte[dup2.remaining()]; + dup2.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString()); + } - _sender.send(dup); - _sender.flush(); + _sender.send(dup); + _sender.flush(); - } - } + } + } - public void send(short channel, FrameBody body) - { - AMQFrame frame = AMQFrame.createAMQFrame(channel, body); - send(frame); + public void send(short channel, FrameBody body) + { + AMQFrame frame = AMQFrame.createAMQFrame(channel, body); + send(frame); - } + } - public void close() - { - _sender.close(); - } + public void close() + { + _sender.close(); + } - public void setLogOutput(final PrintWriter out) - { - _out = out; - } + public void setLogOutput(final PrintWriter out) + { + _out = out; + } - public long getConnectionId() - { - return _connectionId; - } + public long getConnectionId() + { + return _connectionId; + } public long getLastReadTime() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index f79d34ea71..b274eabf04 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -29,6 +29,8 @@ import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.stats.StatisticsCounter; @@ -43,7 +45,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO public class Connection_1_0 implements ConnectionEventListener { + private final Port _port; private VirtualHost _vhost; + private final Transport _transport; private final ConnectionEndpoint _conn; private final long _connectionId; private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); @@ -60,9 +64,15 @@ public class Connection_1_0 implements ConnectionEventListener - public Connection_1_0(VirtualHost virtualHost, ConnectionEndpoint conn, long connectionId) + public Connection_1_0(VirtualHost virtualHost, + ConnectionEndpoint conn, + long connectionId, + Port port, + Transport transport) { _vhost = virtualHost; + _port = port; + _transport = transport; _conn = conn; _connectionId = connectionId; _vhost.getConnectionRegistry().registerConnection(_model); @@ -230,6 +240,18 @@ public class Connection_1_0 implements ConnectionEventListener } @Override + public Port getPort() + { + return _port; + } + + @Override + public Transport getTransport() + { + return _transport; + } + + @Override public void initialiseStatistics() { _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 58de6a0cdf..b49bd89266 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -35,6 +35,8 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; @@ -66,10 +68,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private final long _connectionId; private final Object _reference = new Object(); private VirtualHost _virtualHost; + private Port _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Principal _peerPrincipal; private NetworkConnection _networkConnection; + private Transport _transport; public ServerConnection(final long connectionId) { @@ -148,6 +152,28 @@ public class ServerConnection extends Connection implements AMQConnectionModel, initialiseStatistics(); } + @Override + public Port getPort() + { + return _port; + } + + public void setPort(Port port) + { + _port = port; + } + + @Override + public Transport getTransport() + { + return _transport; + } + + public void setTransport(Transport transport) + { + _transport = transport; + } + public void onOpen(final Runnable task) { _onOpenTask = task; @@ -451,6 +477,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return _lastIoTime.longValue(); } + public String getClientId() { return getConnectionDelegate().getClientId(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 3216f8886a..5d3051f7b2 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -60,7 +60,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr public InternalTestProtocolSession(VirtualHost virtualHost, Broker broker) throws AMQException { - super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement()); + super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null); _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 02b8c74feb..0d4b4ba9f8 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -154,7 +154,8 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class); MultiVersionProtocolEngineFactory factory = - new MultiVersionProtocolEngineFactory(_broker, versions, null); + new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, null, null, + org.apache.qpid.server.model.Transport.TCP); //create a dummy to retrieve the 'current' ID number long previousId = factory.newProtocolEngine().getConnectionId(); @@ -192,7 +193,8 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase try { - new MultiVersionProtocolEngineFactory(_broker, versions, AmqpProtocolVersion.v0_9); + new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, AmqpProtocolVersion.v0_9, null, + org.apache.qpid.server.model.Transport.TCP); fail("should not have been allowed to create the factory"); } catch(IllegalArgumentException iae) diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 65dbf7bae1..0bc20836f6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -27,6 +27,8 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -539,5 +541,17 @@ public class MockSubscription implements Subscription { return 0; } + + @Override + public Port getPort() + { + return null; + } + + @Override + public Transport getTransport() + { + return null; + } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java index 89d434e95d..efb62ba085 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java @@ -73,7 +73,7 @@ public class SubscriptionFactoryImplTest extends QpidTestCase } /** - * Tests that while creating Subscriptions of various types, the + * Tests that while creating Subscriptions of various types, the * ID numbers assigned are allocated from a common sequence * (in increasing order). */ @@ -104,7 +104,7 @@ public class SubscriptionFactoryImplTest extends QpidTestCase //create a 0-10 subscription ServerConnection conn = new ServerConnection(1); - ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection()); + ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), null, null); conn.setVirtualHost(_session.getVirtualHost()); ServerSessionDelegate sesDel = new ServerSessionDelegate(); Binary name = new Binary(new byte[]{new Byte("1")}); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index c8027e143e..5742667dbe 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -239,7 +239,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet ticker); ticker.setConnection(connection); - if(_sslContext != null) + if(_sslContext != null && socket instanceof SSLSocket) { try { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java new file mode 100644 index 0000000000..0d36b96cd4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.security.ssl; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; +import javax.net.ssl.SSLException; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.util.Logger; + +public class SSLBufferingSender implements Sender<ByteBuffer> +{ + private static final Logger log = Logger.get(SSLBufferingSender.class); + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); + + private final Sender<ByteBuffer> delegate; + private final SSLEngine engine; + private final int sslBufSize; + private final ByteBuffer netData; + private final SSLStatus _sslStatus; + + private String _hostname; + + private final AtomicBoolean closed = new AtomicBoolean(false); + private ByteBuffer _appData = EMPTY_BYTE_BUFFER; + + + public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) + { + this.engine = engine; + this.delegate = delegate; + sslBufSize = engine.getSession().getPacketBufferSize(); + netData = ByteBuffer.allocate(sslBufSize); + _sslStatus = sslStatus; + } + + public void setHostname(String hostname) + { + _hostname = hostname; + } + + public void close() + { + if (!closed.getAndSet(true)) + { + if (engine.isOutboundDone()) + { + return; + } + log.debug("Closing SSL connection"); + + engine.closeOutbound(); + try + { + tearDownSSLConnection(); + } + catch(Exception e) + { + throw new SenderException("Error closing SSL connection",e); + } + + + synchronized(_sslStatus.getSslLock()) + { + while (!engine.isOutboundDone()) + { + try + { + _sslStatus.getSslLock().wait(); + } + catch(InterruptedException e) + { + // pass + } + + } + } + delegate.close(); + } + } + + private void tearDownSSLConnection() throws Exception + { + SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData); + Status status = result.getStatus(); + int read = result.bytesProduced(); + while (status != Status.CLOSED) + { + if (status == Status.BUFFER_OVERFLOW) + { + netData.clear(); + } + if(read > 0) + { + int limit = netData.limit(); + netData.limit(netData.position()); + netData.position(netData.position() - read); + + ByteBuffer data = netData.slice(); + + netData.limit(limit); + netData.position(netData.position() + read); + + delegate.send(data); + flush(); + } + result = engine.wrap(ByteBuffer.allocate(0), netData); + status = result.getStatus(); + read = result.bytesProduced(); + } + } + + public void flush() + { + delegate.flush(); + } + + public void send() + { + doSend(); + } + + public synchronized void send(ByteBuffer appData) + { + boolean buffered; + if(buffered = _appData.hasRemaining()) + { + ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining()); + newBuf.put(_appData); + newBuf.put(appData); + newBuf.flip(); + _appData = newBuf; + } + doSend(); + if(!appData.hasRemaining()) + { + _appData = EMPTY_BYTE_BUFFER; + } + else if(!buffered) + { + _appData = ByteBuffer.allocate(appData.remaining()); + _appData.put(appData); + _appData.flip(); + } + } + + private synchronized void doSend() + { + if (closed.get()) + { + throw new SenderException("SSL Sender is closed"); + } + + HandshakeStatus handshakeStatus; + Status status; + + while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) + && !_sslStatus.getSslErrorFlag()) + { + int read = 0; + try + { + SSLEngineResult result = engine.wrap(_appData, netData); + read = result.bytesProduced(); + status = result.getStatus(); + handshakeStatus = result.getHandshakeStatus(); + } + catch(SSLException e) + { + // Should this set _sslError?? + throw new SenderException("SSL, Error occurred while encrypting data",e); + } + + if(read > 0) + { + int limit = netData.limit(); + netData.limit(netData.position()); + netData.position(netData.position() - read); + + ByteBuffer data = netData.slice(); + + netData.limit(limit); + netData.position(netData.position() + read); + + delegate.send(data); + } + + switch(status) + { + case CLOSED: + throw new SenderException("SSLEngine is closed"); + + case BUFFER_OVERFLOW: + netData.clear(); + continue; + + case OK: + break; // do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + switch (handshakeStatus) + { + case NEED_WRAP: + if (netData.hasRemaining()) + { + continue; + } + + case NEED_TASK: + doTasks(); + break; + + case NEED_UNWRAP: + flush(); + return; + + case FINISHED: + if (_hostname != null) + { + SSLUtil.verifyHostname(engine, _hostname); + } + + case NOT_HANDSHAKING: + break; //do nothing + + default: + throw new IllegalStateException("SSLSender: Invalid State " + status); + } + + } + } + + private void doTasks() + { + Runnable runnable; + while ((runnable = engine.getDelegatedTask()) != null) { + runnable.run(); + } + } + + public void setIdleTimeout(int i) + { + delegate.setIdleTimeout(i); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java index 71b763685e..7492d062fd 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,16 +7,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ package org.apache.qpid.client.ssl; @@ -25,6 +25,7 @@ import static org.apache.qpid.test.utils.TestSSLConstants.KEYSTORE_PASSWORD; import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE; import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD; +import java.util.Arrays; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQTestConnection_0_10; @@ -64,23 +65,23 @@ public class SSLTest extends QpidBrokerTestCase if (shouldPerformTest()) { clearSslStoreSystemProperties(); - + //Start the broker (NEEDing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, true, false); + configureJavaBrokerIfNecessary(true, true, true, false, false); super.setUp(); String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" + - "?ssl='true'&ssl_verify_hostname='true'" + + "?ssl='true'&ssl_verify_hostname='true'" + "&key_store='%s'&key_store_password='%s'" + "&trust_store='%s'&trust_store_password='%s'" + "'"; - + url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT, KEYSTORE,KEYSTORE_PASSWORD,TRUSTSTORE,TRUSTSTORE_PASSWORD); Connection con = getConnection(new AMQConnectionURL(url)); assertNotNull("connection should be successful", con); - Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); assertNotNull("create session should be successful", ssn); } } @@ -95,7 +96,7 @@ public class SSLTest extends QpidBrokerTestCase if (shouldPerformTest()) { //Start the broker (NEEDing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, true, false); + configureJavaBrokerIfNecessary(true, true, true, false, false); super.setUp(); //Create URL enabling SSL at the connection rather than brokerlist level @@ -119,7 +120,7 @@ public class SSLTest extends QpidBrokerTestCase if (shouldPerformTest()) { //Start the broker (NEEDing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, true, false); + configureJavaBrokerIfNecessary(true, true, true, false, false); super.setUp(); //Create URL enabling SSL at the connection, overriding the false at the brokerlist level @@ -138,18 +139,18 @@ public class SSLTest extends QpidBrokerTestCase if (shouldPerformTest()) { //Start the broker (NEEDing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, true, false); + configureJavaBrokerIfNecessary(true, true, true, false, false); super.setUp(); String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s?ssl='true''"; url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT); - + Connection con = getConnection(new AMQConnectionURL(url)); assertNotNull("connection should be successful", con); - Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); assertNotNull("create session should be successful", ssn); - } + } } public void testMultipleCertsInSingleStore() throws Exception @@ -157,43 +158,43 @@ public class SSLTest extends QpidBrokerTestCase if (shouldPerformTest()) { //Start the broker (NEEDing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, true, false); + configureJavaBrokerIfNecessary(true, true, true, false, false); super.setUp(); - String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" + - QpidBrokerTestCase.DEFAULT_SSL_PORT + + String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" + + QpidBrokerTestCase.DEFAULT_SSL_PORT + "?ssl='true'&ssl_cert_alias='" + CERT_ALIAS_APP1 + "''"; - - AMQTestConnection_0_10 con = new AMQTestConnection_0_10(url); + + AMQTestConnection_0_10 con = new AMQTestConnection_0_10(url); org.apache.qpid.transport.Connection transportCon = con.getConnection(); String userID = transportCon.getSecurityLayer().getUserID(); assertEquals("The correct certificate was not choosen","app1@acme.org",userID); con.close(); - - url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" + - QpidBrokerTestCase.DEFAULT_SSL_PORT + + + url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" + + QpidBrokerTestCase.DEFAULT_SSL_PORT + "?ssl='true'&ssl_cert_alias='" + CERT_ALIAS_APP2 + "''"; - - con = new AMQTestConnection_0_10(url); + + con = new AMQTestConnection_0_10(url); transportCon = con.getConnection(); userID = transportCon.getSecurityLayer().getUserID(); assertEquals("The correct certificate was not choosen","app2@acme.org",userID); con.close(); - } + } } - + public void testVerifyHostNameWithIncorrectHostname() throws Exception { if (shouldPerformTest()) { //Start the broker (WANTing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, false, true); + configureJavaBrokerIfNecessary(true, true, false, true, false); super.setUp(); - String url = "amqp://guest:guest@test/?brokerlist='tcp://127.0.0.1:" + - QpidBrokerTestCase.DEFAULT_SSL_PORT + + String url = "amqp://guest:guest@test/?brokerlist='tcp://127.0.0.1:" + + QpidBrokerTestCase.DEFAULT_SSL_PORT + "?ssl='true'&ssl_verify_hostname='true''"; - + try { getConnection(new AMQConnectionURL(url)); @@ -203,7 +204,7 @@ public class SSLTest extends QpidBrokerTestCase { verifyExceptionCausesContains(e, "SSL hostname verification failed"); } - } + } } private void verifyExceptionCausesContains(Exception e, String expectedString) @@ -213,39 +214,39 @@ public class SSLTest extends QpidBrokerTestCase String strace = bout.toString(); assertTrue("Correct exception not thrown", strace.contains(expectedString)); } - + public void testVerifyLocalHost() throws Exception { if (shouldPerformTest()) { //Start the broker (WANTing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, false, true); + configureJavaBrokerIfNecessary(true, true, false, true, false); super.setUp(); - String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" + - QpidBrokerTestCase.DEFAULT_SSL_PORT + + String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" + + QpidBrokerTestCase.DEFAULT_SSL_PORT + "?ssl='true'&ssl_verify_hostname='true''"; Connection con = getConnection(new AMQConnectionURL(url)); assertNotNull("connection should have been created", con); } } - + public void testVerifyLocalHostLocalDomain() throws Exception { if (shouldPerformTest()) { //Start the broker (WANTing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, false, true); + configureJavaBrokerIfNecessary(true, true, false, true, false); super.setUp(); - String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost.localdomain:" + - QpidBrokerTestCase.DEFAULT_SSL_PORT + + String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost.localdomain:" + + QpidBrokerTestCase.DEFAULT_SSL_PORT + "?ssl='true'&ssl_verify_hostname='true''"; Connection con = getConnection(new AMQConnectionURL(url)); assertNotNull("connection should have been created", con); - } + } } public void testCreateSSLConnectionUsingConnectionURLParamsTrustStoreOnly() throws Exception @@ -255,12 +256,12 @@ public class SSLTest extends QpidBrokerTestCase clearSslStoreSystemProperties(); //Start the broker (WANTing client certificate authentication) - configureJavaBrokerIfNecessary(true, true, false, true); + configureJavaBrokerIfNecessary(true, true, false, true, false); super.setUp(); - + String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" + - "?ssl='true'&ssl_verify_hostname='true'" + + "?ssl='true'&ssl_verify_hostname='true'" + "&trust_store='%s'&trust_store_password='%s'" + "'"; @@ -268,9 +269,9 @@ public class SSLTest extends QpidBrokerTestCase Connection con = getConnection(new AMQConnectionURL(url)); assertNotNull("connection should be successful", con); - Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); assertNotNull("create session should be successful", ssn); - } + } } /** @@ -308,7 +309,7 @@ public class SSLTest extends QpidBrokerTestCase clearSslStoreSystemProperties(); //Start the broker - configureJavaBrokerIfNecessary(true, true, needClientCerts, wantClientCerts); + configureJavaBrokerIfNecessary(true, true, needClientCerts, wantClientCerts, false); super.setUp(); String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" + @@ -344,6 +345,47 @@ public class SSLTest extends QpidBrokerTestCase } } + /** + * Test running TLS and unencrypted on the same port works and both TLS and non-TLS connections can be established + * + */ + public void testCreateSSLandTCPonSamePort() throws Exception + { + if (shouldPerformTest()) + { + clearSslStoreSystemProperties(); + + //Start the broker (NEEDing client certificate authentication) + configureJavaBrokerIfNecessary(true, false, false, false, true); + super.setUp(); + + String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" + + "?ssl='true'&ssl_verify_hostname='true'" + + "&key_store='%s'&key_store_password='%s'" + + "&trust_store='%s'&trust_store_password='%s'" + + "'"; + + url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT, + KEYSTORE,KEYSTORE_PASSWORD,TRUSTSTORE,TRUSTSTORE_PASSWORD); + + Connection con = getConnection(new AMQConnectionURL(url)); + assertNotNull("connection should be successful", con); + Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); + assertNotNull("create session should be successful", ssn); + + url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s'"; + + url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT); + + con = getConnection(new AMQConnectionURL(url)); + assertNotNull("connection should be successful", con); + ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE); + assertNotNull("create session should be successful", ssn); + + } + } + + private boolean shouldPerformTest() { // We run the SSL tests on all the Java broker profiles @@ -355,12 +397,17 @@ public class SSLTest extends QpidBrokerTestCase return Boolean.getBoolean(PROFILE_USE_SSL); } - private void configureJavaBrokerIfNecessary(boolean sslEnabled, boolean sslOnly, boolean needClientAuth, boolean wantClientAuth) throws ConfigurationException + private void configureJavaBrokerIfNecessary(boolean sslEnabled, + boolean sslOnly, + boolean needClientAuth, + boolean wantClientAuth, + boolean samePort) throws ConfigurationException { if(isJavaBroker()) { Map<String, Object> sslPortAttributes = new HashMap<String, Object>(); - sslPortAttributes.put(Port.TRANSPORTS, Collections.singleton(Transport.SSL)); + sslPortAttributes.put(Port.TRANSPORTS, samePort ? Arrays.asList(Transport.SSL, Transport.TCP) + : Collections.singleton(Transport.SSL)); sslPortAttributes.put(Port.PORT, DEFAULT_SSL_PORT); sslPortAttributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER); sslPortAttributes.put(Port.NEED_CLIENT_AUTH, needClientAuth); diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes index b429305572..a404ff4389 100755 --- a/java/test-profiles/CPPExcludes +++ b/java/test-profiles/CPPExcludes @@ -180,6 +180,8 @@ org.apache.qpid.client.failover.MultipleBrokersFailoverTest#* // Uses Java broker specific configuration org.apache.qpid.client.ssl.SSLTest#testClientCertMissingWhilstWanting +org.apache.qpid.client.ssl.SSLTest#testCreateSSLandTCPonSamePort + // QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats |