diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-01 15:18:17 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-01 15:18:17 +0000 |
commit | 50876b8a80c5bfd4ba125f87e07fe77669520c80 (patch) | |
tree | 1e1a28291299870ac58b658270ec16e42fe5de9e /qpid/java/common | |
parent | e8a5ab04596ff422b3609cf1c454bdc76b473399 (diff) | |
download | qpid-python-50876b8a80c5bfd4ba125f87e07fe77669520c80.tar.gz |
Reduce copying in 0-9 path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656312 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
11 files changed, 211 insertions, 73 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index cb0c78ef37..6860b46546 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; + public interface AMQBody { public byte getFrameType(); @@ -39,4 +40,6 @@ public interface AMQBody public void writePayload(DataOutput buffer) throws IOException; void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException; + + long writePayload(ByteBufferSender sender) throws IOException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index c234a5e829..8f804bf2d6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -23,6 +23,8 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.transport.ByteBufferSender; + /** * A data block represents something that has a size in bytes and the ability to write itself to a byte @@ -44,4 +46,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock */ public abstract void writePayload(DataOutput buffer) throws IOException; + public abstract long writePayload(ByteBufferSender sender) throws IOException; + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 83397c37d8..5fcdfb901a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -22,6 +22,10 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { @@ -57,6 +61,25 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock buffer.writeByte(FRAME_END_BYTE); } + private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE }; + + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + byte[] frameHeader = new byte[7]; + BytesDataOutput buffer = new BytesDataOutput(frameHeader); + + buffer.writeByte(_bodyFrame.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, _channel); + EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); + sender.send(ByteBuffer.wrap(frameHeader)); + + long size = 8 + _bodyFrame.writePayload(sender); + + sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY)); + return size; + } + public final int getChannel() { return _channel; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index e40452edea..01deed67ed 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -24,12 +24,15 @@ package org.apache.qpid.framing; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public abstract class AMQMethodBodyImpl implements AMQMethodBody { @@ -105,6 +108,16 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody writeMethodPayload(buffer); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + final int size = getSize(); + byte[] bytes = new byte[size]; + BytesDataOutput buffer = new BytesDataOutput(bytes); + writePayload(buffer); + sender.send(ByteBuffer.wrap(bytes)); + return size; + } protected int getSizeOf(AMQShortString string) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index ef0da9b918..6481c6ebdb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -23,10 +23,14 @@ package org.apache.qpid.framing; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; + public class BasicContentHeaderProperties { //persistent & non-persistent constants, values as per JMS DeliveryMode @@ -314,6 +318,26 @@ public class BasicContentHeaderProperties } } + + public long writePropertyListPayload(final ByteBufferSender sender) throws IOException + { + if(useEncodedForm()) + { + sender.send(ByteBuffer.wrap(_encodedForm)); + return _encodedForm.length; + } + else + { + int propertyListSize = getPropertyListSize(); + byte[] data = new byte[propertyListSize]; + BytesDataOutput out = new BytesDataOutput(data); + writePropertyListPayload(out); + sender.send(ByteBuffer.wrap(data)); + return propertyListSize; + } + + } + public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException { _propertyFlags = propertyFlags; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 098e3652ad..819446021e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -23,6 +23,8 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.transport.ByteBufferSender; + public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { @@ -58,6 +60,17 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD } } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + long size = 0l; + for (int i = 0; i < _blocks.length; i++) + { + size += _blocks[i].writePayload(sender); + } + return size; + } + public String toString() { if (_blocks == null) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 5c322f3845..0f4ba5209b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; public class ContentBody implements AMQBody { @@ -72,6 +73,20 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + if(_payload != null) + { + sender.send(ByteBuffer.wrap(_payload)); + return _payload.length; + } + else + { + return 0l; + } + } + public byte[] getPayload() { return _payload; @@ -133,6 +148,23 @@ public class ContentBody implements AMQBody } } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + if(_buf.hasArray()) + { + sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length)); + } + else + { + ByteBuffer buf = _buf.duplicate(); + + buf.position(_offset); + buf.limit(_offset+_length); + sender.send(buf); + } + return _length; + } public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 377d2e115c..21b8e6c8b6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -24,10 +24,13 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public class ContentHeaderBody implements AMQBody { @@ -98,6 +101,19 @@ public class ContentHeaderBody implements AMQBody _properties.writePropertyListPayload(buffer); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + byte[] data = new byte[14]; + BytesDataOutput buffer = new BytesDataOutput(data); + EncodingUtils.writeUnsignedShort(buffer, CLASS_ID); + EncodingUtils.writeUnsignedShort(buffer, 0); + buffer.writeLong(_bodySize); + EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags()); + sender.send(ByteBuffer.wrap(data)); + return 14 + _properties.writePropertyListPayload(sender); + } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index b5f854eb0e..3afc082c89 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -27,6 +27,7 @@ import java.io.IOException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ByteBufferSender; public class HeartbeatBody implements AMQBody { @@ -61,6 +62,12 @@ public class HeartbeatBody implements AMQBody { } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + return 0l; + } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index ed1935ca04..9c8d2a8578 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -23,11 +23,14 @@ package org.apache.qpid.framing; import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.util.BytesDataOutput; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -88,6 +91,16 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData buffer.write(_protocolMinor); } + @Override + public long writePayload(final ByteBufferSender sender) throws IOException + { + byte[] data = new byte[8]; + BytesDataOutput out = new BytesDataOutput(data); + writePayload(out); + sender.send(ByteBuffer.wrap(data)); + return 8l; + } + public boolean equals(Object o) { if (!(o instanceof ProtocolInitiation)) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java index 2344872580..c231a0a7ca 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -37,10 +37,9 @@ import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.TransportEncryption; -public class NonBlockingNetworkTransport implements IncomingNetworkTransport +public class NonBlockingNetworkTransport { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); @@ -51,21 +50,6 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport private AcceptingThread _acceptor; private SelectorThread _selector; - protected NonBlockingConnection createNetworkConnection(final SocketChannel socketChannel, - final ServerProtocolEngine engine, - final Integer sendBufferSize, - final Integer receiveBufferSize, - final int timeout, - final IdleTimeoutTicker ticker, - final Set<TransportEncryption> encryptionSet, - final SSLContext sslContext, - final boolean wantClientAuth, - final boolean needClientAuth, - final Runnable onTransportEncryptionAction) - { - return new NonBlockingConnection(socketChannel, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction, _selector); - } - public void close() { if(_acceptor != null) @@ -173,59 +157,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport { socketChannel = _serverSocket.accept(); - final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() - .getRemoteSocketAddress()); - - if(engine != null) - { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - - NonBlockingConnection connection = - createNetworkConnection(socketChannel, - engine, - sendBufferSize, - receiveBufferSize, - _timeout, - ticker, - _encryptionSet, - _sslContext, - _config.wantClientAuth(), - _config.needClientAuth(), - new Runnable() - { - - @Override - public void run() - { - engine.encryptedTransport(); - } - }); - - engine.setNetworkConnection(connection, connection.getSender()); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); - - ticker.setConnection(connection); - - connection.start(); - - _selector.addConnection(connection); - - } - else - { - socketChannel.close(); - } + acceptSocketChannel(socketChannel); } catch(RuntimeException e) { @@ -262,6 +194,64 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport } } + public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + { + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); + + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }, + _selector); + + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + connection.start(); + + _selector.addConnection(connection); + + } + else + { + socketChannel.close(); + } + } + private void closeSocketIfNecessary(final Socket socket) { if(socket != null) |