diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-x | qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java | 468 |
1 files changed, 192 insertions, 276 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 49c0812f4a..2ccf595c26 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -24,40 +24,31 @@ package org.apache.qpid.server.protocol; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.security.Principal; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; -import org.apache.qpid.transport.Sender; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); private final long _id; - private final SSLContext _sslContext; - private final boolean _wantClientAuth; - private final boolean _needClientAuth; private final AmqpPort<?> _port; - private final Transport _transport; + private Transport _transport; private final ProtocolEngineCreator[] _creators; private final Runnable _onCloseTask; @@ -65,15 +56,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private String _fqdn; private final Broker<?> _broker; private NetworkConnection _network; - private Sender<ByteBuffer> _sender; + private ByteBufferSender _sender; private final Protocol _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); public MultiVersionProtocolEngine(final Broker<?> broker, - SSLContext sslContext, - boolean wantClientAuth, - boolean needClientAuth, final Set<Protocol> supported, final Protocol defaultSupportedReply, AmqpPort<?> port, @@ -92,15 +81,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _broker = broker; _supported = supported; _defaultSupportedReply = defaultSupportedReply; - _sslContext = sslContext; - _wantClientAuth = wantClientAuth; - _needClientAuth = needClientAuth; _port = port; _transport = transport; _creators = creators; _onCloseTask = onCloseTask; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + _delegate.setMessageAssignmentSuspended(value); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _delegate.isMessageAssignmentSuspended(); + } public SocketAddress getRemoteAddress() { @@ -147,6 +144,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate.readerIdle(); } + @Override + public void encryptedTransport() + { + _delegate.encryptedTransport(); + } + public void received(ByteBuffer msg) { @@ -169,9 +172,21 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return _delegate.isTransportBlockedForWriting(); + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _delegate.setTransportBlockedForWriting(blocked); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { _network = network; SocketAddress address = _network.getLocalAddress(); @@ -198,10 +213,82 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getLastWriteTime(); } + @Override + public void processPending() + { + _delegate.processPending(); + } + + @Override + public boolean hasWork() + { + return _delegate.hasWork(); + } + + @Override + public void notifyWork() + { + _delegate.notifyWork(); + } + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + _workListener.set(listener); + _delegate.setWorkListener(listener); + } + + @Override + public void clearWork() + { + _delegate.clearWork(); + } private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { + + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPending() + { + + } + + @Override + public boolean hasWork() + { + return false; + } + + @Override + public void notifyWork() + { + + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + + } + + @Override + public void clearWork() + { + + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -247,7 +334,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + @Override + public void encryptedTransport() + { + + } + + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { } @@ -274,12 +367,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { return new Subject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } } private class SelfDelegateProtocolEngine implements ServerProtocolEngine { private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); - private long _lastReadTime; + private long _lastReadTime = System.currentTimeMillis(); + private final AtomicBoolean _hasWork = new AtomicBoolean(); public SocketAddress getRemoteAddress() { @@ -301,6 +406,47 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return 0; } + @Override + public void setMessageAssignmentSuspended(final boolean value) + { + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return false; + } + + @Override + public void processPending() + { + + } + + @Override + public boolean hasWork() + { + return _hasWork.get(); + } + + @Override + public void notifyWork() + { + _hasWork.set(true); + } + + @Override + public void setWorkListener(final Action<ServerProtocolEngine> listener) + { + + } + + @Override + public void clearWork() + { + _hasWork.set(false); + } + public void received(ByteBuffer msg) { _lastReadTime = System.currentTimeMillis(); @@ -360,15 +506,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } } - - if(newDelegate == null && looksLikeSSL(headerBytes)) - { - if(_sslContext != null) - { - newDelegate = new SslDelegateProtocolEngine(); - } - } - // If no delegate is found then send back a supported protocol version id if(newDelegate == null) { @@ -398,8 +535,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } else { + boolean hasWork = _delegate.hasWork(); + if (hasWork) + { + newDelegate.notifyWork(); + } _delegate = newDelegate; - + _delegate.setWorkListener(_workListener.get()); _header.flip(); _delegate.received(_header); if(msg.hasRemaining()) @@ -423,6 +565,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); @@ -466,132 +619,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _network.close(); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - - } - - @Override - public long getLastReadTime() - { - return _lastReadTime; - } - @Override - public long getLastWriteTime() + public void encryptedTransport() { - return 0; - } - } - - private class SslDelegateProtocolEngine implements ServerProtocolEngine - { - private final MultiVersionProtocolEngine _decryptEngine; - private final SSLEngine _engine; - private final SSLReceiver _sslReceiver; - private final SSLBufferingSender _sslSender; - private long _lastReadTime; - - private SslDelegateProtocolEngine() - { - - _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported, - _defaultSupportedReply, _port, Transport.SSL, _id, _creators, - null); - - _engine = _sslContext.createSSLEngine(); - _engine.setUseClientMode(false); - SSLUtil.removeSSLv3Support(_engine); - SSLUtil.updateEnabledCipherSuites(_engine, _port.getEnabledCipherSuites(), _port.getDisabledCipherSuites()); - - if(_needClientAuth) - { - _engine.setNeedClientAuth(true); - } - else if(_wantClientAuth) + if(_transport == Transport.TCP) { - _engine.setWantClientAuth(true); + _transport = Transport.SSL; } - - SSLStatus sslStatus = new SSLStatus(); - _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus); - _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus); - _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender); - } - - @Override - public void received(ByteBuffer msg) - { - _lastReadTime = System.currentTimeMillis(); - _sslReceiver.received(msg); - _sslSender.send(); - _sslSender.flush(); - } - - @Override - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - //TODO - Implement - } - - @Override - public SocketAddress getRemoteAddress() - { - return _decryptEngine.getRemoteAddress(); } - @Override - public SocketAddress getLocalAddress() + public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) { - return _decryptEngine.getLocalAddress(); - } - @Override - public long getWrittenBytes() - { - return _decryptEngine.getWrittenBytes(); - } - - @Override - public long getReadBytes() - { - return _decryptEngine.getReadBytes(); - } - - @Override - public void closed() - { - _decryptEngine.closed(); - } - - @Override - public void writerIdle() - { - _decryptEngine.writerIdle(); - } - - @Override - public void readerIdle() - { - _decryptEngine.readerIdle(); - } - - @Override - public void exception(Throwable t) - { - _decryptEngine.exception(t); - } - - @Override - public long getConnectionId() - { - return _decryptEngine.getConnectionId(); - } - - @Override - public Subject getSubject() - { - return _decryptEngine.getSubject(); } @Override @@ -603,132 +642,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine @Override public long getLastWriteTime() { - return _decryptEngine.getLastWriteTime(); + return 0; } } - private boolean looksLikeSSL(byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - - private static class SSLNetworkConnection implements NetworkConnection - { - private final NetworkConnection _network; - private final SSLBufferingSender _sslSender; - private final SSLEngine _engine; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public SSLNetworkConnection(SSLEngine engine, NetworkConnection network, - SSLBufferingSender sslSender) - { - _engine = engine; - _network = network; - _sslSender = sslSender; - - } - - @Override - public Sender<ByteBuffer> getSender() - { - return _sslSender; - } - - @Override - public void start() - { - _network.start(); - } - - @Override - public void close() - { - _sslSender.close(); - - _network.close(); - } - - @Override - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - @Override - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - @Override - public void setMaxWriteIdle(int sec) - { - _network.setMaxWriteIdle(sec); - } - @Override - public void setMaxReadIdle(int sec) - { - _network.setMaxReadIdle(sec); - } - - @Override - public Principal getPeerPrincipal() - { - synchronized (_lock) - { - if(!_principalChecked) - { - try - { - _principal = _engine.getSession().getPeerPrincipal(); - } - catch (SSLPeerUnverifiedException e) - { - _principal = null; - } - - _principalChecked = true; - } - - return _principal; - } - } - - @Override - public int getMaxReadIdle() - { - return _network.getMaxReadIdle(); - } - - @Override - public int getMaxWriteIdle() - { - return _network.getMaxWriteIdle(); - } - } } |