summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/security')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java185
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java66
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java86
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java123
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java100
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java202
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java274
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java197
8 files changed, 1233 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
new file mode 100644
index 0000000000..3f0966903d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -0,0 +1,185 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security;
+
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
+import org.apache.qpid.transport.network.security.sasl.SASLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
+import org.apache.qpid.transport.network.security.ssl.SSLSender;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+public class SecurityLayer
+{
+ ConnectionSettings settings;
+ Connection con;
+ SSLSecurityLayer sslLayer;
+ SASLSecurityLayer saslLayer;
+
+ public void init(Connection con) throws TransportException
+ {
+ this.con = con;
+ this.settings = con.getConnectionSettings();
+ if (settings.isUseSSL())
+ {
+ sslLayer = new SSLSecurityLayer();
+ }
+ if (settings.isUseSASLEncryption())
+ {
+ saslLayer = new SASLSecurityLayer();
+ }
+
+ }
+
+ public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
+ {
+ Sender<ByteBuffer> sender = delegate;
+
+ if (settings.isUseSSL())
+ {
+ sender = sslLayer.sender(sender);
+ }
+
+ if (settings.isUseSASLEncryption())
+ {
+ sender = saslLayer.sender(sender);
+ }
+
+ return sender;
+ }
+
+ public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
+ {
+ Receiver<ByteBuffer> receiver = delegate;
+
+ if (settings.isUseSSL())
+ {
+ receiver = sslLayer.receiver(receiver);
+ }
+
+ if (settings.isUseSASLEncryption())
+ {
+ receiver = saslLayer.receiver(receiver);
+ }
+
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ if (settings.isUseSSL())
+ {
+ return sslLayer.getUserID();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ class SSLSecurityLayer
+ {
+ SSLEngine engine;
+ SSLSender sender;
+
+ public SSLSecurityLayer()
+ {
+ SSLContext sslCtx;
+ try
+ {
+ sslCtx = SSLUtil.createSSLContext(settings);
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ try
+ {
+ engine = sslCtx.createSSLEngine();
+ engine.setUseClientMode(true);
+ }
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+ }
+
+ public SSLSender sender(Sender<ByteBuffer> delegate)
+ {
+ sender = new SSLSender(engine,delegate);
+ sender.setConnectionSettings(settings);
+ return sender;
+ }
+
+ public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
+ {
+ if (sender == null)
+ {
+ throw new
+ IllegalStateException("SecurityLayer.sender method should be " +
+ "invoked before SecurityLayer.receiver");
+ }
+
+ SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
+ receiver.setConnectionSettings(settings);
+ return receiver;
+ }
+
+ public String getUserID()
+ {
+ return SSLUtil.retriveIdentity(engine);
+ }
+
+ }
+
+ class SASLSecurityLayer
+ {
+ public SASLSecurityLayer()
+ {
+ }
+
+ public SASLSender sender(Sender<ByteBuffer> delegate)
+ {
+ SASLSender sender = new SASLSender(delegate);
+ con.addConnectionListener((ConnectionListener)sender);
+ return sender;
+ }
+
+ public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
+ {
+ SASLReceiver receiver = new SASLReceiver(delegate);
+ con.addConnectionListener((ConnectionListener)receiver);
+ return receiver;
+ }
+
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
new file mode 100644
index 0000000000..7964239e31
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
@@ -0,0 +1,66 @@
+package org.apache.qpid.transport.network.security.sasl;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
+import org.apache.qpid.transport.ConnectionListener;
+
+public abstract class SASLEncryptor implements ConnectionListener
+{
+ protected SaslClient saslClient;
+ protected boolean securityLayerEstablished = false;
+ protected int sendBuffSize;
+ protected int recvBuffSize;
+
+ public boolean isSecurityLayerEstablished()
+ {
+ return securityLayerEstablished;
+ }
+
+ public void opened(Connection conn)
+ {
+ if (conn.getSaslClient() != null)
+ {
+ saslClient = conn.getSaslClient();
+ if (saslClient.isComplete() && saslClient.getNegotiatedProperty(Sasl.QOP) == "auth-conf")
+ {
+ sendBuffSize = Integer.parseInt(
+ (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE));
+ recvBuffSize = Integer.parseInt(
+ (String)saslClient.getNegotiatedProperty(Sasl.MAX_BUFFER));
+ securityLayerEstablished();
+ securityLayerEstablished = true;
+ }
+ }
+ }
+
+ public void exception(Connection conn, ConnectionException exception){}
+ public void closed(Connection conn) {}
+
+ public abstract void securityLayerEstablished();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
new file mode 100644
index 0000000000..86106318ef
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
@@ -0,0 +1,86 @@
+package org.apache.qpid.transport.network.security.sasl;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> {
+
+ Receiver<ByteBuffer> delegate;
+ private byte[] netData;
+ private static final Logger log = Logger.get(SASLReceiver.class);
+
+ public SASLReceiver(Receiver<ByteBuffer> delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public void closed()
+ {
+ delegate.closed();
+ }
+
+
+ public void exception(Throwable t)
+ {
+ delegate.exception(t);
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ if (isSecurityLayerEstablished())
+ {
+ while (buf.hasRemaining())
+ {
+ int length = Math.min(buf.remaining(),recvBuffSize);
+ buf.get(netData, 0, length);
+ try
+ {
+ byte[] out = saslClient.unwrap(netData, 0, length);
+ delegate.received(ByteBuffer.wrap(out));
+ }
+ catch (SaslException e)
+ {
+ throw new SenderException("SASL Sender, Error occurred while encrypting data",e);
+ }
+ }
+ }
+ else
+ {
+ delegate.received(buf);
+ }
+ }
+
+ public void securityLayerEstablished()
+ {
+ netData = new byte[recvBuffSize];
+ log.debug("SASL Security Layer Established");
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
new file mode 100644
index 0000000000..27255f79f6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -0,0 +1,123 @@
+package org.apache.qpid.transport.network.security.sasl;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
+
+ protected Sender<ByteBuffer> delegate;
+ private byte[] appData;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private static final Logger log = Logger.get(SASLSender.class);
+
+ public SASLSender(Sender<ByteBuffer> delegate)
+ {
+ this.delegate = delegate;
+ log.debug("SASL Sender enabled");
+ }
+
+ @Override
+ public void close()
+ {
+
+ if (!closed.getAndSet(true))
+ {
+ delegate.close();
+ if (isSecurityLayerEstablished())
+ {
+ try
+ {
+ saslClient.dispose();
+ }
+ catch (SaslException e)
+ {
+ throw new SenderException("Error closing SASL Sender",e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ delegate.flush();
+ }
+
+ @Override
+ public void send(ByteBuffer buf)
+ {
+ if (closed.get())
+ {
+ throw new SenderException("SSL Sender is closed");
+ }
+
+ if (isSecurityLayerEstablished())
+ {
+ while (buf.hasRemaining())
+ {
+ int length = Math.min(buf.remaining(),sendBuffSize);
+ log.debug("sendBuffSize %s", sendBuffSize);
+ log.debug("buf.remaining() %s", buf.remaining());
+
+ buf.get(appData, 0, length);
+ try
+ {
+ byte[] out = saslClient.wrap(appData, 0, length);
+ log.debug("out.length %s", out.length);
+
+ delegate.send(ByteBuffer.wrap(out));
+ }
+ catch (SaslException e)
+ {
+ log.error("Exception while encrypting data.",e);
+ throw new SenderException("SASL Sender, Error occurred while encrypting data",e);
+ }
+ }
+ }
+ else
+ {
+ delegate.send(buf);
+ }
+ }
+
+ @Override
+ public void setIdleTimeout(int i)
+ {
+ delegate.setIdleTimeout(i);
+ }
+
+ public void securityLayerEstablished()
+ {
+ appData = new byte[sendBuffSize];
+ log.debug("SASL Security Layer Established");
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
new file mode 100644
index 0000000000..14f28f8828
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security.ssl;
+
+import java.net.Socket;
+import java.security.KeyStore;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.X509ExtendedKeyManager;
+
+import org.apache.qpid.transport.util.Logger;
+
+public class QpidClientX509KeyManager extends X509ExtendedKeyManager
+{
+ private static final Logger log = Logger.get(QpidClientX509KeyManager.class);
+
+ X509ExtendedKeyManager delegate;
+ String alias;
+
+ public QpidClientX509KeyManager(String alias, String keyStorePath,
+ String keyStorePassword,String keyStoreCertType) throws Exception
+ {
+ this.alias = alias;
+ KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyStoreCertType);
+ kmf.init(ks, keyStorePassword.toCharArray());
+ this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
+ }
+
+ @Override
+ public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
+ {
+ log.debug("chooseClientAlias:Returning alias " + alias);
+ return alias;
+ }
+
+ @Override
+ public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
+ {
+ return delegate.chooseServerAlias(keyType, issuers, socket);
+ }
+
+ @Override
+ public X509Certificate[] getCertificateChain(String alias)
+ {
+ return delegate.getCertificateChain(alias);
+ }
+
+ @Override
+ public String[] getClientAliases(String keyType, Principal[] issuers)
+ {
+ log.debug("getClientAliases:Returning alias " + alias);
+ return new String[]{alias};
+ }
+
+ @Override
+ public PrivateKey getPrivateKey(String alias)
+ {
+ return delegate.getPrivateKey(alias);
+ }
+
+ @Override
+ public String[] getServerAliases(String keyType, Principal[] issuers)
+ {
+ return delegate.getServerAliases(keyType, issuers);
+ }
+
+ public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
+ {
+ log.debug("chooseEngineClientAlias:Returning alias " + alias);
+ return alias;
+ }
+
+ public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine)
+ {
+ return delegate.chooseEngineServerAlias(keyType, issuers, engine);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
new file mode 100644
index 0000000000..e227a51729
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -0,0 +1,202 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security.ssl;
+
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLReceiver implements Receiver<ByteBuffer>
+{
+ private Receiver<ByteBuffer> delegate;
+ private SSLEngine engine;
+ private SSLSender sender;
+ private int sslBufSize;
+ private ByteBuffer appData;
+ private ByteBuffer localBuffer;
+ private boolean dataCached = false;
+ private final Object notificationToken;
+ private ConnectionSettings settings;
+
+ private static final Logger log = Logger.get(SSLReceiver.class);
+
+ public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
+ {
+ this.engine = engine;
+ this.delegate = delegate;
+ this.sender = sender;
+ this.sslBufSize = engine.getSession().getApplicationBufferSize();
+ appData = ByteBuffer.allocate(sslBufSize);
+ localBuffer = ByteBuffer.allocate(sslBufSize);
+ notificationToken = sender.getNotificationToken();
+ }
+
+ public void setConnectionSettings(ConnectionSettings settings)
+ {
+ this.settings = settings;
+ }
+
+ public void closed()
+ {
+ delegate.closed();
+ }
+
+ public void exception(Throwable t)
+ {
+ delegate.exception(t);
+ }
+
+ private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf)
+ {
+ if (dataCached)
+ {
+ ByteBuffer b = ByteBuffer.allocate(localBuffer.remaining() + buf.remaining());
+ b.put(localBuffer);
+ b.put(buf);
+ b.flip();
+ dataCached = false;
+ return b;
+ }
+ else
+ {
+ return buf;
+ }
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ ByteBuffer netData = addPreviouslyUnreadData(buf);
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while (netData.hasRemaining())
+ {
+ try
+ {
+ SSLEngineResult result = engine.unwrap(netData, appData);
+ synchronized (notificationToken)
+ {
+ notificationToken.notifyAll();
+ }
+
+ int read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+
+ if (read > 0)
+ {
+ int limit = appData.limit();
+ appData.limit(appData.position());
+ appData.position(appData.position() - read);
+
+ ByteBuffer data = appData.slice();
+
+ appData.limit(limit);
+ appData.position(appData.position() + read);
+
+ delegate.received(data);
+ }
+
+
+ switch(status)
+ {
+ case CLOSED:
+ synchronized(notificationToken)
+ {
+ notificationToken.notifyAll();
+ }
+ return;
+
+ case BUFFER_OVERFLOW:
+ appData = ByteBuffer.allocate(sslBufSize);
+ continue;
+
+ case BUFFER_UNDERFLOW:
+ localBuffer.clear();
+ localBuffer.put(netData);
+ localBuffer.flip();
+ dataCached = true;
+ break;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_UNWRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+ break;
+
+ case NEED_TASK:
+ sender.doTasks();
+ handshakeStatus = engine.getHandshakeStatus();
+
+ case FINISHED:
+ if (this.settings != null && this.settings.isVerifyHostname() )
+ {
+ SSLUtil.verifyHostname(engine, this.settings.getHost());
+ }
+
+ case NEED_WRAP:
+ case NOT_HANDSHAKING:
+ synchronized(notificationToken)
+ {
+ notificationToken.notifyAll();
+ }
+ break;
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+
+ }
+ catch(SSLException e)
+ {
+ log.error(e, "Error caught in SSLReceiver");
+ sender.setErrorFlag();
+ synchronized(notificationToken)
+ {
+ notificationToken.notifyAll();
+ }
+ exception(new TransportException("Error in SSLReceiver",e));
+ }
+
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
new file mode 100644
index 0000000000..cd47a11825
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security.ssl;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLSender implements Sender<ByteBuffer>
+{
+ private Sender<ByteBuffer> delegate;
+ private SSLEngine engine;
+ private int sslBufSize;
+ private ByteBuffer netData;
+ private long timeout = 30000;
+ private ConnectionSettings settings;
+
+ private final Object engineState = new Object();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean error = new AtomicBoolean(false);
+
+ private static final Logger log = Logger.get(SSLSender.class);
+
+ public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
+ {
+ this.engine = engine;
+ this.delegate = delegate;
+ sslBufSize = engine.getSession().getPacketBufferSize();
+ netData = ByteBuffer.allocate(sslBufSize);
+ timeout = Long.getLong("qpid.ssl_timeout", 60000);
+ }
+
+ public void setConnectionSettings(ConnectionSettings settings)
+ {
+ this.settings = settings;
+ }
+
+ public void close()
+ {
+ if (!closed.getAndSet(true))
+ {
+ if (engine.isOutboundDone())
+ {
+ return;
+ }
+ log.debug("Closing SSL connection");
+
+ engine.closeOutbound();
+ try
+ {
+ tearDownSSLConnection();
+ }
+ catch(Exception e)
+ {
+ throw new SenderException("Error closing SSL connection",e);
+ }
+
+
+ synchronized(engineState)
+ {
+ while (!engine.isOutboundDone())
+ {
+ try
+ {
+ engineState.wait();
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+
+ }
+ }
+ delegate.close();
+ }
+ }
+
+ private void tearDownSSLConnection() throws Exception
+ {
+ SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
+ Status status = result.getStatus();
+ int read = result.bytesProduced();
+ while (status != Status.CLOSED)
+ {
+ if (status == Status.BUFFER_OVERFLOW)
+ {
+ netData.clear();
+ }
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ delegate.send(data);
+ flush();
+ }
+ result = engine.wrap(ByteBuffer.allocate(0), netData);
+ status = result.getStatus();
+ read = result.bytesProduced();
+ }
+ }
+
+ public void flush()
+ {
+ delegate.flush();
+ }
+
+ public void send(ByteBuffer appData)
+ {
+ if (closed.get())
+ {
+ throw new SenderException("SSL Sender is closed");
+ }
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while(appData.hasRemaining() && !error.get())
+ {
+ int read = 0;
+ try
+ {
+ SSLEngineResult result = engine.wrap(appData, netData);
+ read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+ }
+ catch(SSLException e)
+ {
+ throw new SenderException("SSL, Error occurred while encrypting data",e);
+ }
+
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ delegate.send(data);
+ }
+
+ switch(status)
+ {
+ case CLOSED:
+ throw new SenderException("SSLEngine is closed");
+
+ case BUFFER_OVERFLOW:
+ netData.clear();
+ continue;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_WRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+
+ case NEED_TASK:
+ doTasks();
+ break;
+
+ case NEED_UNWRAP:
+ flush();
+ synchronized(engineState)
+ {
+ switch (engine.getHandshakeStatus())
+ {
+ case NEED_UNWRAP:
+ long start = System.currentTimeMillis();
+ try
+ {
+ engineState.wait(timeout);
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+
+ if (System.currentTimeMillis()- start >= timeout)
+ {
+ throw new SenderException(
+ "SSL Engine timed out waiting for a response." +
+ "To get more info,run with -Djavax.net.debug=ssl");
+ }
+ break;
+ }
+ }
+ break;
+
+ case FINISHED:
+ if (this.settings != null && this.settings.isVerifyHostname() )
+ {
+ SSLUtil.verifyHostname(engine, this.settings.getHost());
+ }
+
+ case NOT_HANDSHAKING:
+ break; //do nothing
+
+ default:
+ throw new IllegalStateException("SSLSender: Invalid State " + status);
+ }
+
+ }
+ }
+
+ public void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
+ public Object getNotificationToken()
+ {
+ return engineState;
+ }
+
+ public void setErrorFlag()
+ {
+ error.set(true);
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ delegate.setIdleTimeout(i);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
new file mode 100644
index 0000000000..fd73915b65
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security.ssl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.Principal;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLUtil
+{
+ private static final Logger log = Logger.get(SSLUtil.class);
+
+ public static void verifyHostname(SSLEngine engine,String hostnameExpected)
+ {
+ try
+ {
+ Certificate cert = engine.getSession().getPeerCertificates()[0];
+ Principal p = ((X509Certificate)cert).getSubjectDN();
+ String dn = p.getName();
+ String hostname = null;
+
+ if (dn.contains("CN="))
+ {
+ hostname = dn.substring(3,
+ dn.indexOf(",") == -1? dn.length(): dn.indexOf(","));
+ }
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Hostname expected : " + hostnameExpected);
+ log.debug("Distinguished Name for server certificate : " + dn);
+ log.debug("Host Name obtained from DN : " + hostname);
+ }
+
+ if (hostname != null && !(hostname.equalsIgnoreCase(hostnameExpected) ||
+ hostname.equalsIgnoreCase(hostnameExpected + ".localdomain")))
+ {
+ throw new TransportException("SSL hostname verification failed." +
+ " Expected : " + hostnameExpected +
+ " Found in cert : " + hostname);
+ }
+
+ }
+ catch(SSLPeerUnverifiedException e)
+ {
+ log.warn("Exception received while trying to verify hostname",e);
+ // For some reason the SSL engine sets the handshake status to FINISH twice
+ // in succession. The first time the peer certificate
+ // info is not available. The second time it works !
+ // Therefore have no choice but to ignore the exception here.
+ }
+ }
+
+ public static String retriveIdentity(SSLEngine engine)
+ {
+ StringBuffer id = new StringBuffer();
+ try
+ {
+ Certificate cert = engine.getSession().getLocalCertificates()[0];
+ Principal p = ((X509Certificate)cert).getSubjectDN();
+ String dn = p.getName();
+
+ if (dn.contains("CN="))
+ {
+ id.append(dn.substring(3,
+ dn.indexOf(",") == -1? dn.length(): dn.indexOf(",")));
+ }
+
+ if (dn.contains("DC="))
+ {
+ id.append("@");
+ int c = 0;
+ for (String toks : dn.split(","))
+ {
+ if (toks.contains("DC"))
+ {
+ if (c > 0) {id.append(".");}
+ id.append(toks.substring(
+ toks.indexOf("=")+1,
+ toks.indexOf(",") == -1? toks.length(): toks.indexOf(",")));
+ c++;
+ }
+ }
+ }
+ }
+ catch(Exception e)
+ {
+ log.info("Exception received while trying to retrive client identity from SSL cert",e);
+ }
+
+ log.debug("Extracted Identity from client certificate : " + id);
+ return id.toString();
+ }
+
+ public static SSLContext createSSLContext(ConnectionSettings settings) throws Exception
+ {
+ SSLContextFactory sslContextFactory;
+
+ if (settings.getCertAlias() == null)
+ {
+ sslContextFactory =
+ new SSLContextFactory(settings.getTrustStorePath(),
+ settings.getTrustStorePassword(),
+ settings.getTrustStoreCertType(),
+ settings.getKeyStorePath(),
+ settings.getKeyStorePassword(),
+ settings.getKeyStoreCertType());
+
+ } else
+ {
+ sslContextFactory =
+ new SSLContextFactory(settings.getTrustStorePath(),
+ settings.getTrustStorePassword(),
+ settings.getTrustStoreCertType(),
+ new QpidClientX509KeyManager(settings.getCertAlias(),
+ settings.getKeyStorePath(),
+ settings.getKeyStorePassword(),
+ settings.getKeyStoreCertType()));
+
+ log.debug("Using custom key manager");
+ }
+
+ return sslContextFactory.buildServerContext();
+
+ }
+
+ public static KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException
+ {
+ KeyStore ks = KeyStore.getInstance("JKS");
+ InputStream in = null;
+ try
+ {
+ File f = new File(storePath);
+ if (f.exists())
+ {
+ in = new FileInputStream(f);
+ }
+ else
+ {
+ in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath);
+ }
+ if (in == null)
+ {
+ throw new IOException("Unable to load keystore resource: " + storePath);
+ }
+ ks.load(in, storePassword.toCharArray());
+ }
+ finally
+ {
+ if (in != null)
+ {
+ //noinspection EmptyCatchBlock
+ try
+ {
+ in.close();
+ }
+ catch (IOException ignored)
+ {
+ }
+ }
+ }
+ return ks;
+ }
+}