diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java | 96 |
1 files changed, 76 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 01b12b44ce..7033bf755d 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; @@ -33,30 +33,43 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Set; -public class MultiVersionProtocolEngine implements ProtocolEngine +public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); + private final long _id; + private Set<AmqpProtocolVersion> _supported; private String _fqdn; private IApplicationRegistry _appRegistry; private NetworkConnection _network; private Sender<ByteBuffer> _sender; - - private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + + private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); + + public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, + String fqdn, + Set<AmqpProtocolVersion> supported, + NetworkConnection network, + long id) + { + this(appRegistry,fqdn,supported,id); + setNetworkConnection(network); + } public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, String fqdn, Set<AmqpProtocolVersion> supported, - NetworkConnection network) + long id) { + _id = id; _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _network = network; - _sender = _network.getSender(); + } + public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); @@ -92,6 +105,7 @@ public class MultiVersionProtocolEngine implements ProtocolEngine _delegate.readerIdle(); } + public void received(ByteBuffer msg) { _delegate.received(msg); @@ -102,6 +116,11 @@ public class MultiVersionProtocolEngine implements ProtocolEngine _delegate.exception(t); } + public long getConnectionId() + { + return _delegate.getConnectionId(); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; private static final byte[] AMQP_0_8_HEADER = @@ -126,7 +145,7 @@ public class MultiVersionProtocolEngine implements ProtocolEngine (byte) 9 }; -private static final byte[] AMQP_0_9_1_HEADER = + private static final byte[] AMQP_0_9_1_HEADER = new byte[] { (byte) 'A', (byte) 'M', (byte) 'Q', @@ -149,11 +168,23 @@ private static final byte[] AMQP_0_9_1_HEADER = (byte) 10 }; + public void setNetworkConnection(NetworkConnection networkConnection) + { + setNetworkConnection(networkConnection, networkConnection.getSender()); + } + + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + _network = network; + _sender = sender; + } + + private static interface DelegateCreator { AmqpProtocolVersion getVersion(); byte[] getHeaderIdentifier(); - ProtocolEngine getProtocolEngine(); + ServerProtocolEngine getProtocolEngine(); } private DelegateCreator creator_0_8 = new DelegateCreator() @@ -169,9 +200,9 @@ private static final byte[] AMQP_0_9_1_HEADER = return AMQP_0_8_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); } }; @@ -189,9 +220,9 @@ private static final byte[] AMQP_0_9_1_HEADER = return AMQP_0_9_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); } }; @@ -209,9 +240,9 @@ private static final byte[] AMQP_0_9_1_HEADER = return AMQP_0_9_1_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id); } }; @@ -230,12 +261,12 @@ private static final byte[] AMQP_0_9_1_HEADER = return AMQP_0_10_HEADER; } - public ProtocolEngine getProtocolEngine() + public ServerProtocolEngine getProtocolEngine() { final ConnectionDelegate connDelegate = new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn); - ServerConnection conn = new ServerConnection(); + ServerConnection conn = new ServerConnection(_id); conn.setConnectionDelegate(connDelegate); return new ProtocolEngine_0_10( conn, _network, _appRegistry); @@ -246,7 +277,7 @@ private static final byte[] AMQP_0_9_1_HEADER = new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 }; - private class ClosedDelegateProtocolEngine implements ProtocolEngine + private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { public SocketAddress getRemoteAddress() { @@ -292,9 +323,19 @@ private static final byte[] AMQP_0_9_1_HEADER = { } + + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + + } + + public long getConnectionId() + { + return _id; + } } - private class SelfDelegateProtocolEngine implements ProtocolEngine + private class SelfDelegateProtocolEngine implements ServerProtocolEngine { private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); @@ -340,7 +381,7 @@ private static final byte[] AMQP_0_9_1_HEADER = _header.get(headerBytes); - ProtocolEngine newDelegate = null; + ServerProtocolEngine newDelegate = null; byte[] newestSupported = null; for(int i = 0; newDelegate == null && i < _creators.length; i++) @@ -366,14 +407,19 @@ private static final byte[] AMQP_0_9_1_HEADER = if(newDelegate == null) { _sender.send(ByteBuffer.wrap(newestSupported)); + _sender.flush(); _delegate = new ClosedDelegateProtocolEngine(); + + _network.close(); + } else { _delegate = newDelegate; _header.flip(); + _delegate.setNetworkConnection(_network, _sender); _delegate.received(_header); if(msg.hasRemaining()) { @@ -385,6 +431,11 @@ private static final byte[] AMQP_0_9_1_HEADER = } + public long getConnectionId() + { + return _id; + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); @@ -404,5 +455,10 @@ private static final byte[] AMQP_0_9_1_HEADER = { } + + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + { + + } } } |