summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java76
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java90
2 files changed, 83 insertions, 83 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java
index a9d2d50c51..011c38c2d7 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java
@@ -36,36 +36,36 @@ public class SSLReceiver implements Receiver<ByteBuffer>
{
private Receiver<ByteBuffer> delegate;
private SSLEngine engine;
- private SSLSender sender;
+ private SSLSender sender;
private int sslBufSize;
private ByteBuffer appData;
private ByteBuffer localBuffer;
private boolean dataCached = false;
private final Object notificationToken;
-
+
private static final Logger log = Logger.get(SSLReceiver.class);
-
+
public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
{
this.engine = engine;
this.delegate = delegate;
this.sender = sender;
- this.sslBufSize = engine.getSession().getApplicationBufferSize();
+ this.sslBufSize = engine.getSession().getApplicationBufferSize();
appData = ByteBuffer.allocate(sslBufSize);
localBuffer = ByteBuffer.allocate(sslBufSize);
notificationToken = sender.getNotificationToken();
}
-
+
public void closed()
- {
+ {
delegate.closed();
}
public void exception(Throwable t)
{
- delegate.exception(t);
+ delegate.exception(t);
}
-
+
private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf)
{
if (dataCached)
@@ -86,35 +86,35 @@ public class SSLReceiver implements Receiver<ByteBuffer>
public void received(ByteBuffer buf)
{
ByteBuffer netData = addPreviouslyUnreadData(buf);
-
+
HandshakeStatus handshakeStatus;
Status status;
-
+
while (netData.hasRemaining())
- {
+ {
try
{
SSLEngineResult result = engine.unwrap(netData, appData);
int read = result.bytesProduced();
status = result.getStatus();
- handshakeStatus = result.getHandshakeStatus();
-
+ handshakeStatus = result.getHandshakeStatus();
+
if (read > 0)
{
int limit = appData.limit();
appData.limit(appData.position());
appData.position(appData.position() - read);
-
+
ByteBuffer data = appData.slice();
-
+
appData.limit(limit);
appData.position(appData.position() + read);
-
- delegate.received(data);
- }
-
-
- switch(status)
+
+ delegate.received(data);
+ }
+
+
+ switch(status)
{
case CLOSED:
synchronized(notificationToken)
@@ -122,25 +122,25 @@ public class SSLReceiver implements Receiver<ByteBuffer>
notificationToken.notifyAll();
}
return;
-
+
case BUFFER_OVERFLOW:
appData = ByteBuffer.allocate(sslBufSize);
continue;
-
+
case BUFFER_UNDERFLOW:
localBuffer.clear();
localBuffer.put(netData);
localBuffer.flip();
dataCached = true;
break;
-
- case OK:
- break; // do nothing
-
+
+ case OK:
+ break; // do nothing
+
default:
throw new IllegalStateException("SSLReceiver: Invalid State " + status);
- }
-
+ }
+
switch (handshakeStatus)
{
case NEED_UNWRAP:
@@ -149,31 +149,31 @@ public class SSLReceiver implements Receiver<ByteBuffer>
continue;
}
break;
-
+
case NEED_TASK:
sender.doTasks();
handshakeStatus = engine.getHandshakeStatus();
-
- case NEED_WRAP:
+
+ case NEED_WRAP:
case FINISHED:
- case NOT_HANDSHAKING:
+ case NOT_HANDSHAKING:
synchronized(notificationToken)
{
notificationToken.notifyAll();
}
- break;
-
+ break;
+
default:
throw new IllegalStateException("SSLReceiver: Invalid State " + status);
}
-
-
+
+
}
catch(SSLException e)
{
throw new TransportException("Error in SSLReceiver",e);
}
-
+
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
index 5e878da531..e8d50d9020 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
@@ -39,18 +39,18 @@ public class SSLSender implements Sender<ByteBuffer>
private int sslBufSize;
private ByteBuffer netData;
private long timeout = 30000;
-
+
private final Object engineState = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
-
- private static final Logger log = Logger.get(SSLSender.class);
-
+
+ private static final Logger log = Logger.get(SSLSender.class);
+
public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
{
this.engine = engine;
- this.delegate = delegate;
+ this.delegate = delegate;
sslBufSize = engine.getSession().getPacketBufferSize();
- netData = ByteBuffer.allocate(sslBufSize);
+ netData = ByteBuffer.allocate(sslBufSize);
timeout = Long.getLong("qpid.ssl_timeout", 60000);
}
@@ -66,13 +66,13 @@ public class SSLSender implements Sender<ByteBuffer>
engine.closeOutbound();
try
{
- tearDownSSLConnection();
+ tearDownSSLConnection();
}
catch(Exception e)
{
throw new SenderException("Error closing SSL connection",e);
}
-
+
while (!engine.isOutboundDone())
{
synchronized(engineState)
@@ -107,24 +107,24 @@ public class SSLSender implements Sender<ByteBuffer>
int limit = netData.limit();
netData.limit(netData.position());
netData.position(netData.position() - read);
-
+
ByteBuffer data = netData.slice();
-
+
netData.limit(limit);
netData.position(netData.position() + read);
-
+
delegate.send(data);
flush();
- }
+ }
result = engine.wrap(ByteBuffer.allocate(0), netData);
- status = result.getStatus();
+ status = result.getStatus();
read = result.bytesProduced();
}
}
-
+
public void flush()
{
- delegate.flush();
+ delegate.flush();
}
public void send(ByteBuffer appData)
@@ -136,54 +136,54 @@ public class SSLSender implements Sender<ByteBuffer>
HandshakeStatus handshakeStatus;
Status status;
-
+
while(appData.hasRemaining())
- {
+ {
int read = 0;
try
{
- SSLEngineResult result = engine.wrap(appData, netData);
+ SSLEngineResult result = engine.wrap(appData, netData);
read = result.bytesProduced();
status = result.getStatus();
handshakeStatus = result.getHandshakeStatus();
-
+
}
catch(SSLException e)
{
throw new SenderException("SSL, Error occurred while encrypting data",e);
- }
-
+ }
+
if(read > 0)
{
int limit = netData.limit();
netData.limit(netData.position());
netData.position(netData.position() - read);
-
+
ByteBuffer data = netData.slice();
-
+
netData.limit(limit);
netData.position(netData.position() + read);
-
+
delegate.send(data);
}
-
- switch(status)
+
+ switch(status)
{
case CLOSED:
throw new SenderException("SSLEngine is closed");
-
+
case BUFFER_OVERFLOW:
netData.clear();
continue;
-
- case OK:
- break; // do nothing
-
+
+ case OK:
+ break; // do nothing
+
default:
throw new IllegalStateException("SSLReceiver: Invalid State " + status);
- }
-
+ }
+
switch (handshakeStatus)
{
case NEED_WRAP:
@@ -191,11 +191,11 @@ public class SSLSender implements Sender<ByteBuffer>
{
continue;
}
-
+
case NEED_TASK:
doTasks();
break;
-
+
case NEED_UNWRAP:
flush();
synchronized(engineState)
@@ -209,7 +209,7 @@ public class SSLSender implements Sender<ByteBuffer>
{
// pass
}
-
+
if (System.currentTimeMillis()- start >= timeout)
{
throw new SenderException(
@@ -218,31 +218,31 @@ public class SSLSender implements Sender<ByteBuffer>
}
}
break;
-
- case FINISHED:
+
+ case FINISHED:
case NOT_HANDSHAKING:
break; //do nothing
-
+
default:
throw new IllegalStateException("SSLReceiver: Invalid State " + status);
}
-
+
}
}
-
- public void doTasks()
+
+ public void doTasks()
{
Runnable runnable;
while ((runnable = engine.getDelegatedTask()) != null) {
runnable.run();
}
- }
-
+ }
+
public Object getNotificationToken()
{
return engineState;
}
-
+
public void setIdleTimeout(long l)
{
delegate.setIdleTimeout(l);