diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 229 |
1 files changed, 205 insertions, 24 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 284954edba..8911d4ee3e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.protocol; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -409,10 +410,10 @@ public class AMQProtocolHandler implements ProtocolEngine final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); // Decode buffer - - for (AMQDataBlock message : dataBlocks) + int size = dataBlocks.size(); + for (int i = 0; i < size; i++) { - + AMQDataBlock message = dataBlocks.get(i); if (PROTOCOL_DEBUG) { _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); @@ -420,10 +421,10 @@ public class AMQProtocolHandler implements ProtocolEngine if(message instanceof AMQFrame) { - final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; - if (debug && ((msgNumber % 1000) == 0)) + if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled()) { _logger.debug("Received " + _messageReceivedCount + " protocol messages"); } @@ -514,12 +515,20 @@ public class AMQProtocolHandler implements ProtocolEngine return getStateManager().createWaiter(states); } - public synchronized void writeFrame(AMQDataBlock frame) + public void writeFrame(AMQDataBlock frame) + { + writeFrame(frame, true); + } + + public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); _sender.send(buf); - _sender.flush(); + if(flush) + { + _sender.flush(); + } if (PROTOCOL_DEBUG) { @@ -539,35 +548,51 @@ public class AMQProtocolHandler implements ProtocolEngine } + private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; + private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; + private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); + private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); + private ByteBuffer asByteBuffer(AMQDataBlock block) { - final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); + final int size = (int) block.getSize(); - try - { - block.writePayload(new DataOutputStream(new OutputStream() - { + final byte[] data; - @Override - public void write(int b) throws IOException - { - buf.put((byte) b); - } + if(size > REUSABLE_BYTE_BUFFER_CAPACITY) + { + data= new byte[size]; + } + else + { - @Override - public void write(byte[] b, int off, int len) throws IOException - { - buf.put(b, off, len); - } - })); + data = _reusableBytes; + } + _reusableDataOutput.setBuffer(data); + + try + { + block.writePayload(_reusableDataOutput); } catch (IOException e) { throw new RuntimeException(e); } - buf.flip(); + final ByteBuffer buf; + + if(size < REUSABLE_BYTE_BUFFER_CAPACITY) + { + buf = _reusableByteBuffer; + buf.position(0); + } + else + { + buf = ByteBuffer.wrap(data); + } + buf.limit(_reusableDataOutput.length()); + return buf; } @@ -840,4 +865,160 @@ public class AMQProtocolHandler implements ProtocolEngine return _suggestedProtocolVersion; } + private static class BytesDataOutput implements DataOutput + { + int _pos = 0; + byte[] _buf; + + public BytesDataOutput(byte[] buf) + { + _buf = buf; + } + + public void setBuffer(byte[] buf) + { + _buf = buf; + _pos = 0; + } + + public void reset() + { + _pos = 0; + } + + public int length() + { + return _pos; + } + + public void write(int b) + { + _buf[_pos++] = (byte) b; + } + + public void write(byte[] b) + { + System.arraycopy(b, 0, _buf, _pos, b.length); + _pos+=b.length; + } + + + public void write(byte[] b, int off, int len) + { + System.arraycopy(b, off, _buf, _pos, len); + _pos+=len; + + } + + public void writeBoolean(boolean v) + { + _buf[_pos++] = v ? (byte) 1 : (byte) 0; + } + + public void writeByte(int v) + { + _buf[_pos++] = (byte) v; + } + + public void writeShort(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeChar(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeInt(int v) + { + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeLong(long v) + { + _buf[_pos++] = (byte) (v >>> 56); + _buf[_pos++] = (byte) (v >>> 48); + _buf[_pos++] = (byte) (v >>> 40); + _buf[_pos++] = (byte) (v >>> 32); + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte)v; + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + _buf[_pos++] = ((byte)s.charAt(i)); + } + } + + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + int v = s.charAt(i); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + } + + public void writeUTF(String s) + { + int strlen = s.length(); + + int pos = _pos; + _pos+=2; + + + for (int i = 0; i < strlen; i++) + { + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + { + c = s.charAt(i); + _buf[_pos++] = (byte) c; + + } + else if (c > 0x07FF) + { + _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + else + { + _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); + } + } + + int len = _pos - (pos + 2); + + _buf[pos++] = (byte) (len >>> 8); + _buf[pos] = (byte) len; + } + + } + + } |