summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java468
1 files changed, 192 insertions, 276 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 49c0812f4a..2ccf595c26 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -24,40 +24,31 @@ package org.apache.qpid.server.protocol;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.security.Principal;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.transport.Sender;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
private final long _id;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
- private final Transport _transport;
+ private Transport _transport;
private final ProtocolEngineCreator[] _creators;
private final Runnable _onCloseTask;
@@ -65,15 +56,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private String _fqdn;
private final Broker<?> _broker;
private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
+ private ByteBufferSender _sender;
private final Protocol _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
public MultiVersionProtocolEngine(final Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supported,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -92,15 +81,23 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_broker = broker;
_supported = supported;
_defaultSupportedReply = defaultSupportedReply;
- _sslContext = sslContext;
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
_creators = creators;
_onCloseTask = onCloseTask;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ _delegate.setMessageAssignmentSuspended(value);
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _delegate.isMessageAssignmentSuspended();
+ }
public SocketAddress getRemoteAddress()
{
@@ -147,6 +144,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_delegate.readerIdle();
}
+ @Override
+ public void encryptedTransport()
+ {
+ _delegate.encryptedTransport();
+ }
+
public void received(ByteBuffer msg)
{
@@ -169,9 +172,21 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _delegate.isTransportBlockedForWriting();
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _delegate.setTransportBlockedForWriting(blocked);
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
_network = network;
SocketAddress address = _network.getLocalAddress();
@@ -198,10 +213,82 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getLastWriteTime();
}
+ @Override
+ public void processPending()
+ {
+ _delegate.processPending();
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _delegate.hasWork();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _delegate.notifyWork();
+ }
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ _delegate.setWorkListener(listener);
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _delegate.clearWork();
+ }
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPending()
+ {
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return false;
+ }
+
+ @Override
+ public void notifyWork()
+ {
+
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -247,7 +334,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ @Override
+ public void encryptedTransport()
+ {
+
+ }
+
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
}
@@ -274,12 +367,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
return new Subject();
}
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
}
private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
- private long _lastReadTime;
+ private long _lastReadTime = System.currentTimeMillis();
+ private final AtomicBoolean _hasWork = new AtomicBoolean();
public SocketAddress getRemoteAddress()
{
@@ -301,6 +406,47 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return 0;
}
+ @Override
+ public void setMessageAssignmentSuspended(final boolean value)
+ {
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public void processPending()
+ {
+
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _hasWork.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _hasWork.set(true);
+ }
+
+ @Override
+ public void setWorkListener(final Action<ServerProtocolEngine> listener)
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _hasWork.set(false);
+ }
+
public void received(ByteBuffer msg)
{
_lastReadTime = System.currentTimeMillis();
@@ -360,15 +506,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
}
-
- if(newDelegate == null && looksLikeSSL(headerBytes))
- {
- if(_sslContext != null)
- {
- newDelegate = new SslDelegateProtocolEngine();
- }
- }
-
// If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
@@ -398,8 +535,13 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
else
{
+ boolean hasWork = _delegate.hasWork();
+ if (hasWork)
+ {
+ newDelegate.notifyWork();
+ }
_delegate = newDelegate;
-
+ _delegate.setWorkListener(_workListener.get());
_header.flip();
_delegate.received(_header);
if(msg.hasRemaining())
@@ -423,6 +565,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return _delegate.getSubject();
}
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return false;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -466,132 +619,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_network.close();
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
-
- }
-
- @Override
- public long getLastReadTime()
- {
- return _lastReadTime;
- }
-
@Override
- public long getLastWriteTime()
+ public void encryptedTransport()
{
- return 0;
- }
- }
-
- private class SslDelegateProtocolEngine implements ServerProtocolEngine
- {
- private final MultiVersionProtocolEngine _decryptEngine;
- private final SSLEngine _engine;
- private final SSLReceiver _sslReceiver;
- private final SSLBufferingSender _sslSender;
- private long _lastReadTime;
-
- private SslDelegateProtocolEngine()
- {
-
- _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
- _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
- null);
-
- _engine = _sslContext.createSSLEngine();
- _engine.setUseClientMode(false);
- SSLUtil.removeSSLv3Support(_engine);
- SSLUtil.updateEnabledCipherSuites(_engine, _port.getEnabledCipherSuites(), _port.getDisabledCipherSuites());
-
- if(_needClientAuth)
- {
- _engine.setNeedClientAuth(true);
- }
- else if(_wantClientAuth)
+ if(_transport == Transport.TCP)
{
- _engine.setWantClientAuth(true);
+ _transport = Transport.SSL;
}
-
- SSLStatus sslStatus = new SSLStatus();
- _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
- _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
- _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
- }
-
- @Override
- public void received(ByteBuffer msg)
- {
- _lastReadTime = System.currentTimeMillis();
- _sslReceiver.received(msg);
- _sslSender.send();
- _sslSender.flush();
- }
-
- @Override
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
- //TODO - Implement
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _decryptEngine.getRemoteAddress();
}
- @Override
- public SocketAddress getLocalAddress()
+ public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender)
{
- return _decryptEngine.getLocalAddress();
- }
- @Override
- public long getWrittenBytes()
- {
- return _decryptEngine.getWrittenBytes();
- }
-
- @Override
- public long getReadBytes()
- {
- return _decryptEngine.getReadBytes();
- }
-
- @Override
- public void closed()
- {
- _decryptEngine.closed();
- }
-
- @Override
- public void writerIdle()
- {
- _decryptEngine.writerIdle();
- }
-
- @Override
- public void readerIdle()
- {
- _decryptEngine.readerIdle();
- }
-
- @Override
- public void exception(Throwable t)
- {
- _decryptEngine.exception(t);
- }
-
- @Override
- public long getConnectionId()
- {
- return _decryptEngine.getConnectionId();
- }
-
- @Override
- public Subject getSubject()
- {
- return _decryptEngine.getSubject();
}
@Override
@@ -603,132 +642,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
@Override
public long getLastWriteTime()
{
- return _decryptEngine.getLastWriteTime();
+ return 0;
}
}
- private boolean looksLikeSSL(byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
- private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
-
- private static class SSLNetworkConnection implements NetworkConnection
- {
- private final NetworkConnection _network;
- private final SSLBufferingSender _sslSender;
- private final SSLEngine _engine;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
- SSLBufferingSender sslSender)
- {
- _engine = engine;
- _network = network;
- _sslSender = sslSender;
-
- }
-
- @Override
- public Sender<ByteBuffer> getSender()
- {
- return _sslSender;
- }
-
- @Override
- public void start()
- {
- _network.start();
- }
-
- @Override
- public void close()
- {
- _sslSender.close();
-
- _network.close();
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- @Override
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- @Override
- public void setMaxWriteIdle(int sec)
- {
- _network.setMaxWriteIdle(sec);
- }
- @Override
- public void setMaxReadIdle(int sec)
- {
- _network.setMaxReadIdle(sec);
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- synchronized (_lock)
- {
- if(!_principalChecked)
- {
- try
- {
- _principal = _engine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
- {
- _principal = null;
- }
-
- _principalChecked = true;
- }
-
- return _principal;
- }
- }
-
- @Override
- public int getMaxReadIdle()
- {
- return _network.getMaxReadIdle();
- }
-
- @Override
- public int getMaxWriteIdle()
- {
- return _network.getMaxWriteIdle();
- }
- }
}