summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java96
1 files changed, 76 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 01b12b44ce..7033bf755d 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
@@ -33,30 +33,43 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
-public class MultiVersionProtocolEngine implements ProtocolEngine
+public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
+ private final long _id;
+
private Set<AmqpProtocolVersion> _supported;
private String _fqdn;
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
-
- private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+
+ private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+
+ public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
+ String fqdn,
+ Set<AmqpProtocolVersion> supported,
+ NetworkConnection network,
+ long id)
+ {
+ this(appRegistry,fqdn,supported,id);
+ setNetworkConnection(network);
+ }
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
String fqdn,
Set<AmqpProtocolVersion> supported,
- NetworkConnection network)
+ long id)
{
+ _id = id;
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
- _network = network;
- _sender = _network.getSender();
+
}
+
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
@@ -92,6 +105,7 @@ public class MultiVersionProtocolEngine implements ProtocolEngine
_delegate.readerIdle();
}
+
public void received(ByteBuffer msg)
{
_delegate.received(msg);
@@ -102,6 +116,11 @@ public class MultiVersionProtocolEngine implements ProtocolEngine
_delegate.exception(t);
}
+ public long getConnectionId()
+ {
+ return _delegate.getConnectionId();
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
private static final byte[] AMQP_0_8_HEADER =
@@ -126,7 +145,7 @@ public class MultiVersionProtocolEngine implements ProtocolEngine
(byte) 9
};
-private static final byte[] AMQP_0_9_1_HEADER =
+ private static final byte[] AMQP_0_9_1_HEADER =
new byte[] { (byte) 'A',
(byte) 'M',
(byte) 'Q',
@@ -149,11 +168,23 @@ private static final byte[] AMQP_0_9_1_HEADER =
(byte) 10
};
+ public void setNetworkConnection(NetworkConnection networkConnection)
+ {
+ setNetworkConnection(networkConnection, networkConnection.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+ }
+
+
private static interface DelegateCreator
{
AmqpProtocolVersion getVersion();
byte[] getHeaderIdentifier();
- ProtocolEngine getProtocolEngine();
+ ServerProtocolEngine getProtocolEngine();
}
private DelegateCreator creator_0_8 = new DelegateCreator()
@@ -169,9 +200,9 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_8_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -189,9 +220,9 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_9_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -209,9 +240,9 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_9_1_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -230,12 +261,12 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_10_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
final ConnectionDelegate connDelegate =
new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
- ServerConnection conn = new ServerConnection();
+ ServerConnection conn = new ServerConnection(_id);
conn.setConnectionDelegate(connDelegate);
return new ProtocolEngine_0_10( conn, _network, _appRegistry);
@@ -246,7 +277,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
- private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
public SocketAddress getRemoteAddress()
{
@@ -292,9 +323,19 @@ private static final byte[] AMQP_0_9_1_HEADER =
{
}
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+
+ }
+
+ public long getConnectionId()
+ {
+ return _id;
+ }
}
- private class SelfDelegateProtocolEngine implements ProtocolEngine
+ private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
@@ -340,7 +381,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
_header.get(headerBytes);
- ProtocolEngine newDelegate = null;
+ ServerProtocolEngine newDelegate = null;
byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -366,14 +407,19 @@ private static final byte[] AMQP_0_9_1_HEADER =
if(newDelegate == null)
{
_sender.send(ByteBuffer.wrap(newestSupported));
+ _sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
+
+ _network.close();
+
}
else
{
_delegate = newDelegate;
_header.flip();
+ _delegate.setNetworkConnection(_network, _sender);
_delegate.received(_header);
if(msg.hasRemaining())
{
@@ -385,6 +431,11 @@ private static final byte[] AMQP_0_9_1_HEADER =
}
+ public long getConnectionId()
+ {
+ return _id;
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
@@ -404,5 +455,10 @@ private static final byte[] AMQP_0_9_1_HEADER =
{
}
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+
+ }
}
}