diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java | 92 |
1 files changed, 76 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 5c92aa95b6..d9e5e1c473 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -28,7 +28,7 @@ import java.util.Set; import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Sender; @@ -42,24 +42,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private Set<AmqpProtocolVersion> _supported; private String _fqdn; - private IApplicationRegistry _appRegistry; + private final Broker _broker; private NetworkConnection _network; private Sender<ByteBuffer> _sender; private final AmqpProtocolVersion _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); - public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry, + public MultiVersionProtocolEngine(final Broker broker, final Set<AmqpProtocolVersion> supported, final AmqpProtocolVersion defaultSupportedReply, final long id, final NetworkConnection network) { - this(appRegistry, supported, defaultSupportedReply, id); + this(broker, supported, defaultSupportedReply, id); setNetworkConnection(network); } - public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry, + public MultiVersionProtocolEngine(final Broker broker, final Set<AmqpProtocolVersion> supported, final AmqpProtocolVersion defaultSupportedReply, final long id) @@ -71,7 +71,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } _id = id; - _appRegistry = appRegistry; + _broker = broker; _supported = supported; _defaultSupportedReply = defaultSupportedReply; } @@ -217,6 +217,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _delegate.getLastReadTime(); + } + + @Override + public long getLastWriteTime() + { + return _delegate.getLastWriteTime(); + } + private static interface DelegateCreator { @@ -240,7 +252,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); + return new AMQProtocolEngine(_broker, _network, _id); } }; @@ -260,7 +272,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); + return new AMQProtocolEngine(_broker, _network, _id); } }; @@ -280,7 +292,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); + return new AMQProtocolEngine(_broker, _network, _id); } }; @@ -301,13 +313,15 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - final ConnectionDelegate connDelegate = - new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn, _appRegistry.getAuthenticationManager(getLocalAddress())); + final ConnectionDelegate connDelegate = new org.apache.qpid.server.transport.ServerConnectionDelegate(_broker, + _fqdn, _broker.getSubjectCreator(getLocalAddress())); ServerConnection conn = new ServerConnection(_id); - conn.setConnectionDelegate(connDelegate); - return new ProtocolEngine_0_10( conn, _network, _appRegistry); + conn.setConnectionDelegate(connDelegate); + conn.setRemoteAddress(_network.getRemoteAddress()); + conn.setLocalAddress(_network.getLocalAddress()); + return new ProtocolEngine_0_10( conn, _network); } }; @@ -327,7 +341,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new ProtocolEngine_1_0_0(_appRegistry,_id); + return new ProtocolEngine_1_0_0(_network, _broker, _id); } }; @@ -347,7 +361,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public ServerProtocolEngine getProtocolEngine() { - return new ProtocolEngine_1_0_0_SASL(_network, _appRegistry, _id); + return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id); } }; @@ -407,6 +421,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + @Override + public long getLastReadTime() + { + return 0; + } + + @Override + public long getLastWriteTime() + { + return 0; + } + public long getConnectionId() { return _id; @@ -547,7 +573,29 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void closed() { - + try + { + _delegate = new ClosedDelegateProtocolEngine(); + if(_logger.isDebugEnabled()) + { + _logger.debug("Connection from " + getRemoteAddress() + " was closed before any protocol version was established."); + } + } + catch(Exception e) + { + //ignore + } + finally + { + try + { + _network.close(); + } + catch(Exception e) + { + //ignore + } + } } public void writerIdle() @@ -564,5 +612,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { } + + @Override + public long getLastReadTime() + { + return 0; + } + + @Override + public long getLastWriteTime() + { + return 0; + } } } |