summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-11 14:27:01 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-11 14:27:01 +0000
commit90593b58d192ce16ab9945a279ddf12a09f8e0e2 (patch)
tree94b940b35178ec8d1cec63552a32dab7aa3b0deb /java/common/src
parent779e1b19631d1eb62e9b7f89340f76736b462f7e (diff)
downloadqpid-python-90593b58d192ce16ab9945a279ddf12a09f8e0e2.tar.gz
QPID-4831 : [Java Broker] Allow SSL and non-SSL connections on the same port
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1481331 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java271
2 files changed, 272 insertions, 1 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index c8027e143e..5742667dbe 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -239,7 +239,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
ticker);
ticker.setConnection(connection);
- if(_sslContext != null)
+ if(_sslContext != null && socket instanceof SSLSocket)
{
try
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
new file mode 100644
index 0000000000..0d36b96cd4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security.ssl;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLBufferingSender implements Sender<ByteBuffer>
+{
+ private static final Logger log = Logger.get(SSLBufferingSender.class);
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ private final Sender<ByteBuffer> delegate;
+ private final SSLEngine engine;
+ private final int sslBufSize;
+ private final ByteBuffer netData;
+ private final SSLStatus _sslStatus;
+
+ private String _hostname;
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
+
+
+ public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
+ {
+ this.engine = engine;
+ this.delegate = delegate;
+ sslBufSize = engine.getSession().getPacketBufferSize();
+ netData = ByteBuffer.allocate(sslBufSize);
+ _sslStatus = sslStatus;
+ }
+
+ public void setHostname(String hostname)
+ {
+ _hostname = hostname;
+ }
+
+ public void close()
+ {
+ if (!closed.getAndSet(true))
+ {
+ if (engine.isOutboundDone())
+ {
+ return;
+ }
+ log.debug("Closing SSL connection");
+
+ engine.closeOutbound();
+ try
+ {
+ tearDownSSLConnection();
+ }
+ catch(Exception e)
+ {
+ throw new SenderException("Error closing SSL connection",e);
+ }
+
+
+ synchronized(_sslStatus.getSslLock())
+ {
+ while (!engine.isOutboundDone())
+ {
+ try
+ {
+ _sslStatus.getSslLock().wait();
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+
+ }
+ }
+ delegate.close();
+ }
+ }
+
+ private void tearDownSSLConnection() throws Exception
+ {
+ SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
+ Status status = result.getStatus();
+ int read = result.bytesProduced();
+ while (status != Status.CLOSED)
+ {
+ if (status == Status.BUFFER_OVERFLOW)
+ {
+ netData.clear();
+ }
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ delegate.send(data);
+ flush();
+ }
+ result = engine.wrap(ByteBuffer.allocate(0), netData);
+ status = result.getStatus();
+ read = result.bytesProduced();
+ }
+ }
+
+ public void flush()
+ {
+ delegate.flush();
+ }
+
+ public void send()
+ {
+ doSend();
+ }
+
+ public synchronized void send(ByteBuffer appData)
+ {
+ boolean buffered;
+ if(buffered = _appData.hasRemaining())
+ {
+ ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
+ newBuf.put(_appData);
+ newBuf.put(appData);
+ newBuf.flip();
+ _appData = newBuf;
+ }
+ doSend();
+ if(!appData.hasRemaining())
+ {
+ _appData = EMPTY_BYTE_BUFFER;
+ }
+ else if(!buffered)
+ {
+ _appData = ByteBuffer.allocate(appData.remaining());
+ _appData.put(appData);
+ _appData.flip();
+ }
+ }
+
+ private synchronized void doSend()
+ {
+ if (closed.get())
+ {
+ throw new SenderException("SSL Sender is closed");
+ }
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
+ && !_sslStatus.getSslErrorFlag())
+ {
+ int read = 0;
+ try
+ {
+ SSLEngineResult result = engine.wrap(_appData, netData);
+ read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+ }
+ catch(SSLException e)
+ {
+ // Should this set _sslError??
+ throw new SenderException("SSL, Error occurred while encrypting data",e);
+ }
+
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ delegate.send(data);
+ }
+
+ switch(status)
+ {
+ case CLOSED:
+ throw new SenderException("SSLEngine is closed");
+
+ case BUFFER_OVERFLOW:
+ netData.clear();
+ continue;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_WRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+
+ case NEED_TASK:
+ doTasks();
+ break;
+
+ case NEED_UNWRAP:
+ flush();
+ return;
+
+ case FINISHED:
+ if (_hostname != null)
+ {
+ SSLUtil.verifyHostname(engine, _hostname);
+ }
+
+ case NOT_HANDSHAKING:
+ break; //do nothing
+
+ default:
+ throw new IllegalStateException("SSLSender: Invalid State " + status);
+ }
+
+ }
+ }
+
+ private void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ delegate.setIdleTimeout(i);
+ }
+}