diff options
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java | 76 | ||||
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java | 90 |
2 files changed, 83 insertions, 83 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java index a9d2d50c51..011c38c2d7 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java @@ -36,36 +36,36 @@ public class SSLReceiver implements Receiver<ByteBuffer> { private Receiver<ByteBuffer> delegate; private SSLEngine engine; - private SSLSender sender; + private SSLSender sender; private int sslBufSize; private ByteBuffer appData; private ByteBuffer localBuffer; private boolean dataCached = false; private final Object notificationToken; - + private static final Logger log = Logger.get(SSLReceiver.class); - + public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender) { this.engine = engine; this.delegate = delegate; this.sender = sender; - this.sslBufSize = engine.getSession().getApplicationBufferSize(); + this.sslBufSize = engine.getSession().getApplicationBufferSize(); appData = ByteBuffer.allocate(sslBufSize); localBuffer = ByteBuffer.allocate(sslBufSize); notificationToken = sender.getNotificationToken(); } - + public void closed() - { + { delegate.closed(); } public void exception(Throwable t) { - delegate.exception(t); + delegate.exception(t); } - + private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf) { if (dataCached) @@ -86,35 +86,35 @@ public class SSLReceiver implements Receiver<ByteBuffer> public void received(ByteBuffer buf) { ByteBuffer netData = addPreviouslyUnreadData(buf); - + HandshakeStatus handshakeStatus; Status status; - + while (netData.hasRemaining()) - { + { try { SSLEngineResult result = engine.unwrap(netData, appData); int read = result.bytesProduced(); status = result.getStatus(); - handshakeStatus = result.getHandshakeStatus(); - + handshakeStatus = result.getHandshakeStatus(); + if (read > 0) { int limit = appData.limit(); appData.limit(appData.position()); appData.position(appData.position() - read); - + ByteBuffer data = appData.slice(); - + appData.limit(limit); appData.position(appData.position() + read); - - delegate.received(data); - } - - - switch(status) + + delegate.received(data); + } + + + switch(status) { case CLOSED: synchronized(notificationToken) @@ -122,25 +122,25 @@ public class SSLReceiver implements Receiver<ByteBuffer> notificationToken.notifyAll(); } return; - + case BUFFER_OVERFLOW: appData = ByteBuffer.allocate(sslBufSize); continue; - + case BUFFER_UNDERFLOW: localBuffer.clear(); localBuffer.put(netData); localBuffer.flip(); dataCached = true; break; - - case OK: - break; // do nothing - + + case OK: + break; // do nothing + default: throw new IllegalStateException("SSLReceiver: Invalid State " + status); - } - + } + switch (handshakeStatus) { case NEED_UNWRAP: @@ -149,31 +149,31 @@ public class SSLReceiver implements Receiver<ByteBuffer> continue; } break; - + case NEED_TASK: sender.doTasks(); handshakeStatus = engine.getHandshakeStatus(); - - case NEED_WRAP: + + case NEED_WRAP: case FINISHED: - case NOT_HANDSHAKING: + case NOT_HANDSHAKING: synchronized(notificationToken) { notificationToken.notifyAll(); } - break; - + break; + default: throw new IllegalStateException("SSLReceiver: Invalid State " + status); } - - + + } catch(SSLException e) { throw new TransportException("Error in SSLReceiver",e); } - + } } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java index 5e878da531..e8d50d9020 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java @@ -39,18 +39,18 @@ public class SSLSender implements Sender<ByteBuffer> private int sslBufSize; private ByteBuffer netData; private long timeout = 30000; - + private final Object engineState = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - - private static final Logger log = Logger.get(SSLSender.class); - + + private static final Logger log = Logger.get(SSLSender.class); + public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate) { this.engine = engine; - this.delegate = delegate; + this.delegate = delegate; sslBufSize = engine.getSession().getPacketBufferSize(); - netData = ByteBuffer.allocate(sslBufSize); + netData = ByteBuffer.allocate(sslBufSize); timeout = Long.getLong("qpid.ssl_timeout", 60000); } @@ -66,13 +66,13 @@ public class SSLSender implements Sender<ByteBuffer> engine.closeOutbound(); try { - tearDownSSLConnection(); + tearDownSSLConnection(); } catch(Exception e) { throw new SenderException("Error closing SSL connection",e); } - + while (!engine.isOutboundDone()) { synchronized(engineState) @@ -107,24 +107,24 @@ public class SSLSender implements Sender<ByteBuffer> int limit = netData.limit(); netData.limit(netData.position()); netData.position(netData.position() - read); - + ByteBuffer data = netData.slice(); - + netData.limit(limit); netData.position(netData.position() + read); - + delegate.send(data); flush(); - } + } result = engine.wrap(ByteBuffer.allocate(0), netData); - status = result.getStatus(); + status = result.getStatus(); read = result.bytesProduced(); } } - + public void flush() { - delegate.flush(); + delegate.flush(); } public void send(ByteBuffer appData) @@ -136,54 +136,54 @@ public class SSLSender implements Sender<ByteBuffer> HandshakeStatus handshakeStatus; Status status; - + while(appData.hasRemaining()) - { + { int read = 0; try { - SSLEngineResult result = engine.wrap(appData, netData); + SSLEngineResult result = engine.wrap(appData, netData); read = result.bytesProduced(); status = result.getStatus(); handshakeStatus = result.getHandshakeStatus(); - + } catch(SSLException e) { throw new SenderException("SSL, Error occurred while encrypting data",e); - } - + } + if(read > 0) { int limit = netData.limit(); netData.limit(netData.position()); netData.position(netData.position() - read); - + ByteBuffer data = netData.slice(); - + netData.limit(limit); netData.position(netData.position() + read); - + delegate.send(data); } - - switch(status) + + switch(status) { case CLOSED: throw new SenderException("SSLEngine is closed"); - + case BUFFER_OVERFLOW: netData.clear(); continue; - - case OK: - break; // do nothing - + + case OK: + break; // do nothing + default: throw new IllegalStateException("SSLReceiver: Invalid State " + status); - } - + } + switch (handshakeStatus) { case NEED_WRAP: @@ -191,11 +191,11 @@ public class SSLSender implements Sender<ByteBuffer> { continue; } - + case NEED_TASK: doTasks(); break; - + case NEED_UNWRAP: flush(); synchronized(engineState) @@ -209,7 +209,7 @@ public class SSLSender implements Sender<ByteBuffer> { // pass } - + if (System.currentTimeMillis()- start >= timeout) { throw new SenderException( @@ -218,31 +218,31 @@ public class SSLSender implements Sender<ByteBuffer> } } break; - - case FINISHED: + + case FINISHED: case NOT_HANDSHAKING: break; //do nothing - + default: throw new IllegalStateException("SSLReceiver: Invalid State " + status); } - + } } - - public void doTasks() + + public void doTasks() { Runnable runnable; while ((runnable = engine.getDelegatedTask()) != null) { runnable.run(); } - } - + } + public Object getNotificationToken() { return engineState; } - + public void setIdleTimeout(long l) { delegate.setIdleTimeout(l); |