diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java | 63 |
1 files changed, 44 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 4e5088808a..241ab3fcb9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -93,6 +93,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; + private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; private AMQShortString _contextKey; @@ -262,6 +263,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr closeProtocolSession(); } } + receiveComplete(); } catch (Exception e) { @@ -270,6 +272,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } + private void receiveComplete() + { + for (AMQChannel channel : _channelMap.values()) + { + channel.receivedComplete(); + } + + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -387,35 +398,51 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } } + + private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; + private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); + private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); + private ByteBuffer asByteBuffer(AMQDataBlock block) { - final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + final int size = (int) block.getSize(); - try - { - block.writePayload(new DataOutputStream(new OutputStream() - { + final byte[] data; - @Override - public void write(int b) throws IOException - { - buf.put((byte) b); - } + if(size > REUSABLE_BYTE_BUFFER_CAPACITY) + { + data= new byte[size]; + } + else + { - @Override - public void write(byte[] b, int off, int len) throws IOException - { - buf.put(b, off, len); - } - })); + data = _reusableBytes; + } + _reusableDataOutput.setBuffer(data); + + try + { + block.writePayload(_reusableDataOutput); } catch (IOException e) { throw new RuntimeException(e); } - buf.flip(); + final ByteBuffer buf; + + if(size <= REUSABLE_BYTE_BUFFER_CAPACITY) + { + buf = _reusableByteBuffer; + buf.position(0); + } + else + { + buf = ByteBuffer.wrap(data); + } + buf.limit(_reusableDataOutput.length()); + return buf; } @@ -1418,8 +1445,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _deferFlush = deferFlush; } - - public String getUserName() { return getAuthorizedPrincipal().getName(); |