diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-14 08:58:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-14 08:58:03 +0000 |
commit | 6dfbb79c9ce4835cc744c466e41bb1b42cee81c1 (patch) | |
tree | f341296e462617b4a80ec0e192f80224a438c405 | |
parent | c9660933637b69a14ae870397a53b086a8d6ab85 (diff) | |
download | qpid-python-6dfbb79c9ce4835cc744c466e41bb1b42cee81c1.tar.gz |
QPID-5459 : Added configurable TLS parameters for AMQP 1.0 client (both TCP and WSS)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1557982 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 690 insertions, 108 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index f72c9b3020..f8af2d388e 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -26,11 +26,21 @@ import java.net.URL; import java.net.URLConnection; import java.net.URLDecoder; import java.net.URLStreamHandler; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.cert.X509Certificate; +import java.util.HashMap; +import java.util.Map; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.qpid.amqp_1_0.client.SSLUtil; import org.apache.qpid.amqp_1_0.jms.ConnectionFactory; @@ -50,6 +60,13 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0); + private int _maxPrefetch; + private String _keyStorePath; + private String _keyStorePassword; + private String _keyStoreCertAlias; + private String _trustStorePath; + private String _trustStorePassword; + private SSLContext _sslContext; public ConnectionFactoryImpl(final String host, @@ -128,124 +145,342 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection return createConnection(_username, _password); } - public ConnectionImpl createConnection(final String username, final String password) throws JMSException + public ConnectionImpl createConnection(String username, final String password) throws JMSException { - ConnectionImpl connection = new ConnectionImpl(_protocol,_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions); + synchronized (this) + { + if(_ssl && _sslContext == null) + { + try + { + _sslContext = SSLUtil.buildSslContext(_keyStoreCertAlias,_keyStorePath, + KeyStore.getDefaultType(), + _keyStorePassword, + KeyManagerFactory.getDefaultAlgorithm(), + _trustStorePath,_trustStorePassword, + KeyStore.getDefaultType(), + TrustManagerFactory.getDefaultAlgorithm()); + if(username == null && _keyStoreCertAlias != null) + { + X509Certificate[] certs = SSLUtil.getClientCertificates(_keyStoreCertAlias, + _keyStorePath, + _keyStorePassword, + KeyStore.getDefaultType(), + KeyManagerFactory.getDefaultAlgorithm()); + if(certs != null && certs.length != 0) + { + username = certs[0].getSubjectDN().getName(); + } + } + + } + catch (GeneralSecurityException e) + { + final JMSException jmsException = new JMSException("Unable to create SSL context"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } + catch (IOException e) + { + final JMSException jmsException = new JMSException("Unable to create SSL context"); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; } + } + } + ConnectionImpl connection = new ConnectionImpl(_protocol,_host, _port, username, password, _clientId, _remoteHost, _sslContext, _maxSessions); connection.setQueuePrefix(_queuePrefix); connection.setTopicPrefix(_topicPrefix); connection.setUseBinaryMessageId(_useBinaryMessageId); connection.setSyncPublish(_syncPublish); + if(_maxPrefetch != 0) + { + connection.setMaxPrefetch(_maxPrefetch); + } return connection; } - public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException + public void setMaxPrefetch(final int maxPrefetch) { - URL url = new URL(null, urlString, new URLStreamHandler() - { - @Override - protected URLConnection openConnection(URL u) throws IOException - { - throw new UnsupportedOperationException(); - } - }); - String protocol = url.getProtocol(); - if(protocol == null || "".equals(protocol)) - { - protocol = "amqp"; - } -/* - else if(!protocol.equals("amqp") && !protocol.equals("amqps")) - { - throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'."); - } -*/ - String host = url.getHost(); - int port = url.getPort(); + _maxPrefetch = maxPrefetch; + } - boolean ssl = false; + public void setKeyStorePath(final String keyStorePath) + { + _keyStorePath = keyStorePath; + } - if(port == -1) - { - if("amqps".equals(protocol)) - { - port = 5671; - ssl = true; - } - else - { - port = 5672; - } - } - else if("amqps".equals(protocol)) - { - ssl = true; - } + public void setKeyStorePassword(final String keyStorePassword) + { + _keyStorePassword = keyStorePassword; + } - String userInfo = url.getUserInfo(); - String username = null; - String password = null; - String clientId = null; - String remoteHost = null; + public void setKeyStoreCertAlias(final String keyStoreCertAlias) + { + _keyStoreCertAlias = keyStoreCertAlias; + } + + public void setTrustStorePath(final String trustStorePath) + { + _trustStorePath = trustStorePath; + } + + public void setTrustStorePassword(final String trustStorePassword) + { + _trustStorePassword = trustStorePassword; + } + + private static class ConnectionOptions + { + String username; + String password; + String clientId; + String remoteHost; boolean binaryMessageId = true; - boolean syncPublish = false; - int maxSessions = 0; + boolean syncPublish; + int maxSessions; + public boolean ssl; + public int maxPrefetch; + public String trustStorePath; + public String trustStorePassword; + public String keyStorePath; + public String keyStorePassword; + public String keyStoreCertAlias; + } + + + + private static abstract class OptionSetter + { + + private static final Map<String, OptionSetter> OPTION_SETTER_MAP = new HashMap<String, OptionSetter>(); + private final String _name; + private final String _description; + + public OptionSetter(String name, String description) + { + OPTION_SETTER_MAP.put(name.toLowerCase(), this); + _name = name; + _description = description; + } + + public abstract void setOption(ConnectionOptions options, String value) throws MalformedURLException; - if(userInfo != null) + public static void parseOptions(URL url, ConnectionOptions options) throws MalformedURLException { - String[] components = userInfo.split(":",2); - username = URLDecoder.decode(components[0]); - if(components.length == 2) + String query = url.getQuery(); + if(query != null) { - password = URLDecoder.decode(components[1]); + for(String param : query.split("&")) + { + + String[] keyValuePair = param.split("=",2); + OptionSetter setter = OPTION_SETTER_MAP.get(keyValuePair[0]); + if(setter != null) + { + setter.setOption(options, keyValuePair[1]); + } + else + { + throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL"); + } + + } } } - String query = url.getQuery(); - if(query != null) + } + + private static final OptionSetter[] _options = { - for(String param : query.split("&")) + new OptionSetter("clientid", "JMS client id / AMQP container id") { - String[] keyValuePair = param.split("=",2); - if(keyValuePair[0].equalsIgnoreCase("clientid")) + public void setOption(ConnectionOptions options, String value) { - clientId = keyValuePair[1]; + options.clientId = value; } - else if(keyValuePair[0].equalsIgnoreCase("ssl")) + }, + new OptionSetter("ssl", "Set to \"true\" to use SSL encryption") + { + public void setOption(ConnectionOptions options, String value) + { + options.ssl = Boolean.valueOf(value); + } + }, + new OptionSetter("remote-host", "AMQP remote host") + { + public void setOption(ConnectionOptions options, String value) { - ssl = Boolean.valueOf(keyValuePair[1]); + options.remoteHost = value; } - else if(keyValuePair[0].equalsIgnoreCase("remote-host")) + }, + new OptionSetter("binary-messageid", "Use binary (rather than String) message ids") + { + public void setOption(ConnectionOptions options, String value) { - remoteHost = keyValuePair[1]; + options.binaryMessageId = Boolean.parseBoolean(value); } - else if (keyValuePair[0].equalsIgnoreCase("binary-messageid")) + }, + new OptionSetter("sync-publish", "Wait for acknowledge when sending messages") + { + public void setOption(ConnectionOptions options, String value) { - binaryMessageId = Boolean.parseBoolean(keyValuePair[1]); + options.syncPublish = Boolean.parseBoolean(value); } - else if (keyValuePair[0].equalsIgnoreCase("sync-publish")) + }, + new OptionSetter("max-sessions", "set maximum number of sessions allowed") + { + public void setOption(ConnectionOptions options, String value) { - syncPublish = Boolean.parseBoolean(keyValuePair[1]); + options.maxSessions = Integer.parseInt(value); } - else if(keyValuePair[0].equalsIgnoreCase("max-sessions")) + }, + new OptionSetter("max-prefetch", "set maximum number of messages prefetched on a link") + { + public void setOption(ConnectionOptions options, String value) { - maxSessions = Integer.parseInt(keyValuePair[1]); + options.maxPrefetch = Integer.parseInt(value); } - else + }, + new OptionSetter("trust-store","") + { + public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException + { + options.trustStorePath = value; + } + }, + new OptionSetter("trust-store-password","") + { + public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException + { + options.trustStorePassword = value; + } + }, + new OptionSetter("key-store","") + { + public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException + { + options.keyStorePath = value; + } + }, + new OptionSetter("key-store-password","") + { + public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException { - throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL: "+urlString); + options.keyStorePassword = value; } + }, + new OptionSetter("ssl-cert-alias","") + { + public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException + { + options.keyStoreCertAlias = value; + } + } + }; + + public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException + { + URL url = new URL(null, urlString, new URLStreamHandler() + { + @Override + protected URLConnection openConnection(URL u) throws IOException + { + throw new UnsupportedOperationException(); } + }); + String protocol = url.getProtocol(); + if (protocol == null || "".equals(protocol)) + { + protocol = "amqp"; } + String host = url.getHost(); + int port = url.getPort(); + + final ConnectionOptions options = new ConnectionOptions(); - if(remoteHost == null) + if (port == -1) { - remoteHost = host; + if ("amqps".equals(protocol)) + { + port = 5671; + options.ssl = true; + } + else if("amqp".equals(protocol)) + { + port = 5672; + } + else if("ws".equals(protocol)) + { + port = 80; + } + else if("wss".equals(protocol)) + { + port = 443; + } + } + else if ("amqps".equals(protocol) || "wss".equals(protocol)) + { + options.ssl = true; + } + + + String userInfo = url.getUserInfo(); + + if (userInfo != null) + { + String[] components = userInfo.split(":", 2); + options.username = URLDecoder.decode(components[0]); + if (components.length == 2) + { + options.password = URLDecoder.decode(components[1]); + } + } + + OptionSetter.parseOptions(url, options); + + if (options.remoteHost == null) + { + options.remoteHost = host; } ConnectionFactoryImpl connectionFactory = - new ConnectionFactoryImpl(protocol,host, port, username, password, clientId, remoteHost, ssl, maxSessions); - connectionFactory.setUseBinaryMessageId(binaryMessageId); - connectionFactory.setSyncPublish(syncPublish); + new ConnectionFactoryImpl(protocol, + host, + port, + options.username, + options.password, + options.clientId, + options.remoteHost, + options.ssl, + options.maxSessions); + connectionFactory.setUseBinaryMessageId(options.binaryMessageId); + connectionFactory.setSyncPublish(options.syncPublish); + if (options.maxPrefetch != 0) + { + connectionFactory.setMaxPrefetch(options.maxPrefetch); + } + if (options.keyStorePath != null) + { + connectionFactory.setKeyStorePath(options.keyStorePath); + } + if (options.keyStorePassword != null) + { + connectionFactory.setKeyStorePassword(options.keyStorePassword); + } + if (options.keyStoreCertAlias != null) + { + connectionFactory.setKeyStoreCertAlias(options.keyStoreCertAlias); + } + if (options.trustStorePath != null) + { + connectionFactory.setTrustStorePath(options.trustStorePath); + } + if (options.trustStorePassword != null) + { + connectionFactory.setTrustStorePassword(options.trustStorePassword); + } return connectionFactory; @@ -308,4 +543,6 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection { _syncPublish = syncPublish; } + + } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 55bc8e4f96..7ce445a9b2 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -28,7 +28,9 @@ import org.apache.qpid.amqp_1_0.transport.Container; import javax.jms.*; import javax.jms.IllegalStateException; import javax.jms.Queue; +import javax.net.ssl.SSLContext; +import java.security.NoSuchAlgorithmException; import java.util.*; import org.apache.qpid.amqp_1_0.type.Symbol; @@ -39,6 +41,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { private final String _protocol; + private final SSLContext _sslContext; private ConnectionMetaData _connectionMetaData; private volatile ExceptionListener _exceptionListener; @@ -55,13 +58,18 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private final String _username; private final String _password; private String _remoteHost; - private final boolean _ssl; private String _clientId; private String _queuePrefix; private String _topicPrefix; private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); private int _maxSessions; + private int _maxPrefetch; + + public void setMaxPrefetch(final int maxPrefetch) + { + _maxPrefetch = maxPrefetch; + } private static enum State { @@ -96,6 +104,34 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException { + this(protocol, + host, + port, + username, + password, + clientId, + remoteHost, + ssl ? getDefaultSSLContext() : null, + maxSessions); + } + + private static SSLContext getDefaultSSLContext() throws JMSException + { + try + { + return SSLContext.getDefault(); + } + catch (NoSuchAlgorithmException e) + { + JMSException jmsException = new JMSException(e.getMessage()); + jmsException.setLinkedException(e); + jmsException.initCause(e); + throw jmsException; + } + } + + public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, SSLContext sslContext, int maxSessions) throws JMSException + { _protocol = protocol; _host = host; _port = port; @@ -103,7 +139,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect _password = password; _clientId = clientId; _remoteHost = remoteHost; - _ssl = ssl; + _sslContext = sslContext; _maxSessions = maxSessions; } @@ -121,7 +157,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect try { _conn = new org.apache.qpid.amqp_1_0.client.Connection(_protocol, _host, - _port, _username, _password, container, _remoteHost, _ssl, + _port, _username, _password, container, _remoteHost, _sslContext, _maxSessions - 1); _conn.setConnectionErrorTask(new ConnectionErrorTask()); // TODO - retrieve negotiated AMQP version @@ -190,6 +226,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect SessionImpl session = new SessionImpl(this, acknowledgeMode); session.setQueueSession(_isQueueConnection); session.setTopicSession(_isTopicConnection); + if(_maxPrefetch != 0) + { + session.setMaxPrefetch(_maxPrefetch); + } boolean connectionStarted = false; synchronized(_lock) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index fd6f09d162..96ee1e984d 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -76,6 +76,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi private Binary _lastTxnUpdate; private final List<Message> _recoverReplayMessages = new ArrayList<Message>(); private final List<Message> _replaymessages = new ArrayList<Message>(); + private int _maxPrefetch = 100; MessageConsumerImpl(final Destination destination, final SessionImpl session, @@ -117,6 +118,10 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName()); } _session = session; + if(session.getMaxPrefetch() != 0) + { + _maxPrefetch = session.getMaxPrefetch(); + } _receiver = createClientReceiver(); _receiver.setRemoteErrorListener(new Runnable() @@ -442,7 +447,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi public void start() { - _receiver.setCredit(UnsignedInteger.valueOf(100), true); + _receiver.setCredit(UnsignedInteger.valueOf(getMaxPrefetch()), true); } public Queue getQueue() throws JMSException @@ -487,4 +492,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi } } } + + public int getMaxPrefetch() + { + return _maxPrefetch; + } + + public void setMaxPrefetch(final int maxPrefetch) + { + _maxPrefetch = maxPrefetch; + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index e5e6ea938e..a1cf0ef4e7 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -81,6 +81,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession private boolean _isQueueSession; private boolean _isTopicSession; private Transaction _txn; + private int _maxPrefetch; protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) throws JMSException { @@ -843,6 +844,16 @@ public class SessionImpl implements Session, QueueSession, TopicSession return _txn; } + public void setMaxPrefetch(final int maxPrefetch) + { + _maxPrefetch = maxPrefetch; + } + + public int getMaxPrefetch() + { + return _maxPrefetch; + } + private class Dispatcher implements Runnable { diff --git a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java index 6c35e555ca..1805b593f1 100644 --- a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java +++ b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java @@ -27,14 +27,15 @@ import org.apache.qpid.amqp_1_0.framing.AMQFrame; import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketClient; import org.eclipse.jetty.websocket.WebSocketClientFactory; -import java.io.IOException; +import javax.net.ssl.SSLContext; import java.net.URI; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; @@ -45,6 +46,7 @@ class WebSocketProvider implements TransportProvider private static final byte AMQP_HEADER_FRAME_TYPE = (byte) 222; private static int _connections; + private static QueuedThreadPool _threadPool; private final String _transport; private static WebSocketClientFactory _factory; @@ -53,23 +55,51 @@ class WebSocketProvider implements TransportProvider _transport = transport; } - private static synchronized WebSocketClient createWebSocketClient() throws Exception + private static synchronized WebSocketClientFactory getWebSocketClientFactory(SSLContext context) throws Exception { - if(_factory == null) + if(_threadPool == null) { - _factory = new WebSocketClientFactory(); - _factory.start(); + _threadPool = new QueuedThreadPool(); + } + if(context != null) + { + WebSocketClientFactory factory = new WebSocketClientFactory(_threadPool); + SslContextFactory sslContextFactory = factory.getSslContextFactory(); + + + sslContextFactory.setSslContext(context); + + factory.start(); + + return factory; + } + else + { + if(_factory == null) + { + _factory = new WebSocketClientFactory(_threadPool); + _factory.start(); + } + _connections++; + return _factory; } - _connections++; - return _factory.newWebSocketClient(); } - private static synchronized void removeClient() throws Exception + + private static synchronized void removeClient(final WebSocketClientFactory factory) throws Exception { - if(--_connections == 0) + + if(factory == _factory) + { + if(--_connections == 0) + { + _factory.stop(); + _factory = null; + } + } + else { - _factory.stop(); - _factory = null; + factory.stop(); } } @@ -77,13 +107,13 @@ class WebSocketProvider implements TransportProvider public void connect(final ConnectionEndpoint conn, final String address, final int port, - final boolean ssl, - final ExceptionHandler exceptionHandler) throws ConnectionException + final SSLContext sslContext, final ExceptionHandler exceptionHandler) throws ConnectionException { try { - WebSocketClient client = createWebSocketClient(); + final WebSocketClientFactory webSocketClientFactory = getWebSocketClientFactory(sslContext); + WebSocketClient client = webSocketClientFactory.newWebSocketClient(); // Configure the client client.setProtocol(AMQP_WEBSOCKET_SUBPROTOCOL); @@ -138,7 +168,7 @@ class WebSocketProvider implements TransportProvider public void onOpen(Connection connection) { - Thread outputThread = new Thread(new FrameOutputThread(connection, src, conn, exceptionHandler)); + Thread outputThread = new Thread(new FrameOutputThread(connection, src, conn, exceptionHandler, webSocketClientFactory)); outputThread.setDaemon(true); outputThread.start(); } @@ -226,17 +256,19 @@ class WebSocketProvider implements TransportProvider private final ExceptionHandler _exceptionHandler; private final FrameWriter _frameWriter; private final byte[] _buffer; + private final WebSocketClientFactory _factory; public FrameOutputThread(final WebSocket.Connection connection, final ConnectionHandler.FrameSource src, final ConnectionEndpoint conn, - final ExceptionHandler exceptionHandler) + final ExceptionHandler exceptionHandler, final WebSocketClientFactory factory) { _connection = connection; _frameSource = src; _exceptionHandler = exceptionHandler; _frameWriter = new FrameWriter(conn.getDescribedTypeRegistry()); _buffer = new byte[conn.getMaxFrameSize()]; + _factory = factory; } @Override @@ -278,7 +310,7 @@ class WebSocketProvider implements TransportProvider { try { - removeClient(); + removeClient(_factory); } catch (Exception e) { diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index b2d86c4dbc..6157ec53f6 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.amqp_1_0.client; +import java.security.NoSuchAlgorithmException; import java.security.Principal; import java.util.ServiceLoader; import java.util.concurrent.TimeoutException; @@ -36,6 +37,8 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; import org.apache.qpid.amqp_1_0.type.transport.Error; +import javax.net.ssl.SSLContext; + public class Connection implements ExceptionHandler { private static final int MAX_FRAME_SIZE = 65536; @@ -143,10 +146,10 @@ public class Connection implements ExceptionHandler final String password, final Container container, final String remoteHost, - final boolean ssl, + final SSLContext sslContext, final int channelMax) throws ConnectionException { - this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl, + this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,sslContext, channelMax); } @@ -160,7 +163,19 @@ public class Connection implements ExceptionHandler boolean ssl, int channelMax) throws ConnectionException { - this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container,remoteHostname,ssl,channelMax); + this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container,remoteHostname,getSslContext(ssl),channelMax); + } + + private static SSLContext getSslContext(final boolean ssl) throws ConnectionException + { + try + { + return ssl ? SSLContext.getDefault() : null; + } + catch (NoSuchAlgorithmException e) + { + throw new ConnectionException(e); + } } public Connection(final String protocol, @@ -171,7 +186,7 @@ public class Connection implements ExceptionHandler final int maxFrameSize, final Container container, final String remoteHostname, - boolean ssl, + SSLContext sslContext, int channelMax) throws ConnectionException { @@ -240,7 +255,7 @@ public class Connection implements ExceptionHandler TransportProvider transportProvider = getTransportProvider(protocol); - transportProvider.connect(_conn,address,port,ssl, this); + transportProvider.connect(_conn,address,port, sslContext, this); _conn.open(); diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java new file mode 100644 index 0000000000..70e5d08f15 --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedKeyManager; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.Principal; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; + +public class SSLUtil +{ + public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS"; + + public static SSLContext buildSslContext(final String certAlias, + final String keyStorePath, + final String keyStoreType, + final String keyStorePassword, + final String keyManagerFactoryAlgorithm, + final String trustStorePath, + final String trustStorePassword, + final String trustStoreType, + final String trustManagerFactoryAlgorithm) throws GeneralSecurityException, IOException + { + + final SSLContext sslContext = SSLContext + .getInstance(TRANSPORT_LAYER_SECURITY_CODE); + + final TrustManager[] trustManagers; + final KeyManager[] keyManagers; + + if (trustStorePath != null) + { + final KeyStore ts = getInitializedKeyStore(trustStorePath, trustStorePassword, trustStoreType); + final TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustManagerFactoryAlgorithm); + + tmf.init(ts); + + trustManagers = tmf.getTrustManagers(); + } + else + { + trustManagers = null; + } + + if (keyStorePath != null) + { + if (certAlias != null) + { + keyManagers = new KeyManager[] { new QpidClientX509KeyManager( + certAlias, keyStorePath, keyStoreType, keyStorePassword, + keyManagerFactoryAlgorithm) }; + } + else + { + final KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath, keyStorePassword, keyStoreType); + + char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray(); + // Set up key manager factory to use our key store + final KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm); + kmf.init(ks, keyStoreCharPassword); + keyManagers = kmf.getKeyManagers(); + } + } + else + { + keyManagers = null; + } + + + sslContext.init(keyManagers, trustManagers, null); + + return sslContext; + } + + public static X509Certificate[] getClientCertificates(final String alias, + final String keyStorePath, + final String keyStorePassword, + final String keyStoreType, + final String keyManagerFactoryAlgorithm) + throws GeneralSecurityException, IOException + { + return (new QpidClientX509KeyManager(alias,keyStorePath,keyStoreType,keyStorePassword,keyManagerFactoryAlgorithm)).getCertificateChain(alias); + } + + public static KeyStore getInitializedKeyStore(String storePath, String storePassword, String keyStoreType) throws GeneralSecurityException, IOException + { + KeyStore ks = KeyStore.getInstance(keyStoreType); + InputStream in = null; + try + { + File f = new File(storePath); + if (f.exists()) + { + in = new FileInputStream(f); + } + else + { + in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath); + } + if (in == null && !"PKCS11".equalsIgnoreCase(keyStoreType)) // PKCS11 will not require an explicit path + { + throw new IOException("Unable to load keystore resource: " + storePath); + } + + char[] storeCharPassword = storePassword == null ? null : storePassword.toCharArray(); + + ks.load(in, storeCharPassword); + } + finally + { + if (in != null) + { + //noinspection EmptyCatchBlock + try + { + in.close(); + } + catch (IOException ignored) + { + } + } + } + return ks; + } + + public static class QpidClientX509KeyManager extends X509ExtendedKeyManager + { + + private X509ExtendedKeyManager delegate; + private String alias; + + public QpidClientX509KeyManager(String alias, String keyStorePath, String keyStoreType, + String keyStorePassword, String keyManagerFactoryAlgorithmName) throws + GeneralSecurityException, + IOException + { + this.alias = alias; + KeyStore ks = getInitializedKeyStore(keyStorePath, keyStorePassword, keyStoreType); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithmName); + kmf.init(ks, keyStorePassword.toCharArray()); + this.delegate = (X509ExtendedKeyManager) kmf.getKeyManagers()[0]; + } + + public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) + { + return alias; + } + + public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) + { + return delegate.chooseServerAlias(keyType, issuers, socket); + } + + public X509Certificate[] getCertificateChain(String alias) + { + return delegate.getCertificateChain(alias); + } + + public String[] getClientAliases(String keyType, Principal[] issuers) + { + return new String[]{alias}; + } + + public PrivateKey getPrivateKey(String alias) + { + return delegate.getPrivateKey(alias); + } + + public String[] getServerAliases(String keyType, Principal[] issuers) + { + return delegate.getServerAliases(keyType, issuers); + } + + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) + { + return alias; + } + + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) + { + return delegate.chooseEngineServerAlias(keyType, issuers, engine); + } + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java index 1c5eb0a34c..6cc749d11d 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java @@ -26,6 +26,8 @@ import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; import java.io.IOException; import java.io.InputStream; @@ -46,15 +48,19 @@ class TCPTransportProvier implements TransportProvider public void connect(final ConnectionEndpoint conn, final String address, final int port, - final boolean ssl, + final SSLContext sslContext, final ExceptionHandler exceptionHandler) throws ConnectionException { try { final Socket s; - if(ssl) + if(sslContext != null) { - s = SSLSocketFactory.getDefault().createSocket(address, port); + final SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + + SSLSocket sslSocket = (SSLSocket) socketFactory.createSocket(address, port); + + s=sslSocket; } else { @@ -64,6 +70,7 @@ class TCPTransportProvier implements TransportProvider conn.setRemoteAddress(s.getRemoteSocketAddress()); + ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn); ConnectionHandler.BytesSource src; diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java index 2430b0e14b..2c11d6b6ef 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java @@ -25,11 +25,13 @@ import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.type.FrameBody; +import javax.net.ssl.SSLContext; + public interface TransportProvider { void connect(ConnectionEndpoint conn, String address, int port, - boolean ssl, + SSLContext sslContext, ExceptionHandler exceptionHandler) throws ConnectionException; } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 1c80668856..9c93c1f0a5 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -103,7 +103,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE; private ConnectionEventListener _connectionEventListener = ConnectionEventListener.DEFAULT; private String _password; - private final boolean _requiresSASLClient; + private boolean _requiresSASLClient; private final boolean _requiresSASLServer; @@ -140,6 +140,14 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour _requiresSASLServer = false; } + public void setPrincipal(Principal user) + { + if(_user == null) + { + _user = user; + _requiresSASLClient = user != null; + } + } public synchronized void open() { |