summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java81
1 files changed, 29 insertions, 52 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index 18b1b71d69..6c07e4912f 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -32,6 +32,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import java.io.PrintWriter;
@@ -46,9 +47,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
{
static final AtomicLong _connectionIdSource = new AtomicLong(0L);
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
- private NetworkConnection _networkDriver;
+ //private NetworkConnection _networkDriver;
private long _readBytes;
private long _writtenBytes;
private final UUID _id;
@@ -57,7 +56,6 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
private ConnectionEndpoint _conn;
private final long _connectionId = _connectionIdSource.getAndIncrement();
- private static final int BUF_SIZE = 8;
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
{
@@ -73,11 +71,12 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
private FrameWriter _frameWriter;
private FrameHandler _frameHandler;
- private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
private Object _sendLock = new Object();
private byte _major;
private byte _minor;
private byte _revision;
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
static enum State {
@@ -94,56 +93,21 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
private State _state = State.A;
- public ProtocolEngine_1_0_0(NetworkConnection networkDriver,
- final IApplicationRegistry appRegistry)
+ public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry)
{
-
-
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- setNetworkDriver(networkDriver);
-
-
- // FIXME Two log messages to maintain compatinbility with earlier protocol versions
-// _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
-// _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
}
- public void setNetworkDriver(NetworkConnection driver)
- {
- _networkDriver = driver;
- Container container = new Container();
-
- Principal principal = new Principal()
- {
-
- public String getName()
- {
- // TODO
- return "rob";
- }
- };
- _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager());
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
- _conn.setFrameOutputHandler(this);
- _conn.setRemoteAddress(driver.getRemoteAddress());
-
-
- _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
- _frameHandler = new FrameHandler(_conn);
-
- _networkDriver.getSender().send(HEADER.duplicate());
-
- }
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getReadBytes()
@@ -166,6 +130,25 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
//Todo
}
+ public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
+ {
+ _network = network;
+ _sender = sender;
+
+ Container container = new Container();
+
+ _conn = new ConnectionEndpoint(container,_appRegistry.getAuthenticationManager());
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setFrameOutputHandler(this);
+ _conn.setRemoteAddress(_network.getRemoteAddress());
+
+ _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
+ _frameHandler = new FrameHandler(_conn);
+
+ _sender.send(HEADER.duplicate());
+ _sender.flush();
+ }
+
public String getAddress()
{
return getRemoteAddress().toString();
@@ -338,26 +321,19 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
}
- if(_buf.remaining() < _conn.getMaxFrameSize())
- {
- _buf = ByteBuffer.allocate(Math.min(_conn.getMaxFrameSize(),1024*1024));
- }
+
_frameWriter.setValue(amqFrame);
ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
- // int pos = _buf.position();
int size = _frameWriter.writeToBuffer(dup);
if(size > _conn.getMaxFrameSize())
{
-// _buf.position(pos);
throw new OversizeFrameException(amqFrame,size);
}
-// _buf.position(_buf.position()+dup.position());
-
dup.flip();
_writtenBytes += dup.limit();
@@ -371,7 +347,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
}
- _networkDriver.getSender().send(dup);
+ _sender.send(dup);
+ _sender.flush();
}
}