summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-01 15:18:17 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-01 15:18:17 +0000
commit50876b8a80c5bfd4ba125f87e07fe77669520c80 (patch)
tree1e1a28291299870ac58b658270ec16e42fe5de9e /qpid/java/common
parente8a5ab04596ff422b3609cf1c454bdc76b473399 (diff)
downloadqpid-python-50876b8a80c5bfd4ba125f87e07fe77669520c80.tar.gz
Reduce copying in 0-9 path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656312 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java23
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java24
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java130
11 files changed, 211 insertions, 73 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
index cb0c78ef37..6860b46546 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+
public interface AMQBody
{
public byte getFrameType();
@@ -39,4 +40,6 @@ public interface AMQBody
public void writePayload(DataOutput buffer) throws IOException;
void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
+
+ long writePayload(ByteBufferSender sender) throws IOException;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
index c234a5e829..8f804bf2d6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.transport.ByteBufferSender;
+
/**
* A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -44,4 +46,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock
*/
public abstract void writePayload(DataOutput buffer) throws IOException;
+ public abstract long writePayload(ByteBufferSender sender) throws IOException;
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 83397c37d8..5fcdfb901a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -57,6 +61,25 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
buffer.writeByte(FRAME_END_BYTE);
}
+ private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
+
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] frameHeader = new byte[7];
+ BytesDataOutput buffer = new BytesDataOutput(frameHeader);
+
+ buffer.writeByte(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, _channel);
+ EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+ sender.send(ByteBuffer.wrap(frameHeader));
+
+ long size = 8 + _bodyFrame.writePayload(sender);
+
+ sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY));
+ return size;
+ }
+
public final int getChannel()
{
return _channel;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index e40452edea..01deed67ed 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -24,12 +24,15 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -105,6 +108,16 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
writeMethodPayload(buffer);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ final int size = getSize();
+ byte[] bytes = new byte[size];
+ BytesDataOutput buffer = new BytesDataOutput(bytes);
+ writePayload(buffer);
+ sender.send(ByteBuffer.wrap(bytes));
+ return size;
+ }
protected int getSizeOf(AMQShortString string)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index ef0da9b918..6481c6ebdb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -23,10 +23,14 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
+
public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
@@ -314,6 +318,26 @@ public class BasicContentHeaderProperties
}
}
+
+ public long writePropertyListPayload(final ByteBufferSender sender) throws IOException
+ {
+ if(useEncodedForm())
+ {
+ sender.send(ByteBuffer.wrap(_encodedForm));
+ return _encodedForm.length;
+ }
+ else
+ {
+ int propertyListSize = getPropertyListSize();
+ byte[] data = new byte[propertyListSize];
+ BytesDataOutput out = new BytesDataOutput(data);
+ writePropertyListPayload(out);
+ sender.send(ByteBuffer.wrap(data));
+ return propertyListSize;
+ }
+
+ }
+
public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
{
_propertyFlags = propertyFlags;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
index 098e3652ad..819446021e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.qpid.transport.ByteBufferSender;
+
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -58,6 +60,17 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ long size = 0l;
+ for (int i = 0; i < _blocks.length; i++)
+ {
+ size += _blocks[i].writePayload(sender);
+ }
+ return size;
+ }
+
public String toString()
{
if (_blocks == null)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index 5c322f3845..0f4ba5209b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
public class ContentBody implements AMQBody
{
@@ -72,6 +73,20 @@ public class ContentBody implements AMQBody
session.contentBodyReceived(channelId, this);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ if(_payload != null)
+ {
+ sender.send(ByteBuffer.wrap(_payload));
+ return _payload.length;
+ }
+ else
+ {
+ return 0l;
+ }
+ }
+
public byte[] getPayload()
{
return _payload;
@@ -133,6 +148,23 @@ public class ContentBody implements AMQBody
}
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ if(_buf.hasArray())
+ {
+ sender.send(ByteBuffer.wrap(_buf.array(), _buf.arrayOffset() + _offset, _length));
+ }
+ else
+ {
+ ByteBuffer buf = _buf.duplicate();
+
+ buf.position(_offset);
+ buf.limit(_offset+_length);
+ sender.send(buf);
+ }
+ return _length;
+ }
public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 377d2e115c..21b8e6c8b6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -24,10 +24,13 @@ import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class ContentHeaderBody implements AMQBody
{
@@ -98,6 +101,19 @@ public class ContentHeaderBody implements AMQBody
_properties.writePropertyListPayload(buffer);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] data = new byte[14];
+ BytesDataOutput buffer = new BytesDataOutput(data);
+ EncodingUtils.writeUnsignedShort(buffer, CLASS_ID);
+ EncodingUtils.writeUnsignedShort(buffer, 0);
+ buffer.writeLong(_bodySize);
+ EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags());
+ sender.send(ByteBuffer.wrap(data));
+ return 14 + _properties.writePropertyListPayload(sender);
+ }
+
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
throws AMQException
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index b5f854eb0e..3afc082c89 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ByteBufferSender;
public class HeartbeatBody implements AMQBody
{
@@ -61,6 +62,12 @@ public class HeartbeatBody implements AMQBody
{
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ return 0l;
+ }
+
public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
throws AMQException
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index ed1935ca04..9c8d2a8578 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -23,11 +23,14 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.BytesDataOutput;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -88,6 +91,16 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
buffer.write(_protocolMinor);
}
+ @Override
+ public long writePayload(final ByteBufferSender sender) throws IOException
+ {
+ byte[] data = new byte[8];
+ BytesDataOutput out = new BytesDataOutput(data);
+ writePayload(out);
+ sender.send(ByteBuffer.wrap(data));
+ return 8l;
+ }
+
public boolean equals(Object o)
{
if (!(o instanceof ProtocolInitiation))
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index 2344872580..c231a0a7ca 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -37,10 +37,9 @@ import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.TransportEncryption;
-public class NonBlockingNetworkTransport implements IncomingNetworkTransport
+public class NonBlockingNetworkTransport
{
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
@@ -51,21 +50,6 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
private AcceptingThread _acceptor;
private SelectorThread _selector;
- protected NonBlockingConnection createNetworkConnection(final SocketChannel socketChannel,
- final ServerProtocolEngine engine,
- final Integer sendBufferSize,
- final Integer receiveBufferSize,
- final int timeout,
- final IdleTimeoutTicker ticker,
- final Set<TransportEncryption> encryptionSet,
- final SSLContext sslContext,
- final boolean wantClientAuth,
- final boolean needClientAuth,
- final Runnable onTransportEncryptionAction)
- {
- return new NonBlockingConnection(socketChannel, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction, _selector);
- }
-
public void close()
{
if(_acceptor != null)
@@ -173,59 +157,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
socketChannel = _serverSocket.accept();
- final ServerProtocolEngine engine =
- (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
- .getRemoteSocketAddress());
-
- if(engine != null)
- {
- socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-
- NonBlockingConnection connection =
- createNetworkConnection(socketChannel,
- engine,
- sendBufferSize,
- receiveBufferSize,
- _timeout,
- ticker,
- _encryptionSet,
- _sslContext,
- _config.wantClientAuth(),
- _config.needClientAuth(),
- new Runnable()
- {
-
- @Override
- public void run()
- {
- engine.encryptedTransport();
- }
- });
-
- engine.setNetworkConnection(connection, connection.getSender());
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
-
- ticker.setConnection(connection);
-
- connection.start();
-
- _selector.addConnection(connection);
-
- }
- else
- {
- socketChannel.close();
- }
+ acceptSocketChannel(socketChannel);
}
catch(RuntimeException e)
{
@@ -262,6 +194,64 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
}
}
+ public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+ {
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _selector);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ connection.start();
+
+ _selector.addConnection(connection);
+
+ }
+ else
+ {
+ socketChannel.close();
+ }
+ }
+
private void closeSocketIfNecessary(final Socket socket)
{
if(socket != null)