summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-11 14:27:01 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-11 14:27:01 +0000
commit90593b58d192ce16ab9945a279ddf12a09f8e0e2 (patch)
tree94b940b35178ec8d1cec63552a32dab7aa3b0deb /java
parent779e1b19631d1eb62e9b7f89340f76736b462f7e (diff)
downloadqpid-python-90593b58d192ce16ab9945a279ddf12a09f8e0e2.tar.gz
QPID-4831 : [Java Broker] Allow SSL and non-SSL connections on the same port
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1481331 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java24
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java266
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java27
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java7
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java240
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java27
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java14
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java271
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java145
-rwxr-xr-xjava/test-profiles/CPPExcludes2
19 files changed, 888 insertions, 215 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 451754e6d8..a539743081 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -88,13 +88,6 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
}
}
}
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.connectionRegistered(connnection);
- }
- }
}
public void deregisterConnection(AMQConnectionModel connnection)
@@ -111,14 +104,6 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
}
}
}
-
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.connectionUnregistered(connnection);
- }
- }
}
public void addRegistryChangeListener(RegistryChangeListener listener)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
index 1fc22c736c..ddfbf51322 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
@@ -99,9 +99,11 @@ public class AmqpPortAdapter extends PortAdapter
_transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(
- _broker, supported, defaultSupportedProtocolReply);
+ _broker, transports.contains(Transport.TCP) ? sslContext : null,
+ settings.wantClientAuth(), settings.needClientAuth(),
+ supported, defaultSupportedProtocolReply, this, transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
- _transport.accept(settings, protocolEngineFactory, sslContext);
+ _transport.accept(settings, protocolEngineFactory, transports.contains(Transport.TCP) ? null : sslContext);
for(Transport transport : getTransports())
{
CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 36fafba1cd..b52da3039d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.stats.StatisticsGatherer;
import java.util.List;
@@ -82,4 +84,9 @@ public interface AMQConnectionModel extends StatisticsGatherer
long getSessionCountLimit();
long getLastIoTime();
+
+ Port getPort();
+
+ Transport getTransport();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index a84a4783ac..4574f87b54 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -62,6 +62,8 @@ import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.queue.QueueEntry;
@@ -86,6 +88,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+ private final Port _port;
private AMQShortString _contextKey;
@@ -153,11 +156,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Lock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
private final Broker _broker;
+ private final Transport _transport;
- public AMQProtocolEngine(Broker broker, NetworkConnection network, final long connectionId)
+ public AMQProtocolEngine(Broker broker,
+ NetworkConnection network,
+ final long connectionId,
+ Port port,
+ Transport transport)
{
_broker = broker;
+ _port = port;
+ _transport = transport;
_maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT);
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
@@ -1142,6 +1152,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _lastIoTime;
}
+ @Override
+ public Port getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
+
public long getLastReceivedTime()
{
return _lastReceivedTime;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index d9e5e1c473..267857a34a 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -24,21 +24,35 @@ package org.apache.qpid.server.protocol;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.Principal;
import java.util.Set;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
+import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
private final long _id;
+ private final SSLContext _sslContext;
+ private final boolean _wantClientAuth;
+ private final boolean _needClientAuth;
+ private final Port _port;
+ private final Transport _transport;
private Set<AmqpProtocolVersion> _supported;
private String _fqdn;
@@ -50,19 +64,10 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(final Broker broker,
+ SSLContext sslContext, boolean wantClientAuth, boolean needClientAuth,
final Set<AmqpProtocolVersion> supported,
final AmqpProtocolVersion defaultSupportedReply,
- final long id,
- final NetworkConnection network)
- {
- this(broker, supported, defaultSupportedReply, id);
- setNetworkConnection(network);
- }
-
- public MultiVersionProtocolEngine(final Broker broker,
- final Set<AmqpProtocolVersion> supported,
- final AmqpProtocolVersion defaultSupportedReply,
- final long id)
+ Port port, Transport transport, final long id)
{
if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
{
@@ -74,6 +79,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_broker = broker;
_supported = supported;
_defaultSupportedReply = defaultSupportedReply;
+ _sslContext = sslContext;
+ _wantClientAuth = wantClientAuth;
+ _needClientAuth = needClientAuth;
+ _port = port;
+ _transport = transport;
}
@@ -252,7 +262,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_broker, _network, _id);
+ return new AMQProtocolEngine(_broker, _network, _id, _port, _transport);
}
};
@@ -272,7 +282,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_broker, _network, _id);
+ return new AMQProtocolEngine(_broker, _network, _id, _port, _transport);
}
};
@@ -292,7 +302,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_broker, _network, _id);
+ return new AMQProtocolEngine(_broker, _network, _id, _port, _transport);
}
};
@@ -321,7 +331,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
conn.setConnectionDelegate(connDelegate);
conn.setRemoteAddress(_network.getRemoteAddress());
conn.setLocalAddress(_network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, _network);
+ return new ProtocolEngine_0_10( conn, _network, _port, _transport);
}
};
@@ -341,7 +351,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new ProtocolEngine_1_0_0(_network, _broker, _id);
+ return new ProtocolEngine_1_0_0(_network, _broker, _id, _port, _transport);
}
};
@@ -361,7 +371,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id);
+ return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id, _port, _transport);
}
};
@@ -518,6 +528,14 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
}
+ if(newDelegate == null && looksLikeSSL(headerBytes))
+ {
+ if(_sslContext != null)
+ {
+ newDelegate = new SslDelegateProtocolEngine();
+ }
+ }
+
// If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
@@ -625,4 +643,218 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return 0;
}
}
+
+ private class SslDelegateProtocolEngine implements ServerProtocolEngine
+ {
+ private final MultiVersionProtocolEngine _decryptEngine;
+ private final SSLEngine _engine;
+ private final SSLReceiver _sslReceiver;
+ private final SSLBufferingSender _sslSender;
+
+ private SslDelegateProtocolEngine()
+ {
+
+ _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
+ _defaultSupportedReply, _port, Transport.SSL, _id);
+
+ _engine = _sslContext.createSSLEngine();
+ _engine.setUseClientMode(false);
+
+ if(_needClientAuth)
+ {
+ _engine.setNeedClientAuth(_needClientAuth);
+ }
+ else if(_wantClientAuth)
+ {
+ _engine.setWantClientAuth(_wantClientAuth);
+ }
+
+ SSLStatus sslStatus = new SSLStatus();
+ _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
+ _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
+ _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender));
+ }
+
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ _sslReceiver.received(msg);
+ _sslSender.send();
+ _sslSender.flush();
+ }
+
+ @Override
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ //TODO - Implement
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _decryptEngine.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _decryptEngine.getLocalAddress();
+ }
+
+ @Override
+ public long getWrittenBytes()
+ {
+ return _decryptEngine.getWrittenBytes();
+ }
+
+ @Override
+ public long getReadBytes()
+ {
+ return _decryptEngine.getReadBytes();
+ }
+
+ @Override
+ public void closed()
+ {
+ _decryptEngine.closed();
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _decryptEngine.writerIdle();
+ }
+
+ @Override
+ public void readerIdle()
+ {
+ _decryptEngine.readerIdle();
+ }
+
+ @Override
+ public void exception(Throwable t)
+ {
+ _decryptEngine.exception(t);
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return _decryptEngine.getConnectionId();
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _decryptEngine.getLastReadTime();
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _decryptEngine.getLastWriteTime();
+ }
+ }
+
+ private boolean looksLikeSSL(byte[] headerBytes)
+ {
+ return headerBytes[0] == 22 && // SSL Handshake
+ (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[2] == 0 || // SSL 3.0
+ headerBytes[2] == 1 || // TLS 1.0
+ headerBytes[2] == 2 || // TLS 1.1
+ headerBytes[2] == 3)) && // TLS1.2
+ (headerBytes[5] == 1); // client_hello
+ }
+
+ private static class SSLNetworkConnection implements NetworkConnection
+ {
+ private final NetworkConnection _network;
+ private final SSLBufferingSender _sslSender;
+ private final SSLEngine _engine;
+
+ public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
+ SSLBufferingSender sslSender)
+ {
+ _engine = engine;
+ _network = network;
+ _sslSender = sslSender;
+
+ }
+
+ @Override
+ public Sender<ByteBuffer> getSender()
+ {
+ return _sslSender;
+ }
+
+ @Override
+ public void start()
+ {
+ _network.start();
+ }
+
+ @Override
+ public void close()
+ {
+ _sslSender.close();
+
+ _network.close();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _network.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _network.getLocalAddress();
+ }
+
+ @Override
+ public void setMaxWriteIdle(int sec)
+ {
+ _network.setMaxWriteIdle(sec);
+ }
+
+ @Override
+ public void setMaxReadIdle(int sec)
+ {
+ _network.setMaxReadIdle(sec);
+ }
+
+ @Override
+ public void setPeerPrincipal(Principal principal)
+ {
+ _network.setPeerPrincipal(principal);
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ try
+ {
+ return _engine.getSession().getPeerPrincipal();
+ }
+ catch (SSLPeerUnverifiedException e)
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _network.getMaxReadIdle();
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _network.getMaxWriteIdle();
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 9f078c8999..4b76546da1 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.protocol;
+import javax.net.ssl.SSLContext;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
@@ -34,9 +37,20 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
private final Broker _broker;
private final Set<AmqpProtocolVersion> _supported;
private final AmqpProtocolVersion _defaultSupportedReply;
+ private final SSLContext _sslContext;
+ private final boolean _wantClientAuth;
+ private final boolean _needClientAuth;
+ private final Port _port;
+ private final Transport _transport;
public MultiVersionProtocolEngineFactory(Broker broker,
- final Set<AmqpProtocolVersion> supportedVersions, final AmqpProtocolVersion defaultSupportedReply)
+ SSLContext sslContext,
+ boolean wantClientAuth,
+ boolean needClientAuth,
+ final Set<AmqpProtocolVersion> supportedVersions,
+ final AmqpProtocolVersion defaultSupportedReply,
+ Port port,
+ Transport transport)
{
if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply))
{
@@ -45,13 +59,20 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
}
_broker = broker;
+ _sslContext = sslContext;
_supported = supportedVersions;
_defaultSupportedReply = defaultSupportedReply;
+ _wantClientAuth = wantClientAuth;
+ _needClientAuth = needClientAuth;
+ _port = port;
+ _transport = transport;
}
public ServerProtocolEngine newProtocolEngine()
{
- return new MultiVersionProtocolEngine(_broker, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+ _supported, _defaultSupportedReply, _port, _transport,
+ ID_GENERATOR.getAndIncrement()
+ );
}
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index d5f7fe486c..8275ab690c 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
@@ -47,11 +49,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network)
+ NetworkConnection network, Port port, Transport transport)
{
super(new Assembler(conn));
_connection = conn;
-
+ _connection.setPort(port);
+ _connection.setTransport(transport);
if(network != null)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index ed9cd324b4..c2210be935 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -40,6 +40,8 @@ import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -50,6 +52,8 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler
{
static final AtomicLong _connectionIdSource = new AtomicLong(0L);
+ private final Port _port;
+ private final Transport _transport;
//private NetworkConnection _networkDriver;
private long _readBytes;
@@ -100,9 +104,15 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
- public ProtocolEngine_1_0_0(final NetworkConnection networkDriver, final Broker broker, long id)
+ public ProtocolEngine_1_0_0(final NetworkConnection networkDriver,
+ final Broker broker,
+ long id,
+ Port port,
+ Transport transport)
{
_broker = broker;
+ _port = port;
+ _transport = transport;
_connectionId = id;
if(networkDriver != null)
{
@@ -153,7 +163,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
_conn = new ConnectionEndpoint(container, asSaslServerProvider(_broker.getSubjectCreator(
getLocalAddress())));
_conn.setRemoteAddress(_network.getRemoteAddress());
- _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId));
+ _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport));
_conn.setFrameOutputHandler(this);
_frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index 124eb779d5..8e64ca74f9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -41,6 +41,8 @@ import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -50,50 +52,52 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
{
- private long _readBytes;
- private long _writtenBytes;
-
- private long _lastReadTime;
- private long _lastWriteTime;
- private final Broker _broker;
- private long _createTime = System.currentTimeMillis();
- private ConnectionEndpoint _conn;
- private long _connectionId;
-
- private static final ByteBuffer HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 3,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
- private static final ByteBuffer PROTOCOL_HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
-
- private FrameWriter _frameWriter;
- private ProtocolHandler _frameHandler;
- private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
- private Object _sendLock = new Object();
- private byte _major;
- private byte _minor;
- private byte _revision;
+ private final Port _port;
+ private final Transport _transport;
+ private long _readBytes;
+ private long _writtenBytes;
+
+ private long _lastReadTime;
+ private long _lastWriteTime;
+ private final Broker _broker;
+ private long _createTime = System.currentTimeMillis();
+ private ConnectionEndpoint _conn;
+ private long _connectionId;
+
+ private static final ByteBuffer HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ private static final ByteBuffer PROTOCOL_HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+
+ private FrameWriter _frameWriter;
+ private ProtocolHandler _frameHandler;
+ private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
private PrintWriter _out;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
@@ -111,14 +115,16 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
FRAME
}
- private State _state = State.A;
+ private State _state = State.A;
public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker,
- long id)
+ long id, Port port, Transport transport)
{
_connectionId = id;
_broker = broker;
+ _port = port;
+ _transport = transport;
if(networkDriver != null)
{
setNetworkConnection(networkDriver, networkDriver.getSender());
@@ -167,7 +173,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
SubjectCreator subjectCreator = _broker.getSubjectCreator(getLocalAddress());
_conn = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator));
_conn.setRemoteAddress(getRemoteAddress());
- _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId));
+ _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport));
_conn.setFrameOutputHandler(this);
_conn.setSaslFrameOutput(this);
@@ -333,102 +339,102 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
- public void closed()
- {
- // todo
+ public void closed()
+ {
+ // todo
_conn.inputClosed();
- if(_conn != null && _conn.getConnectionEventListener() != null)
+ if (_conn != null && _conn.getConnectionEventListener() != null)
{
- ((Connection_1_0)_conn.getConnectionEventListener()).closed();
+ ((Connection_1_0) _conn.getConnectionEventListener()).closed();
}
- }
+ }
- public long getCreateTime()
- {
- return _createTime;
- }
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
- public boolean canSend()
- {
- return true;
- }
+ public boolean canSend()
+ {
+ return true;
+ }
- public void send(final AMQFrame amqFrame)
- {
- send(amqFrame, null);
- }
+ public void send(final AMQFrame amqFrame)
+ {
+ send(amqFrame, null);
+ }
- private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+ private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
- public void send(final AMQFrame amqFrame, ByteBuffer buf)
- {
+ public void send(final AMQFrame amqFrame, ByteBuffer buf)
+ {
- synchronized(_sendLock)
- {
- _lastWriteTime = System.currentTimeMillis();
- if(FRAME_LOGGER.isLoggable(Level.FINE))
- {
- FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
- }
+ synchronized (_sendLock)
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ if (FRAME_LOGGER.isLoggable(Level.FINE))
+ {
+ FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
+ }
- _frameWriter.setValue(amqFrame);
+ _frameWriter.setValue(amqFrame);
- ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
+ ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
- int size = _frameWriter.writeToBuffer(dup);
- if(size > _conn.getMaxFrameSize())
- {
- throw new OversizeFrameException(amqFrame,size);
- }
+ int size = _frameWriter.writeToBuffer(dup);
+ if (size > _conn.getMaxFrameSize())
+ {
+ throw new OversizeFrameException(amqFrame, size);
+ }
- dup.flip();
- _writtenBytes += dup.limit();
+ dup.flip();
+ _writtenBytes += dup.limit();
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- ByteBuffer dup2 = dup.duplicate();
- byte[] data = new byte[dup2.remaining()];
- dup2.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
- }
+ if (RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup2 = dup.duplicate();
+ byte[] data = new byte[dup2.remaining()];
+ dup2.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
+ }
- _sender.send(dup);
- _sender.flush();
+ _sender.send(dup);
+ _sender.flush();
- }
- }
+ }
+ }
- public void send(short channel, FrameBody body)
- {
- AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
- send(frame);
+ public void send(short channel, FrameBody body)
+ {
+ AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+ send(frame);
- }
+ }
- public void close()
- {
- _sender.close();
- }
+ public void close()
+ {
+ _sender.close();
+ }
- public void setLogOutput(final PrintWriter out)
- {
- _out = out;
- }
+ public void setLogOutput(final PrintWriter out)
+ {
+ _out = out;
+ }
- public long getConnectionId()
- {
- return _connectionId;
- }
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
public long getLastReadTime()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index f79d34ea71..b274eabf04 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -29,6 +29,8 @@ import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -43,7 +45,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO
public class Connection_1_0 implements ConnectionEventListener
{
+ private final Port _port;
private VirtualHost _vhost;
+ private final Transport _transport;
private final ConnectionEndpoint _conn;
private final long _connectionId;
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
@@ -60,9 +64,15 @@ public class Connection_1_0 implements ConnectionEventListener
- public Connection_1_0(VirtualHost virtualHost, ConnectionEndpoint conn, long connectionId)
+ public Connection_1_0(VirtualHost virtualHost,
+ ConnectionEndpoint conn,
+ long connectionId,
+ Port port,
+ Transport transport)
{
_vhost = virtualHost;
+ _port = port;
+ _transport = transport;
_conn = conn;
_connectionId = connectionId;
_vhost.getConnectionRegistry().registerConnection(_model);
@@ -230,6 +240,18 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
+ public Port getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
+
+ @Override
public void initialiseStatistics()
{
_messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 58de6a0cdf..b49bd89266 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -35,6 +35,8 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
@@ -66,10 +68,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHost _virtualHost;
+ private Port _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Principal _peerPrincipal;
private NetworkConnection _networkConnection;
+ private Transport _transport;
public ServerConnection(final long connectionId)
{
@@ -148,6 +152,28 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
initialiseStatistics();
}
+ @Override
+ public Port getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(Port port)
+ {
+ _port = port;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
+
+ public void setTransport(Transport transport)
+ {
+ _transport = transport;
+ }
+
public void onOpen(final Runnable task)
{
_onOpenTask = task;
@@ -451,6 +477,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return _lastIoTime.longValue();
}
+
public String getClientId()
{
return getConnectionDelegate().getClientId();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 3216f8886a..5d3051f7b2 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -60,7 +60,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
public InternalTestProtocolSession(VirtualHost virtualHost, Broker broker) throws AMQException
{
- super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement());
+ super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 02b8c74feb..0d4b4ba9f8 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -154,7 +154,8 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(_broker, versions, null);
+ new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, null, null,
+ org.apache.qpid.server.model.Transport.TCP);
//create a dummy to retrieve the 'current' ID number
long previousId = factory.newProtocolEngine().getConnectionId();
@@ -192,7 +193,8 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
try
{
- new MultiVersionProtocolEngineFactory(_broker, versions, AmqpProtocolVersion.v0_9);
+ new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, AmqpProtocolVersion.v0_9, null,
+ org.apache.qpid.server.model.Transport.TCP);
fail("should not have been allowed to create the factory");
}
catch(IllegalArgumentException iae)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 65dbf7bae1..0bc20836f6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -27,6 +27,8 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -539,5 +541,17 @@ public class MockSubscription implements Subscription
{
return 0;
}
+
+ @Override
+ public Port getPort()
+ {
+ return null;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return null;
+ }
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
index 89d434e95d..efb62ba085 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
@@ -73,7 +73,7 @@ public class SubscriptionFactoryImplTest extends QpidTestCase
}
/**
- * Tests that while creating Subscriptions of various types, the
+ * Tests that while creating Subscriptions of various types, the
* ID numbers assigned are allocated from a common sequence
* (in increasing order).
*/
@@ -104,7 +104,7 @@ public class SubscriptionFactoryImplTest extends QpidTestCase
//create a 0-10 subscription
ServerConnection conn = new ServerConnection(1);
- ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection());
+ ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), null, null);
conn.setVirtualHost(_session.getVirtualHost());
ServerSessionDelegate sesDel = new ServerSessionDelegate();
Binary name = new Binary(new byte[]{new Byte("1")});
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index c8027e143e..5742667dbe 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -239,7 +239,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
ticker);
ticker.setConnection(connection);
- if(_sslContext != null)
+ if(_sslContext != null && socket instanceof SSLSocket)
{
try
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
new file mode 100644
index 0000000000..0d36b96cd4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.security.ssl;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLBufferingSender implements Sender<ByteBuffer>
+{
+ private static final Logger log = Logger.get(SSLBufferingSender.class);
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ private final Sender<ByteBuffer> delegate;
+ private final SSLEngine engine;
+ private final int sslBufSize;
+ private final ByteBuffer netData;
+ private final SSLStatus _sslStatus;
+
+ private String _hostname;
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
+
+
+ public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
+ {
+ this.engine = engine;
+ this.delegate = delegate;
+ sslBufSize = engine.getSession().getPacketBufferSize();
+ netData = ByteBuffer.allocate(sslBufSize);
+ _sslStatus = sslStatus;
+ }
+
+ public void setHostname(String hostname)
+ {
+ _hostname = hostname;
+ }
+
+ public void close()
+ {
+ if (!closed.getAndSet(true))
+ {
+ if (engine.isOutboundDone())
+ {
+ return;
+ }
+ log.debug("Closing SSL connection");
+
+ engine.closeOutbound();
+ try
+ {
+ tearDownSSLConnection();
+ }
+ catch(Exception e)
+ {
+ throw new SenderException("Error closing SSL connection",e);
+ }
+
+
+ synchronized(_sslStatus.getSslLock())
+ {
+ while (!engine.isOutboundDone())
+ {
+ try
+ {
+ _sslStatus.getSslLock().wait();
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+
+ }
+ }
+ delegate.close();
+ }
+ }
+
+ private void tearDownSSLConnection() throws Exception
+ {
+ SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
+ Status status = result.getStatus();
+ int read = result.bytesProduced();
+ while (status != Status.CLOSED)
+ {
+ if (status == Status.BUFFER_OVERFLOW)
+ {
+ netData.clear();
+ }
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ delegate.send(data);
+ flush();
+ }
+ result = engine.wrap(ByteBuffer.allocate(0), netData);
+ status = result.getStatus();
+ read = result.bytesProduced();
+ }
+ }
+
+ public void flush()
+ {
+ delegate.flush();
+ }
+
+ public void send()
+ {
+ doSend();
+ }
+
+ public synchronized void send(ByteBuffer appData)
+ {
+ boolean buffered;
+ if(buffered = _appData.hasRemaining())
+ {
+ ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
+ newBuf.put(_appData);
+ newBuf.put(appData);
+ newBuf.flip();
+ _appData = newBuf;
+ }
+ doSend();
+ if(!appData.hasRemaining())
+ {
+ _appData = EMPTY_BYTE_BUFFER;
+ }
+ else if(!buffered)
+ {
+ _appData = ByteBuffer.allocate(appData.remaining());
+ _appData.put(appData);
+ _appData.flip();
+ }
+ }
+
+ private synchronized void doSend()
+ {
+ if (closed.get())
+ {
+ throw new SenderException("SSL Sender is closed");
+ }
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
+ && !_sslStatus.getSslErrorFlag())
+ {
+ int read = 0;
+ try
+ {
+ SSLEngineResult result = engine.wrap(_appData, netData);
+ read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+ }
+ catch(SSLException e)
+ {
+ // Should this set _sslError??
+ throw new SenderException("SSL, Error occurred while encrypting data",e);
+ }
+
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ delegate.send(data);
+ }
+
+ switch(status)
+ {
+ case CLOSED:
+ throw new SenderException("SSLEngine is closed");
+
+ case BUFFER_OVERFLOW:
+ netData.clear();
+ continue;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_WRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+
+ case NEED_TASK:
+ doTasks();
+ break;
+
+ case NEED_UNWRAP:
+ flush();
+ return;
+
+ case FINISHED:
+ if (_hostname != null)
+ {
+ SSLUtil.verifyHostname(engine, _hostname);
+ }
+
+ case NOT_HANDSHAKING:
+ break; //do nothing
+
+ default:
+ throw new IllegalStateException("SSLSender: Invalid State " + status);
+ }
+
+ }
+ }
+
+ private void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ delegate.setIdleTimeout(i);
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
index 71b763685e..7492d062fd 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,16 +7,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package org.apache.qpid.client.ssl;
@@ -25,6 +25,7 @@ import static org.apache.qpid.test.utils.TestSSLConstants.KEYSTORE_PASSWORD;
import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE;
import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD;
+import java.util.Arrays;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQTestConnection_0_10;
@@ -64,23 +65,23 @@ public class SSLTest extends QpidBrokerTestCase
if (shouldPerformTest())
{
clearSslStoreSystemProperties();
-
+
//Start the broker (NEEDing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, true, false);
+ configureJavaBrokerIfNecessary(true, true, true, false, false);
super.setUp();
String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" +
- "?ssl='true'&ssl_verify_hostname='true'" +
+ "?ssl='true'&ssl_verify_hostname='true'" +
"&key_store='%s'&key_store_password='%s'" +
"&trust_store='%s'&trust_store_password='%s'" +
"'";
-
+
url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT,
KEYSTORE,KEYSTORE_PASSWORD,TRUSTSTORE,TRUSTSTORE_PASSWORD);
Connection con = getConnection(new AMQConnectionURL(url));
assertNotNull("connection should be successful", con);
- Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
assertNotNull("create session should be successful", ssn);
}
}
@@ -95,7 +96,7 @@ public class SSLTest extends QpidBrokerTestCase
if (shouldPerformTest())
{
//Start the broker (NEEDing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, true, false);
+ configureJavaBrokerIfNecessary(true, true, true, false, false);
super.setUp();
//Create URL enabling SSL at the connection rather than brokerlist level
@@ -119,7 +120,7 @@ public class SSLTest extends QpidBrokerTestCase
if (shouldPerformTest())
{
//Start the broker (NEEDing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, true, false);
+ configureJavaBrokerIfNecessary(true, true, true, false, false);
super.setUp();
//Create URL enabling SSL at the connection, overriding the false at the brokerlist level
@@ -138,18 +139,18 @@ public class SSLTest extends QpidBrokerTestCase
if (shouldPerformTest())
{
//Start the broker (NEEDing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, true, false);
+ configureJavaBrokerIfNecessary(true, true, true, false, false);
super.setUp();
String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s?ssl='true''";
url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT);
-
+
Connection con = getConnection(new AMQConnectionURL(url));
assertNotNull("connection should be successful", con);
- Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
assertNotNull("create session should be successful", ssn);
- }
+ }
}
public void testMultipleCertsInSingleStore() throws Exception
@@ -157,43 +158,43 @@ public class SSLTest extends QpidBrokerTestCase
if (shouldPerformTest())
{
//Start the broker (NEEDing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, true, false);
+ configureJavaBrokerIfNecessary(true, true, true, false, false);
super.setUp();
- String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
- QpidBrokerTestCase.DEFAULT_SSL_PORT +
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
+ QpidBrokerTestCase.DEFAULT_SSL_PORT +
"?ssl='true'&ssl_cert_alias='" + CERT_ALIAS_APP1 + "''";
-
- AMQTestConnection_0_10 con = new AMQTestConnection_0_10(url);
+
+ AMQTestConnection_0_10 con = new AMQTestConnection_0_10(url);
org.apache.qpid.transport.Connection transportCon = con.getConnection();
String userID = transportCon.getSecurityLayer().getUserID();
assertEquals("The correct certificate was not choosen","app1@acme.org",userID);
con.close();
-
- url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
- QpidBrokerTestCase.DEFAULT_SSL_PORT +
+
+ url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
+ QpidBrokerTestCase.DEFAULT_SSL_PORT +
"?ssl='true'&ssl_cert_alias='" + CERT_ALIAS_APP2 + "''";
-
- con = new AMQTestConnection_0_10(url);
+
+ con = new AMQTestConnection_0_10(url);
transportCon = con.getConnection();
userID = transportCon.getSecurityLayer().getUserID();
assertEquals("The correct certificate was not choosen","app2@acme.org",userID);
con.close();
- }
+ }
}
-
+
public void testVerifyHostNameWithIncorrectHostname() throws Exception
{
if (shouldPerformTest())
{
//Start the broker (WANTing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, false, true);
+ configureJavaBrokerIfNecessary(true, true, false, true, false);
super.setUp();
- String url = "amqp://guest:guest@test/?brokerlist='tcp://127.0.0.1:" +
- QpidBrokerTestCase.DEFAULT_SSL_PORT +
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://127.0.0.1:" +
+ QpidBrokerTestCase.DEFAULT_SSL_PORT +
"?ssl='true'&ssl_verify_hostname='true''";
-
+
try
{
getConnection(new AMQConnectionURL(url));
@@ -203,7 +204,7 @@ public class SSLTest extends QpidBrokerTestCase
{
verifyExceptionCausesContains(e, "SSL hostname verification failed");
}
- }
+ }
}
private void verifyExceptionCausesContains(Exception e, String expectedString)
@@ -213,39 +214,39 @@ public class SSLTest extends QpidBrokerTestCase
String strace = bout.toString();
assertTrue("Correct exception not thrown", strace.contains(expectedString));
}
-
+
public void testVerifyLocalHost() throws Exception
{
if (shouldPerformTest())
{
//Start the broker (WANTing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, false, true);
+ configureJavaBrokerIfNecessary(true, true, false, true, false);
super.setUp();
- String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
- QpidBrokerTestCase.DEFAULT_SSL_PORT +
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:" +
+ QpidBrokerTestCase.DEFAULT_SSL_PORT +
"?ssl='true'&ssl_verify_hostname='true''";
Connection con = getConnection(new AMQConnectionURL(url));
assertNotNull("connection should have been created", con);
}
}
-
+
public void testVerifyLocalHostLocalDomain() throws Exception
{
if (shouldPerformTest())
{
//Start the broker (WANTing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, false, true);
+ configureJavaBrokerIfNecessary(true, true, false, true, false);
super.setUp();
- String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost.localdomain:" +
- QpidBrokerTestCase.DEFAULT_SSL_PORT +
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost.localdomain:" +
+ QpidBrokerTestCase.DEFAULT_SSL_PORT +
"?ssl='true'&ssl_verify_hostname='true''";
Connection con = getConnection(new AMQConnectionURL(url));
assertNotNull("connection should have been created", con);
- }
+ }
}
public void testCreateSSLConnectionUsingConnectionURLParamsTrustStoreOnly() throws Exception
@@ -255,12 +256,12 @@ public class SSLTest extends QpidBrokerTestCase
clearSslStoreSystemProperties();
//Start the broker (WANTing client certificate authentication)
- configureJavaBrokerIfNecessary(true, true, false, true);
+ configureJavaBrokerIfNecessary(true, true, false, true, false);
super.setUp();
-
+
String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" +
- "?ssl='true'&ssl_verify_hostname='true'" +
+ "?ssl='true'&ssl_verify_hostname='true'" +
"&trust_store='%s'&trust_store_password='%s'" +
"'";
@@ -268,9 +269,9 @@ public class SSLTest extends QpidBrokerTestCase
Connection con = getConnection(new AMQConnectionURL(url));
assertNotNull("connection should be successful", con);
- Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
assertNotNull("create session should be successful", ssn);
- }
+ }
}
/**
@@ -308,7 +309,7 @@ public class SSLTest extends QpidBrokerTestCase
clearSslStoreSystemProperties();
//Start the broker
- configureJavaBrokerIfNecessary(true, true, needClientCerts, wantClientCerts);
+ configureJavaBrokerIfNecessary(true, true, needClientCerts, wantClientCerts, false);
super.setUp();
String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" +
@@ -344,6 +345,47 @@ public class SSLTest extends QpidBrokerTestCase
}
}
+ /**
+ * Test running TLS and unencrypted on the same port works and both TLS and non-TLS connections can be established
+ *
+ */
+ public void testCreateSSLandTCPonSamePort() throws Exception
+ {
+ if (shouldPerformTest())
+ {
+ clearSslStoreSystemProperties();
+
+ //Start the broker (NEEDing client certificate authentication)
+ configureJavaBrokerIfNecessary(true, false, false, false, true);
+ super.setUp();
+
+ String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" +
+ "?ssl='true'&ssl_verify_hostname='true'" +
+ "&key_store='%s'&key_store_password='%s'" +
+ "&trust_store='%s'&trust_store_password='%s'" +
+ "'";
+
+ url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT,
+ KEYSTORE,KEYSTORE_PASSWORD,TRUSTSTORE,TRUSTSTORE_PASSWORD);
+
+ Connection con = getConnection(new AMQConnectionURL(url));
+ assertNotNull("connection should be successful", con);
+ Session ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ assertNotNull("create session should be successful", ssn);
+
+ url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s'";
+
+ url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT);
+
+ con = getConnection(new AMQConnectionURL(url));
+ assertNotNull("connection should be successful", con);
+ ssn = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ assertNotNull("create session should be successful", ssn);
+
+ }
+ }
+
+
private boolean shouldPerformTest()
{
// We run the SSL tests on all the Java broker profiles
@@ -355,12 +397,17 @@ public class SSLTest extends QpidBrokerTestCase
return Boolean.getBoolean(PROFILE_USE_SSL);
}
- private void configureJavaBrokerIfNecessary(boolean sslEnabled, boolean sslOnly, boolean needClientAuth, boolean wantClientAuth) throws ConfigurationException
+ private void configureJavaBrokerIfNecessary(boolean sslEnabled,
+ boolean sslOnly,
+ boolean needClientAuth,
+ boolean wantClientAuth,
+ boolean samePort) throws ConfigurationException
{
if(isJavaBroker())
{
Map<String, Object> sslPortAttributes = new HashMap<String, Object>();
- sslPortAttributes.put(Port.TRANSPORTS, Collections.singleton(Transport.SSL));
+ sslPortAttributes.put(Port.TRANSPORTS, samePort ? Arrays.asList(Transport.SSL, Transport.TCP)
+ : Collections.singleton(Transport.SSL));
sslPortAttributes.put(Port.PORT, DEFAULT_SSL_PORT);
sslPortAttributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
sslPortAttributes.put(Port.NEED_CLIENT_AUTH, needClientAuth);
diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes
index b429305572..a404ff4389 100755
--- a/java/test-profiles/CPPExcludes
+++ b/java/test-profiles/CPPExcludes
@@ -180,6 +180,8 @@ org.apache.qpid.client.failover.MultipleBrokersFailoverTest#*
// Uses Java broker specific configuration
org.apache.qpid.client.ssl.SSLTest#testClientCertMissingWhilstWanting
+org.apache.qpid.client.ssl.SSLTest#testCreateSSLandTCPonSamePort
+
// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats