summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java63
1 files changed, 44 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 4e5088808a..241ab3fcb9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -93,6 +93,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
+ private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
private AMQShortString _contextKey;
@@ -262,6 +263,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
closeProtocolSession();
}
}
+ receiveComplete();
}
catch (Exception e)
{
@@ -270,6 +272,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
+ private void receiveComplete()
+ {
+ for (AMQChannel channel : _channelMap.values())
+ {
+ channel.receivedComplete();
+ }
+
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -387,35 +398,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
}
+
+ private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+ private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+ private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
private ByteBuffer asByteBuffer(AMQDataBlock block)
{
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+ final int size = (int) block.getSize();
- try
- {
- block.writePayload(new DataOutputStream(new OutputStream()
- {
+ final byte[] data;
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
+ if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ data= new byte[size];
+ }
+ else
+ {
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
+ data = _reusableBytes;
+ }
+ _reusableDataOutput.setBuffer(data);
+
+ try
+ {
+ block.writePayload(_reusableDataOutput);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- buf.flip();
+ final ByteBuffer buf;
+
+ if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ buf = _reusableByteBuffer;
+ buf.position(0);
+ }
+ else
+ {
+ buf = ByteBuffer.wrap(data);
+ }
+ buf.limit(_reusableDataOutput.length());
+
return buf;
}
@@ -1418,8 +1445,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_deferFlush = deferFlush;
}
-
-
public String getUserName()
{
return getAuthorizedPrincipal().getName();