diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java | 81 |
1 files changed, 29 insertions, 52 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java index 18b1b71d69..6c07e4912f 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -32,6 +32,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import java.io.PrintWriter; @@ -46,9 +47,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa { static final AtomicLong _connectionIdSource = new AtomicLong(0L); - public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - - private NetworkConnection _networkDriver; + //private NetworkConnection _networkDriver; private long _readBytes; private long _writtenBytes; private final UUID _id; @@ -57,7 +56,6 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa private ConnectionEndpoint _conn; private final long _connectionId = _connectionIdSource.getAndIncrement(); - private static final int BUF_SIZE = 8; private static final ByteBuffer HEADER = ByteBuffer.wrap(new byte[] { @@ -73,11 +71,12 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa private FrameWriter _frameWriter; private FrameHandler _frameHandler; - private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024); private Object _sendLock = new Object(); private byte _major; private byte _minor; private byte _revision; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; static enum State { @@ -94,56 +93,21 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa private State _state = State.A; - public ProtocolEngine_1_0_0(NetworkConnection networkDriver, - final IApplicationRegistry appRegistry) + public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry) { - - _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - setNetworkDriver(networkDriver); - - - // FIXME Two log messages to maintain compatinbility with earlier protocol versions -// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); -// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } - public void setNetworkDriver(NetworkConnection driver) - { - _networkDriver = driver; - Container container = new Container(); - - Principal principal = new Principal() - { - - public String getName() - { - // TODO - return "rob"; - } - }; - _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager()); - _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); - _conn.setFrameOutputHandler(this); - _conn.setRemoteAddress(driver.getRemoteAddress()); - - - _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); - _frameHandler = new FrameHandler(_conn); - - _networkDriver.getSender().send(HEADER.duplicate()); - - } public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getReadBytes() @@ -166,6 +130,25 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa //Todo } + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) + { + _network = network; + _sender = sender; + + Container container = new Container(); + + _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager()); + _conn.setConnectionEventListener(new Connection_1_0(_appRegistry)); + _conn.setFrameOutputHandler(this); + _conn.setRemoteAddress(_network.getRemoteAddress()); + + _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); + _frameHandler = new FrameHandler(_conn); + + _sender.send(HEADER.duplicate()); + _sender.flush(); + } + public String getAddress() { return getRemoteAddress().toString(); @@ -338,26 +321,19 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); } - if(_buf.remaining() < _conn.getMaxFrameSize()) - { - _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024)); - } + _frameWriter.setValue(amqFrame); ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); - // int pos = _buf.position(); int size = _frameWriter.writeToBuffer(dup); if(size > _conn.getMaxFrameSize()) { -// _buf.position(pos); throw new OversizeFrameException(amqFrame,size); } -// _buf.position(_buf.position()+dup.position()); - dup.flip(); _writtenBytes += dup.limit(); @@ -371,7 +347,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa } - _networkDriver.getSender().send(dup); + _sender.send(dup); + _sender.flush(); } } |