diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-09-09 17:47:22 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-09-09 17:47:22 +0000 |
commit | 5270591c7831e559925e720c6bfc0c78c514b95a (patch) | |
tree | 8ce7aabb2f5a4a87c4c53b3e8f810c62392eba11 /java/common | |
parent | 282e16aab532d842dffad3935bfd1a952bc584be (diff) | |
download | qpid-python-5270591c7831e559925e720c6bfc0c78c514b95a.tar.gz |
QPID-2627 : Remove dependency on MINA
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1167311 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
48 files changed, 837 insertions, 1257 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 7732ff2fd5..69bf73bb49 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.codec; -import java.util.ArrayList; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.SimpleByteBufferAllocator; +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; @@ -61,12 +60,11 @@ public class AMQDecoder /** Flag to indicate whether this decoder needs to handle protocol initiation. */ private boolean _expectProtocolInitiation; - private boolean firstDecode = true; private AMQMethodBodyFactory _bodyFactory; - private ByteBuffer _remainingBuf; - + private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); + /** * Creates a new AMQP decoder. * @@ -92,62 +90,168 @@ public class AMQDecoder _expectProtocolInitiation = expectProtocolInitiation; } + private class RemainingByteArrayInputStream extends InputStream + { + private int _currentListPos; + private int _markPos; - private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); - public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException - { + @Override + public int read() throws IOException + { + ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos); + if(currentStream.available() > 0) + { + return currentStream.read(); + } + else if((_currentListPos == _remainingBufs.size()) + || (++_currentListPos == _remainingBufs.size())) + { + return -1; + } + else + { - // get prior remaining data from accumulator - ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); - ByteBuffer msg; - // if we have a session buffer, append data to that otherwise - // use the buffer read from the network directly - if( _remainingBuf != null ) + ByteArrayInputStream stream = _remainingBufs.get(_currentListPos); + stream.mark(0); + return stream.read(); + } + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { - _remainingBuf.put(buf); - _remainingBuf.flip(); - msg = _remainingBuf; + + if(_currentListPos == _remainingBufs.size()) + { + return -1; + } + else + { + ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos); + final int available = currentStream.available(); + int read = currentStream.read(b, off, len > available ? available : len); + if(read < len) + { + if(_currentListPos++ != _remainingBufs.size()) + { + _remainingBufs.get(_currentListPos).mark(0); + } + int correctRead = read == -1 ? 0 : read; + int subRead = read(b, off+correctRead, len-correctRead); + if(subRead == -1) + { + return read; + } + else + { + return correctRead+subRead; + } + } + else + { + return len; + } + } } - else + + @Override + public int available() throws IOException { - msg = ByteBuffer.wrap(buf); + int total = 0; + for(int i = _currentListPos; i < _remainingBufs.size(); i++) + { + total += _remainingBufs.get(i).available(); + } + return total; } - - if (_expectProtocolInitiation - || (firstDecode - && (msg.remaining() > 0) - && (msg.get(msg.position()) == (byte)'A'))) + + @Override + public void mark(final int readlimit) { - if (_piDecoder.decodable(msg.buf())) + _markPos = _currentListPos; + final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos); + if(stream != null) { - dataBlocks.add(new ProtocolInitiation(msg.buf())); + stream.mark(readlimit); } } - else + + @Override + public void reset() throws IOException { - boolean enoughData = true; - while (enoughData) + _currentListPos = _markPos; + final int size = _remainingBufs.size(); + if(_currentListPos < size) + { + _remainingBufs.get(_currentListPos).reset(); + } + for(int i = _currentListPos+1; i<size; i++) { - int pos = msg.position(); + _remainingBufs.get(i).reset(); + } + } + } + + + public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + { + + // get prior remaining data from accumulator + ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); + DataInputStream msg; + + + ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); + if(!_remainingBufs.isEmpty()) + { + _remainingBufs.add(bais); + msg = new DataInputStream(new RemainingByteArrayInputStream()); + } + else + { + msg = new DataInputStream(bais); + } + boolean enoughData = true; + while (enoughData) + { + if(!_expectProtocolInitiation) + { enoughData = _dataBlockDecoder.decodable(msg); - msg.position(pos); if (enoughData) { dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); } - else + } + else + { + enoughData = _piDecoder.decodable(msg); + if (enoughData) { - _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); - _remainingBuf.setAutoExpand(true); - _remainingBuf.put(msg); + dataBlocks.add(new ProtocolInitiation(msg)); + } + + } + + if(!enoughData) + { + if(!_remainingBufs.isEmpty()) + { + _remainingBufs.remove(_remainingBufs.size()-1); + ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator(); + while(iterator.hasNext() && iterator.next().available() == 0) + { + iterator.remove(); + } + } + if(bais.available()!=0) + { + byte[] remaining = new byte[bais.available()]; + bais.read(remaining); + _remainingBufs.add(new ByteArrayInputStream(remaining)); } } - } - if(firstDecode && dataBlocks.size() > 0) - { - firstDecode = false; } return dataBlocks; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index fe04155bb8..ebdad12178 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -34,7 +36,7 @@ public interface AMQBody */ public abstract int getSize(); - public void writePayload(ByteBuffer buffer); + public void writePayload(DataOutputStream buffer) throws IOException; - void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException; + void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index a2fc3a03ef..00c1f5aae5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + /** * A data block represents something that has a size in bytes and the ability to write itself to a byte @@ -39,25 +42,6 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock * Writes the datablock to the specified buffer. * @param buffer */ - public abstract void writePayload(ByteBuffer buffer); - - public ByteBuffer toByteBuffer() - { - final ByteBuffer buffer = ByteBuffer.allocate((int)getSize()); - - writePayload(buffer); - buffer.flip(); - return buffer; - } - - public java.nio.ByteBuffer toNioByteBuffer() - { - final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize()); - - ByteBuffer buf = ByteBuffer.wrap(buffer); - writePayload(buf); - buffer.flip(); - return buffer; - } + public abstract void writePayload(DataOutputStream buffer) throws IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 0187fa96a9..2165cadd14 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.IOException; + public class AMQDataBlockDecoder { @@ -42,27 +43,32 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { } - public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException { - final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); + final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); // type, channel, body length and end byte if (remainingAfterAttributes < 0) { return false; } - in.position(in.position() + 1 + 2); + in.mark(8); + in.skip(1 + 2); + + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() - final long bodySize = in.getInt() & 0xffffffffL; + final long bodySize = in.readInt() & 0xffffffffL; + + in.reset(); return (remainingAfterAttributes >= bodySize); } - public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) - throws AMQFrameDecodingException, AMQProtocolVersionException + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - final byte type = in.get(); + final byte type = in.readByte(); BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) @@ -79,8 +85,8 @@ public class AMQDataBlockDecoder throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); } - final int channel = in.getUnsignedShort(); - final long bodySize = in.getUnsignedInt(); + final int channel = in.readUnsignedShort(); + final long bodySize = EncodingUtils.readUnsignedInteger(in); // bodySize can be zero if ((channel < 0) || (bodySize < 0)) @@ -91,7 +97,7 @@ public class AMQDataBlockDecoder AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); - byte marker = in.get(); + byte marker = in.readByte(); if ((marker & 0xFF) != 0xCE) { throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize @@ -101,13 +107,4 @@ public class AMQDataBlockDecoder return frame; } - public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException - { - return decodable(msg.buf()); - } - - public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException - { - return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); - } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java deleted file mode 100644 index d3b8ecf8bd..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Set; - -public final class AMQDataBlockEncoder -{ - private static final Logger _logger = LoggerFactory.getLogger(AMQDataBlockEncoder.class); - - private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class); - - public AMQDataBlockEncoder() - { } - - - public Set getMessageTypes() - { - return _messageTypes; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 02a46f3748..6acf60a5b3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { @@ -36,7 +38,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock _bodyFrame = bodyFrame; } - public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException + public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException { this._channel = channel; this._bodyFrame = bodyFactory.createBody(in,bodySize); @@ -53,13 +55,13 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { - buffer.put(_bodyFrame.getFrameType()); + buffer.writeByte(_bodyFrame.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, _channel); EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); _bodyFrame.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } public final int getChannel() @@ -77,48 +79,48 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } - public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body) + public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException { - buffer.put(body.getFrameType()); + buffer.writeByte(body.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body.getSize()); body.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } - public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2) + public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException { - buffer.put(body1.getFrameType()); + buffer.writeByte(body1.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); body1.writePayload(buffer); - buffer.put(FRAME_END_BYTE); - buffer.put(body2.getFrameType()); + buffer.writeByte(FRAME_END_BYTE); + buffer.writeByte(body2.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); body2.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } - public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) + public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException { - buffer.put(body1.getFrameType()); + buffer.writeByte(body1.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); body1.writePayload(buffer); - buffer.put(FRAME_END_BYTE); - buffer.put(body2.getFrameType()); + buffer.writeByte(FRAME_END_BYTE); + buffer.writeByte(body2.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); body2.writePayload(buffer); - buffer.put(FRAME_END_BYTE); - buffer.put(body3.getFrameType()); + buffer.writeByte(FRAME_END_BYTE); + buffer.writeByte(body3.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, channel); EncodingUtils.writeUnsignedInteger(buffer, body3.getSize()); body3.writePayload(buffer); - buffer.put(FRAME_END_BYTE); + buffer.writeByte(FRAME_END_BYTE); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 4763b22290..a076d0e5a1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -20,12 +20,14 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import java.io.DataOutputStream; +import java.io.IOException; + public interface AMQMethodBody extends AMQBody { public static final byte TYPE = 1; @@ -43,12 +45,12 @@ public interface AMQMethodBody extends AMQBody /** @return unsigned short */ public int getMethod(); - public void writeMethodPayload(ByteBuffer buffer); + public void writeMethodPayload(DataOutputStream buffer) throws IOException; public int getSize(); - public void writePayload(ByteBuffer buffer); + public void writePayload(DataOutputStream buffer) throws IOException; //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 1a7022c11b..7fceb082ee 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.IOException; + public class AMQMethodBodyFactory implements BodyFactory { private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); @@ -38,7 +39,7 @@ public class AMQMethodBodyFactory implements BodyFactory _protocolSession = protocolSession; } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException { return _protocolSession.getMethodRegistry().convertToBody(in, bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index cd3d721065..c73c1df701 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -21,13 +21,16 @@ package org.apache.qpid.framing; * */ -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + public abstract class AMQMethodBodyImpl implements AMQMethodBody { public static final byte TYPE = 1; @@ -98,7 +101,7 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return 2 + 2 + getBodySize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { EncodingUtils.writeUnsignedShort(buffer, getClazz()); EncodingUtils.writeUnsignedShort(buffer, getMethod()); @@ -106,12 +109,12 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody } - protected byte readByte(ByteBuffer buffer) + protected byte readByte(DataInputStream buffer) throws IOException { - return buffer.get(); + return buffer.readByte(); } - protected AMQShortString readAMQShortString(ByteBuffer buffer) + protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException { return EncodingUtils.readAMQShortString(buffer); } @@ -121,27 +124,27 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return EncodingUtils.encodedShortStringLength(string); } - protected void writeByte(ByteBuffer buffer, byte b) + protected void writeByte(DataOutputStream buffer, byte b) throws IOException { - buffer.put(b); + buffer.writeByte(b); } - protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) + protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException { EncodingUtils.writeShortStringBytes(buffer, string); } - protected int readInt(ByteBuffer buffer) + protected int readInt(DataInputStream buffer) throws IOException { - return buffer.getInt(); + return buffer.readInt(); } - protected void writeInt(ByteBuffer buffer, int i) + protected void writeInt(DataOutputStream buffer, int i) throws IOException { - buffer.putInt(i); + buffer.writeInt(i); } - protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException { return EncodingUtils.readFieldTable(buffer); } @@ -151,19 +154,19 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. } - protected void writeFieldTable(ByteBuffer buffer, FieldTable table) + protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException { EncodingUtils.writeFieldTableBytes(buffer, table); } - protected long readLong(ByteBuffer buffer) + protected long readLong(DataInputStream buffer) throws IOException { - return buffer.getLong(); + return buffer.readLong(); } - protected void writeLong(ByteBuffer buffer, long l) + protected void writeLong(DataOutputStream buffer, long l) throws IOException { - buffer.putLong(l); + buffer.writeLong(l); } protected int getSizeOf(byte[] response) @@ -171,87 +174,86 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return (response == null) ? 4 : response.length + 4; } - protected void writeBytes(ByteBuffer buffer, byte[] data) + protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException { EncodingUtils.writeBytes(buffer,data); } - protected byte[] readBytes(ByteBuffer buffer) + protected byte[] readBytes(DataInputStream buffer) throws IOException { return EncodingUtils.readBytes(buffer); } - protected short readShort(ByteBuffer buffer) + protected short readShort(DataInputStream buffer) throws IOException { return EncodingUtils.readShort(buffer); } - protected void writeShort(ByteBuffer buffer, short s) + protected void writeShort(DataOutputStream buffer, short s) throws IOException { EncodingUtils.writeShort(buffer, s); } - protected Content readContent(ByteBuffer buffer) + protected Content readContent(DataInputStream buffer) { - return null; //To change body of created methods use File | Settings | File Templates. + return null; } protected int getSizeOf(Content body) { - return 0; //To change body of created methods use File | Settings | File Templates. + return 0; } - protected void writeContent(ByteBuffer buffer, Content body) + protected void writeContent(DataOutputStream buffer, Content body) { - //To change body of created methods use File | Settings | File Templates. } - protected byte readBitfield(ByteBuffer buffer) + protected byte readBitfield(DataInputStream buffer) throws IOException { - return readByte(buffer); //To change body of created methods use File | Settings | File Templates. + return readByte(buffer); } - protected int readUnsignedShort(ByteBuffer buffer) + protected int readUnsignedShort(DataInputStream buffer) throws IOException { - return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. + return buffer.readUnsignedShort(); } - protected void writeBitfield(ByteBuffer buffer, byte bitfield0) + protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException { - buffer.put(bitfield0); + buffer.writeByte(bitfield0); } - protected void writeUnsignedShort(ByteBuffer buffer, int s) + protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException { EncodingUtils.writeUnsignedShort(buffer, s); } - protected long readUnsignedInteger(ByteBuffer buffer) + protected long readUnsignedInteger(DataInputStream buffer) throws IOException { - return buffer.getUnsignedInt(); + return EncodingUtils.readUnsignedInteger(buffer); } - protected void writeUnsignedInteger(ByteBuffer buffer, long i) + protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, i); } - protected short readUnsignedByte(ByteBuffer buffer) + protected short readUnsignedByte(DataInputStream buffer) throws IOException { - return buffer.getUnsigned(); + return (short) buffer.readUnsignedByte(); } - protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) + protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException { EncodingUtils.writeUnsignedByte(buffer, unsignedByte); } - protected long readTimestamp(ByteBuffer buffer) + protected long readTimestamp(DataInputStream buffer) throws IOException { return EncodingUtils.readTimestamp(buffer); } - protected void writeTimestamp(ByteBuffer buffer, long t) + protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException { EncodingUtils.writeTimestamp(buffer, t); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index 0c61d9db3c..df4d8bdcb6 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -21,10 +21,11 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; public abstract interface AMQMethodBodyInstanceFactory { - public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException; + public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java deleted file mode 100644 index bfcc38ad60..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - - -public interface AMQMethodFactory -{ - - // Connection Methods - - ConnectionCloseBody createConnectionClose(); - - // Access Methods - - AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write); - - - // Tx Methods - - TxSelectBody createTxSelect(); - - TxCommitBody createTxCommit(); - - TxRollbackBody createTxRollback(); - - // Channel Methods - - ChannelOpenBody createChannelOpen(); - - ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText); - - ChannelFlowBody createChannelFlow(boolean active); - - - // Exchange Methods - - - ExchangeBoundBody createExchangeBound(AMQShortString exchangeName, - AMQShortString queueName, - AMQShortString routingKey); - - ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket); - - - // Queue Methods - - QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket); - - QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket); - - QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket); - - - // Message Methods - - // In different versions of the protocol we change the class used for message transfer - // abstract this out so the appropriate methods are created - AMQMethodBody createRecover(boolean requeue); - - AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket); - - AMQMethodBody createConsumerCancel(AMQShortString consumerTag); - - AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple); - - AMQMethodBody createRejectBody(long deliveryTag, boolean requeue); - - AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize); - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 2b9e2ffaba..cc9a33f4cf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -21,11 +21,12 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.*; import java.lang.ref.WeakReference; @@ -199,27 +200,16 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } - private AMQShortString(ByteBuffer data, final int length) + private AMQShortString(DataInputStream data, final int length) throws IOException { if (length > MAX_LENGTH) { throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!"); } - if(data.isDirect() || data.isReadOnly()) - { - byte[] dataBytes = new byte[length]; - data.get(dataBytes); - _data = dataBytes; - _offset = 0; - } - else - { - - _data = data.array(); - _offset = data.arrayOffset() + data.position(); - data.skip(length); - - } + byte[] dataBytes = new byte[length]; + data.read(dataBytes); + _data = dataBytes; + _offset = 0; _length = length; } @@ -275,9 +265,9 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt return new CharSubSequence(start, end); } - public static AMQShortString readFromBuffer(ByteBuffer buffer) + public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException { - final short length = buffer.getUnsigned(); + final int length = buffer.readUnsignedByte(); if (length == 0) { return null; @@ -303,13 +293,13 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt } } - public void writeToBuffer(ByteBuffer buffer) + public void writeToBuffer(DataOutputStream buffer) throws IOException { final int size = length(); //buffer.setAutoExpand(true); - buffer.put((byte) size); - buffer.put(_data, _offset, size); + buffer.write((byte) size); + buffer.write(_data, _offset, size); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index 14fb63da03..f3da64e639 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.math.BigDecimal; /** @@ -60,12 +61,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -106,12 +107,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeUnsignedInteger(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readUnsignedInteger(buffer); } @@ -137,7 +138,7 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { BigDecimal bd = (BigDecimal) value; @@ -150,7 +151,7 @@ public enum AMQType EncodingUtils.writeInteger(buffer, unscaled); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { byte places = EncodingUtils.readByte(buffer); @@ -182,12 +183,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLong(buffer); } @@ -246,7 +247,7 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { // Ensure that the value is a FieldTable. if (!(value instanceof FieldTable)) @@ -267,7 +268,7 @@ public enum AMQType * * @return An instance of the type. */ - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { try { @@ -301,10 +302,10 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) { } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) { return null; } @@ -330,12 +331,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongstr(buffer, (byte[]) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongstr(buffer); } @@ -360,12 +361,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -391,12 +392,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLongStringBytes(buffer, (String) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLongString(buffer); } @@ -426,12 +427,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeBoolean(buffer, (Boolean) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readBoolean(buffer); } @@ -461,12 +462,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeChar(buffer, (Character) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readChar(buffer); } @@ -496,12 +497,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeByte(buffer, (Byte) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readByte(buffer); } @@ -535,12 +536,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeShort(buffer, (Short) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readShort(buffer); } @@ -577,12 +578,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeInteger(buffer, (Integer) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readInteger(buffer); } @@ -624,12 +625,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readLong(buffer); } @@ -659,12 +660,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeFloat(buffer, (Float) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readFloat(buffer); } @@ -698,12 +699,12 @@ public enum AMQType } } - public void writeValueImpl(Object value, ByteBuffer buffer) + public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException { EncodingUtils.writeDouble(buffer, (Double) value); } - public Object readValueFromBuffer(ByteBuffer buffer) + public Object readValueFromBuffer(DataInputStream buffer) throws IOException { return EncodingUtils.readDouble(buffer); } @@ -770,9 +771,9 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - public void writeToBuffer(Object value, ByteBuffer buffer) + public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException { - buffer.put(identifier()); + buffer.writeByte(identifier()); writeValueImpl(value, buffer); } @@ -782,7 +783,7 @@ public enum AMQType * @param value An instance of the type. * @param buffer The byte buffer to write it to. */ - abstract void writeValueImpl(Object value, ByteBuffer buffer); + abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException; /** * Reads an instance of the type from a specified byte buffer. @@ -791,5 +792,5 @@ public enum AMQType * * @return An instance of the type. */ - abstract Object readValueFromBuffer(ByteBuffer buffer); + abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 647d531476..1dbedca362 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -20,8 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Date; import java.util.Map; import java.math.BigDecimal; @@ -60,7 +61,7 @@ public class AMQTypedValue _value = type.toNativeValue(value); } - private AMQTypedValue(AMQType type, ByteBuffer buffer) + private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException { _type = type; _value = type.readValueFromBuffer(buffer); @@ -76,7 +77,7 @@ public class AMQTypedValue return _value; } - public void writeToBuffer(ByteBuffer buffer) + public void writeToBuffer(DataOutputStream buffer) throws IOException { _type.writeToBuffer(_value, buffer); } @@ -86,9 +87,9 @@ public class AMQTypedValue return _type.getEncodingSize(_value); } - public static AMQTypedValue readFromBuffer(ByteBuffer buffer) + public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException { - AMQType type = AMQTypeMap.getType(buffer.get()); + AMQType type = AMQTypeMap.getType(buffer.readByte()); return new AMQTypedValue(type, buffer); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index c7d89a9927..57622b5054 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,27 +37,6 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private static final AMQShortString ZERO_STRING = null; - /** - * We store the encoded form when we decode the content header so that if we need to write it out without modifying - * it we can do so without incurring the expense of reencoding it - */ - private byte[] _encodedForm; - - /** Flag indicating whether the entire content header has been decoded yet */ - private boolean _decoded = true; - - /** - * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker for - * routing in some cases so we can decode that separately. - */ - private boolean _decodedHeaders = true; - - /** - * We have some optimisations for partial decoding for maximum performance. The content type is used by all clients - * to determine the message type - */ - private boolean _decodedContentType = true; - private AMQShortString _contentType; private AMQShortString _encoding; @@ -86,10 +67,10 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private int _propertyFlags = 0; private static final int CONTENT_TYPE_MASK = 1 << 15; - private static final int ENCONDING_MASK = 1 << 14; + private static final int ENCODING_MASK = 1 << 14; private static final int HEADERS_MASK = 1 << 13; private static final int DELIVERY_MODE_MASK = 1 << 12; - private static final int PROPRITY_MASK = 1 << 11; + private static final int PRIORITY_MASK = 1 << 11; private static final int CORRELATION_ID_MASK = 1 << 10; private static final int REPLY_TO_MASK = 1 << 9; private static final int EXPIRATION_MASK = 1 << 8; @@ -101,34 +82,11 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private static final int CLUSTER_ID_MASK = 1 << 2; - /** - * This is 0_10 specific. We use this property to check if some message properties have been changed. - */ - private boolean _hasBeenUpdated = false; - - public boolean reset() - { - boolean result = _hasBeenUpdated; - _hasBeenUpdated = false; - return result; - } - - public void updated() - { - _hasBeenUpdated = true; - } - public BasicContentHeaderProperties() { } public int getPropertyListSize() { - if (_encodedForm != null) - { - return _encodedForm.length; - } - else - { int size = 0; if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0) @@ -136,7 +94,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti size += EncodingUtils.encodedShortStringLength(_contentType); } - if ((_propertyFlags & ENCONDING_MASK) > 0) + if ((_propertyFlags & ENCODING_MASK) > 0) { size += EncodingUtils.encodedShortStringLength(_encoding); } @@ -151,7 +109,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti size += 1; } - if ((_propertyFlags & PROPRITY_MASK) > 0) + if ((_propertyFlags & PRIORITY_MASK) > 0) { size += 1; } @@ -209,23 +167,10 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti } return size; - } - } - - private void clearEncodedForm() - { - if (!_decoded && (_encodedForm != null)) - { - // decode(); - } - - _encodedForm = null; } public void setPropertyFlags(int propertyFlags) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags = propertyFlags; } @@ -234,94 +179,87 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti return _propertyFlags; } - public void writePropertyListPayload(ByteBuffer buffer) + public void writePropertyListPayload(DataOutputStream buffer) throws IOException { - if (_encodedForm != null) + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) { - buffer.put(_encodedForm); + EncodingUtils.writeShortStringBytes(buffer, _contentType); } - else + + if ((_propertyFlags & ENCODING_MASK) != 0) { - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _contentType); - } + EncodingUtils.writeShortStringBytes(buffer, _encoding); + } - if ((_propertyFlags & ENCONDING_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _encoding); - } + if ((_propertyFlags & HEADERS_MASK) != 0) + { + EncodingUtils.writeFieldTableBytes(buffer, _headers); + } - if ((_propertyFlags & HEADERS_MASK) != 0) - { - EncodingUtils.writeFieldTableBytes(buffer, _headers); - } + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + buffer.writeByte(_deliveryMode); + } - if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) - { - buffer.put(_deliveryMode); - } + if ((_propertyFlags & PRIORITY_MASK) != 0) + { + buffer.writeByte(_priority); + } - if ((_propertyFlags & PROPRITY_MASK) != 0) - { - buffer.put(_priority); - } + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _correlationId); + } - if ((_propertyFlags & CORRELATION_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _correlationId); - } + if ((_propertyFlags & REPLY_TO_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _replyTo); + } - if ((_propertyFlags & REPLY_TO_MASK) != 0) + if ((_propertyFlags & EXPIRATION_MASK) != 0) + { + if (_expiration == 0L) { - EncodingUtils.writeShortStringBytes(buffer, _replyTo); + EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING); } - - if ((_propertyFlags & EXPIRATION_MASK) != 0) + else { - if (_expiration == 0L) - { - EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING); - } - else - { - EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); - } + EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); } + } - if ((_propertyFlags & MESSAGE_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _messageId); - } + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _messageId); + } - if ((_propertyFlags & TIMESTAMP_MASK) != 0) - { - EncodingUtils.writeTimestamp(buffer, _timestamp); - } + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + EncodingUtils.writeTimestamp(buffer, _timestamp); + } - if ((_propertyFlags & TYPE_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _type); - } + if ((_propertyFlags & TYPE_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _type); + } - if ((_propertyFlags & USER_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _userId); - } + if ((_propertyFlags & USER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _userId); + } - if ((_propertyFlags & APPLICATION_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _appId); - } + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _appId); + } - if ((_propertyFlags & CLUSTER_ID_MASK) != 0) - { - EncodingUtils.writeShortStringBytes(buffer, _clusterId); - } + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _clusterId); } } - public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException + public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException { _propertyFlags = propertyFlags; @@ -331,25 +269,18 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti } decode(buffer); - /*_encodedForm = new byte[size]; - buffer.get(_encodedForm, 0, size); - _decoded = false; - _decodedHeaders = false; - _decodedContentType = false;*/ } - private void decode(ByteBuffer buffer) + private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException { // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - int pos = buffer.position(); - try - { + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) { _contentType = EncodingUtils.readAMQShortString(buffer); } - if ((_propertyFlags & ENCONDING_MASK) != 0) + if ((_propertyFlags & ENCODING_MASK) != 0) { _encoding = EncodingUtils.readAMQShortString(buffer); } @@ -361,12 +292,12 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) { - _deliveryMode = buffer.get(); + _deliveryMode = buffer.readByte(); } - if ((_propertyFlags & PROPRITY_MASK) != 0) + if ((_propertyFlags & PRIORITY_MASK) != 0) { - _priority = buffer.get(); + _priority = buffer.readByte(); } if ((_propertyFlags & CORRELATION_ID_MASK) != 0) @@ -413,116 +344,29 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti { _clusterId = EncodingUtils.readAMQShortString(buffer); } - } - catch (AMQFrameDecodingException e) - { - throw new RuntimeException("Error in content header data: " + e, e); - } - final int endPos = buffer.position(); - buffer.position(pos); - final int len = endPos - pos; - _encodedForm = new byte[len]; - final int limit = buffer.limit(); - buffer.limit(endPos); - buffer.get(_encodedForm, 0, len); - buffer.limit(limit); - buffer.position(endPos); - _decoded = true; - } - private void decodeUpToHeaders() - { - ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - try - { - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - byte length = buffer.get(); - buffer.skip(length); - } - - if ((_propertyFlags & ENCONDING_MASK) != 0) - { - byte length = buffer.get(); - buffer.skip(length); - } - - if ((_propertyFlags & HEADERS_MASK) != 0) - { - _headers = EncodingUtils.readFieldTable(buffer); - - } - - _decodedHeaders = true; - } - catch (AMQFrameDecodingException e) - { - throw new RuntimeException("Error in content header data: " + e, e); - } } - private void decodeUpToContentType() - { - ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); - - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - _contentType = EncodingUtils.readAMQShortString(buffer); - } - - _decodedContentType = true; - } - - private void decodeIfNecessary() - { - if (!_decoded) - { - // decode(); - } - } - - private void decodeHeadersIfNecessary() - { - if (!_decoded && !_decodedHeaders) - { - decodeUpToHeaders(); - } - } - - private void decodeContentTypeIfNecessary() - { - if (!_decoded && !_decodedContentType) - { - decodeUpToContentType(); - } - } public AMQShortString getContentType() { - decodeContentTypeIfNecessary(); - return _contentType; } public String getContentTypeAsString() { - decodeContentTypeIfNecessary(); - return (_contentType == null) ? null : _contentType.toString(); } public void setContentType(AMQShortString contentType) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= (CONTENT_TYPE_MASK); _contentType = contentType; } public void setContentType(String contentType) { - _hasBeenUpdated = true; setContentType((contentType == null) ? null : new AMQShortString(contentType)); } @@ -534,31 +378,23 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public AMQShortString getEncoding() { - decodeIfNecessary(); - return _encoding; } public void setEncoding(String encoding) { - _hasBeenUpdated = true; - clearEncodedForm(); - _propertyFlags |= ENCONDING_MASK; + _propertyFlags |= ENCODING_MASK; _encoding = (encoding == null) ? null : new AMQShortString(encoding); } public void setEncoding(AMQShortString encoding) { - _hasBeenUpdated = true; - clearEncodedForm(); - _propertyFlags |= ENCONDING_MASK; + _propertyFlags |= ENCODING_MASK; _encoding = encoding; } public FieldTable getHeaders() { - decodeHeadersIfNecessary(); - if (_headers == null) { setHeaders(FieldTableFactory.newFieldTable()); @@ -569,191 +405,146 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setHeaders(FieldTable headers) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= HEADERS_MASK; _headers = headers; } public byte getDeliveryMode() { - decodeIfNecessary(); - return _deliveryMode; } public void setDeliveryMode(byte deliveryMode) { - clearEncodedForm(); _propertyFlags |= DELIVERY_MODE_MASK; _deliveryMode = deliveryMode; } public byte getPriority() { - decodeIfNecessary(); - return _priority; } public void setPriority(byte priority) { - clearEncodedForm(); - _propertyFlags |= PROPRITY_MASK; + _propertyFlags |= PRIORITY_MASK; _priority = priority; } public AMQShortString getCorrelationId() { - decodeIfNecessary(); - return _correlationId; } public String getCorrelationIdAsString() { - decodeIfNecessary(); - return (_correlationId == null) ? null : _correlationId.toString(); } public void setCorrelationId(String correlationId) { - _hasBeenUpdated = true; setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId)); } public void setCorrelationId(AMQShortString correlationId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= CORRELATION_ID_MASK; _correlationId = correlationId; } public String getReplyToAsString() { - decodeIfNecessary(); - return (_replyTo == null) ? null : _replyTo.toString(); } public AMQShortString getReplyTo() { - decodeIfNecessary(); - return _replyTo; } public void setReplyTo(String replyTo) { - _hasBeenUpdated = true; setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo)); } public void setReplyTo(AMQShortString replyTo) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= REPLY_TO_MASK; _replyTo = replyTo; } public long getExpiration() { - decodeIfNecessary(); return _expiration; } public void setExpiration(long expiration) { - clearEncodedForm(); _propertyFlags |= EXPIRATION_MASK; _expiration = expiration; } public AMQShortString getMessageId() { - decodeIfNecessary(); - return _messageId; } public String getMessageIdAsString() { - decodeIfNecessary(); - return (_messageId == null) ? null : _messageId.toString(); } public void setMessageId(String messageId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= MESSAGE_ID_MASK; _messageId = (messageId == null) ? null : new AMQShortString(messageId); } public void setMessageId(AMQShortString messageId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= MESSAGE_ID_MASK; _messageId = messageId; } public long getTimestamp() { - decodeIfNecessary(); return _timestamp; } public void setTimestamp(long timestamp) { - clearEncodedForm(); _propertyFlags |= TIMESTAMP_MASK; _timestamp = timestamp; } public String getTypeAsString() { - decodeIfNecessary(); - return (_type == null) ? null : _type.toString(); } public AMQShortString getType() { - decodeIfNecessary(); - return _type; } public void setType(String type) { - _hasBeenUpdated = true; setType((type == null) ? null : new AMQShortString(type)); } public void setType(AMQShortString type) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= TYPE_MASK; _type = type; } public String getUserIdAsString() { - decodeIfNecessary(); - return (_userId == null) ? null : _userId.toString(); } public AMQShortString getUserId() { - decodeIfNecessary(); - return _userId; } @@ -764,65 +555,48 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void setUserId(AMQShortString userId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= USER_ID_MASK; _userId = userId; } public String getAppIdAsString() { - decodeIfNecessary(); - return (_appId == null) ? null : _appId.toString(); } public AMQShortString getAppId() { - decodeIfNecessary(); - return _appId; } public void setAppId(String appId) { - _hasBeenUpdated = true; setAppId((appId == null) ? null : new AMQShortString(appId)); } public void setAppId(AMQShortString appId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= APPLICATION_ID_MASK; _appId = appId; - _hasBeenUpdated = true; } public String getClusterIdAsString() { - _hasBeenUpdated = true; - decodeIfNecessary(); return (_clusterId == null) ? null : _clusterId.toString(); } public AMQShortString getClusterId() { - _hasBeenUpdated = true; - decodeIfNecessary(); return _clusterId; } public void setClusterId(String clusterId) { - _hasBeenUpdated = true; setClusterId((clusterId == null) ? null : new AMQShortString(clusterId)); } public void setClusterId(AMQShortString clusterId) { - _hasBeenUpdated = true; - clearEncodedForm(); _propertyFlags |= CLUSTER_ID_MASK; _clusterId = clusterId; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java index 59646577e1..f9580d82b1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; /** * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. */ public interface BodyFactory { - AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException; + AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 94030f383e..15bc20c52d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataOutputStream; +import java.io.IOException; public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { @@ -49,7 +50,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD return frameSize; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { for (int i = 0; i < _blocks.length; i++) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 9d39f8aa86..aedb35f92a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -28,27 +31,22 @@ public class ContentBody implements AMQBody { public static final byte TYPE = 3; - public ByteBuffer payload; + public byte[] _payload; public ContentBody() { } - public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { - if (size > 0) - { - payload = buffer.slice(); - payload.limit((int) size); - buffer.skip((int) size); - } - + _payload = new byte[(int)size]; + buffer.read(_payload); } - public ContentBody(ByteBuffer payload) + public ContentBody(byte[] payload) { - this.payload = payload; + _payload = payload; } public byte getFrameType() @@ -58,23 +56,12 @@ public class ContentBody implements AMQBody public int getSize() { - return (payload == null ? 0 : payload.limit()); + return _payload == null ? 0 : _payload.length; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { - if (payload != null) - { - if(payload.isDirect() || payload.isReadOnly()) - { - ByteBuffer copy = payload.duplicate(); - buffer.put(copy.rewind()); - } - else - { - buffer.put(payload.array(),payload.arrayOffset(),payload.limit()); - } - } + buffer.write(_payload); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -83,32 +70,18 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { if (size > 0) { - payload = buffer.slice(); - payload.limit((int) size); - buffer.skip((int) size); + _payload = new byte[(int)size]; + buffer.read(_payload); } } public void reduceBufferToFit() { - if (payload != null && (payload.remaining() < payload.capacity() / 2)) - { - int size = payload.limit(); - ByteBuffer newPayload = ByteBuffer.allocate(size); - - newPayload.put(payload); - newPayload.flip(); - - //reduce reference count on payload - payload.release(); - - payload = newPayload; - } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index c42995d148..a0b030ab6b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,7 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException { return new ContentBody(in, bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 7526d4c756..18d0f26152 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -42,12 +45,12 @@ public class ContentHeaderBody implements AMQBody { } - public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + public ContentHeaderBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { - classId = buffer.getUnsignedShort(); - weight = buffer.getUnsignedShort(); - bodySize = buffer.getLong(); - int propertyFlags = buffer.getUnsignedShort(); + classId = buffer.readUnsignedShort(); + weight = buffer.readUnsignedShort(); + bodySize = buffer.readLong(); + int propertyFlags = buffer.readUnsignedShort(); ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); @@ -72,13 +75,13 @@ public class ContentHeaderBody implements AMQBody return TYPE; } - protected void populateFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + protected void populateFromBuffer(DataInputStream buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - classId = buffer.getUnsignedShort(); - weight = buffer.getUnsignedShort(); - bodySize = buffer.getLong(); - int propertyFlags = buffer.getUnsignedShort(); + classId = buffer.readUnsignedShort(); + weight = buffer.readUnsignedShort(); + bodySize = buffer.readLong(); + int propertyFlags = buffer.readUnsignedShort(); ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); } @@ -90,8 +93,8 @@ public class ContentHeaderBody implements AMQBody * @return * @throws AMQFrameDecodingException */ - public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException + public static ContentHeaderBody createFromBuffer(DataInputStream buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { ContentHeaderBody body = new ContentHeaderBody(buffer, size); @@ -103,11 +106,11 @@ public class ContentHeaderBody implements AMQBody return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { EncodingUtils.writeUnsignedShort(buffer, classId); EncodingUtils.writeUnsignedShort(buffer, weight); - buffer.putLong(bodySize); + buffer.writeLong(bodySize); EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); properties.writePropertyListPayload(buffer); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index 8d5e2f9fb4..a474e337b7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,7 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index 7ef538cfdc..237929f9a3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + /** * There will be an implementation of this interface for each content type. All content types have associated @@ -32,7 +35,7 @@ public interface ContentHeaderProperties * Writes the property list to the buffer, in a suitably encoded form. * @param buffer The buffer to write to */ - void writePropertyListPayload(ByteBuffer buffer); + void writePropertyListPayload(DataOutputStream buffer) throws IOException; /** * Populates the properties from buffer. @@ -40,8 +43,8 @@ public interface ContentHeaderProperties * @param propertyFlags he property flags. * @throws AMQFrameDecodingException when the buffer does not contain valid data */ - void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException; + void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) + throws AMQFrameDecodingException, IOException; /** * @return the size of the encoded property list in bytes. @@ -56,5 +59,4 @@ public interface ContentHeaderProperties */ int getPropertyFlags(); - void updated(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 46189b63d7..43ee8cd1f1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; @@ -38,8 +39,8 @@ public class ContentHeaderPropertiesFactory } public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, - ByteBuffer buffer, int size) - throws AMQFrameDecodingException + DataInputStream buffer, int size) + throws AMQFrameDecodingException, IOException { ContentHeaderProperties properties; // AMQP version change: "Hardwired" version to major=8, minor=0 diff --git a/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java deleted file mode 100644 index f6795ff200..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public abstract class DeferredDataBlock extends AMQDataBlock -{ - private AMQDataBlock _underlyingDataBlock; - - - public long getSize() - { - if(_underlyingDataBlock == null) - { - _underlyingDataBlock = createAMQDataBlock(); - } - return _underlyingDataBlock.getSize(); - } - - public void writePayload(ByteBuffer buffer) - { - if(_underlyingDataBlock == null) - { - _underlyingDataBlock = createAMQDataBlock(); - } - _underlyingDataBlock.writePayload(buffer); - } - - abstract protected AMQDataBlock createAMQDataBlock(); - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 6425f8c591..2d7e27405c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.nio.charset.Charset; public class EncodingUtils @@ -218,7 +219,7 @@ public class EncodingUtils return 0; } - public static void writeShortStringBytes(ByteBuffer buffer, String s) + public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException { if (s != null) { @@ -231,18 +232,18 @@ public class EncodingUtils // TODO: check length fits in an unsigned byte writeUnsignedByte(buffer, (short)encodedString.length); - buffer.put(encodedString); + buffer.write(encodedString); } else { // really writing out unsigned byte - buffer.put((byte) 0); + buffer.write((byte) 0); } } - public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s) + public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException { if (s != null) { @@ -252,11 +253,11 @@ public class EncodingUtils else { // really writing out unsigned byte - buffer.put((byte) 0); + buffer.write((byte) 0); } } - public static void writeLongStringBytes(ByteBuffer buffer, String s) + public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException { assert (s == null) || (s.length() <= 0xFFFE); if (s != null) @@ -270,7 +271,7 @@ public class EncodingUtils encodedString[i] = (byte) cha[i]; } - buffer.put(encodedString); + buffer.write(encodedString); } else { @@ -278,7 +279,7 @@ public class EncodingUtils } } - public static void writeLongStringBytes(ByteBuffer buffer, char[] s) + public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException { assert (s == null) || (s.length <= 0xFFFE); if (s != null) @@ -291,7 +292,7 @@ public class EncodingUtils encodedString[i] = (byte) s[i]; } - buffer.put(encodedString); + buffer.write(encodedString); } else { @@ -299,13 +300,13 @@ public class EncodingUtils } } - public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes) + public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException { assert (bytes == null) || (bytes.length <= 0xFFFE); if (bytes != null) { writeUnsignedInteger(buffer, bytes.length); - buffer.put(bytes); + buffer.write(bytes); } else { @@ -313,24 +314,24 @@ public class EncodingUtils } } - public static void writeUnsignedByte(ByteBuffer buffer, short b) + public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException { byte bv = (byte) b; - buffer.put(bv); + buffer.write(bv); } - public static void writeUnsignedShort(ByteBuffer buffer, int s) + public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (s < Short.MAX_VALUE) { - buffer.putShort((short) s); + buffer.writeShort(s); } else { short sv = (short) s; - buffer.put((byte) (0xFF & (sv >> 8))); - buffer.put((byte) (0xFF & sv)); + buffer.write((byte) (0xFF & (sv >> 8))); + buffer.write((byte) (0xFF & sv)); } } @@ -339,12 +340,12 @@ public class EncodingUtils return 4; } - public static void writeUnsignedInteger(ByteBuffer buffer, long l) + public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException { // TODO: Is this comparison safe? Do I need to cast RHS to long? if (l < Integer.MAX_VALUE) { - buffer.putInt((int) l); + buffer.writeInt((int) l); } else { @@ -352,14 +353,14 @@ public class EncodingUtils // FIXME: This *may* go faster if we build this into a local 4-byte array and then // put the array in a single call. - buffer.put((byte) (0xFF & (iv >> 24))); - buffer.put((byte) (0xFF & (iv >> 16))); - buffer.put((byte) (0xFF & (iv >> 8))); - buffer.put((byte) (0xFF & iv)); + buffer.write((byte) (0xFF & (iv >> 24))); + buffer.write((byte) (0xFF & (iv >> 16))); + buffer.write((byte) (0xFF & (iv >> 8))); + buffer.write((byte) (0xFF & iv)); } } - public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) + public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException { if (table != null) { @@ -371,12 +372,12 @@ public class EncodingUtils } } - public static void writeContentBytes(ByteBuffer buffer, Content content) + public static void writeContentBytes(DataOutputStream buffer, Content content) { // TODO: New Content class required for AMQP 0-9. } - public static void writeBooleans(ByteBuffer buffer, boolean[] values) + public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException { byte packedValue = 0; for (int i = 0; i < values.length; i++) @@ -387,16 +388,16 @@ public class EncodingUtils } } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value) + public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException { - buffer.put(value ? (byte) 1 : (byte) 0); + buffer.write(value ? (byte) 1 : (byte) 0); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -405,10 +406,10 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 1)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -422,10 +423,10 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 2)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -444,11 +445,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 3)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -472,11 +473,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 4)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4, boolean value5) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -505,11 +506,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 5)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4, boolean value5, boolean value6) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5, boolean value6) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -543,11 +544,11 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 6)); } - buffer.put(packedValue); + buffer.write(packedValue); } - public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, - boolean value4, boolean value5, boolean value6, boolean value7) + public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5, boolean value6, boolean value7) throws IOException { byte packedValue = value0 ? (byte) 1 : (byte) 0; @@ -586,7 +587,7 @@ public class EncodingUtils packedValue = (byte) (packedValue | (byte) (1 << 7)); } - buffer.put(packedValue); + buffer.write(packedValue); } /** @@ -595,12 +596,12 @@ public class EncodingUtils * @param buffer * @param data */ - public static void writeLongstr(ByteBuffer buffer, byte[] data) + public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException { if (data != null) { writeUnsignedInteger(buffer, data.length); - buffer.put(data); + buffer.write(data); } else { @@ -608,14 +609,14 @@ public class EncodingUtils } } - public static void writeTimestamp(ByteBuffer buffer, long timestamp) + public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException { writeLong(buffer, timestamp); } - public static boolean[] readBooleans(ByteBuffer buffer) + public static boolean[] readBooleans(DataInputStream buffer) throws IOException { - final byte packedValue = buffer.get(); + final byte packedValue = buffer.readByte(); if (packedValue == 0) { return ALL_FALSE_ARRAY; @@ -640,9 +641,9 @@ public class EncodingUtils return result; } - public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return null; @@ -653,21 +654,21 @@ public class EncodingUtils } } - public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException + public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException { // TODO: New Content class required for AMQP 0-9. return null; } - public static AMQShortString readAMQShortString(ByteBuffer buffer) + public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException { return AMQShortString.readFromBuffer(buffer); } - public static String readShortString(ByteBuffer buffer) + public static String readShortString(DataInputStream buffer) throws IOException { - short length = buffer.getUnsigned(); + short length = (short) (((short)buffer.readByte()) & 0xFF); if (length == 0) { return null; @@ -680,7 +681,7 @@ public class EncodingUtils // this approach here is valid since we know that all the chars are // ASCII (0-127) byte[] stringBytes = new byte[length]; - buffer.get(stringBytes, 0, length); + buffer.read(stringBytes, 0, length); char[] stringChars = new char[length]; for (int i = 0; i < stringChars.length; i++) { @@ -691,9 +692,9 @@ public class EncodingUtils } } - public static String readLongString(ByteBuffer buffer) + public static String readLongString(DataInputStream buffer) throws IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return ""; @@ -706,7 +707,7 @@ public class EncodingUtils // this approach here is valid since we know that all the chars are // ASCII (0-127) byte[] stringBytes = new byte[(int) length]; - buffer.get(stringBytes, 0, (int) length); + buffer.read(stringBytes, 0, (int) length); char[] stringChars = new char[(int) length]; for (int i = 0; i < stringChars.length; i++) { @@ -717,9 +718,9 @@ public class EncodingUtils } } - public static byte[] readLongstr(ByteBuffer buffer) + public static byte[] readLongstr(DataInputStream buffer) throws IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return null; @@ -727,17 +728,17 @@ public class EncodingUtils else { byte[] result = new byte[(int) length]; - buffer.get(result); + buffer.read(result); return result; } } - public static long readTimestamp(ByteBuffer buffer) + public static long readTimestamp(DataInputStream buffer) throws IOException { // Discard msb from AMQ timestamp // buffer.getUnsignedInt(); - return buffer.getLong(); + return buffer.readLong(); } static byte[] hexToByteArray(String id) @@ -817,14 +818,14 @@ public class EncodingUtils // AMQP_BOOLEAN_PROPERTY_PREFIX - public static void writeBoolean(ByteBuffer buffer, Boolean aBoolean) + public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException { - buffer.put((byte) (aBoolean ? 1 : 0)); + buffer.write(aBoolean ? 1 : 0); } - public static boolean readBoolean(ByteBuffer buffer) + public static boolean readBoolean(DataInputStream buffer) throws IOException { - byte packedValue = buffer.get(); + byte packedValue = buffer.readByte(); return (packedValue == 1); } @@ -835,14 +836,14 @@ public class EncodingUtils } // AMQP_BYTE_PROPERTY_PREFIX - public static void writeByte(ByteBuffer buffer, Byte aByte) + public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException { - buffer.put(aByte); + buffer.writeByte(aByte); } - public static byte readByte(ByteBuffer buffer) + public static byte readByte(DataInputStream buffer) throws IOException { - return buffer.get(); + return buffer.readByte(); } public static int encodedByteLength() @@ -851,14 +852,14 @@ public class EncodingUtils } // AMQP_SHORT_PROPERTY_PREFIX - public static void writeShort(ByteBuffer buffer, Short aShort) + public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException { - buffer.putShort(aShort); + buffer.writeShort(aShort); } - public static short readShort(ByteBuffer buffer) + public static short readShort(DataInputStream buffer) throws IOException { - return buffer.getShort(); + return buffer.readShort(); } public static int encodedShortLength() @@ -867,14 +868,14 @@ public class EncodingUtils } // INTEGER_PROPERTY_PREFIX - public static void writeInteger(ByteBuffer buffer, Integer aInteger) + public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException { - buffer.putInt(aInteger); + buffer.writeInt(aInteger); } - public static int readInteger(ByteBuffer buffer) + public static int readInteger(DataInputStream buffer) throws IOException { - return buffer.getInt(); + return buffer.readInt(); } public static int encodedIntegerLength() @@ -883,14 +884,14 @@ public class EncodingUtils } // AMQP_LONG_PROPERTY_PREFIX - public static void writeLong(ByteBuffer buffer, Long aLong) + public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException { - buffer.putLong(aLong); + buffer.writeLong(aLong); } - public static long readLong(ByteBuffer buffer) + public static long readLong(DataInputStream buffer) throws IOException { - return buffer.getLong(); + return buffer.readLong(); } public static int encodedLongLength() @@ -899,14 +900,14 @@ public class EncodingUtils } // Float_PROPERTY_PREFIX - public static void writeFloat(ByteBuffer buffer, Float aFloat) + public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException { - buffer.putFloat(aFloat); + buffer.writeFloat(aFloat); } - public static float readFloat(ByteBuffer buffer) + public static float readFloat(DataInputStream buffer) throws IOException { - return buffer.getFloat(); + return buffer.readFloat(); } public static int encodedFloatLength() @@ -915,14 +916,14 @@ public class EncodingUtils } // Double_PROPERTY_PREFIX - public static void writeDouble(ByteBuffer buffer, Double aDouble) + public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException { - buffer.putDouble(aDouble); + buffer.writeDouble(aDouble); } - public static double readDouble(ByteBuffer buffer) + public static double readDouble(DataInputStream buffer) throws IOException { - return buffer.getDouble(); + return buffer.readDouble(); } public static int encodedDoubleLength() @@ -930,9 +931,9 @@ public class EncodingUtils return 8; } - public static byte[] readBytes(ByteBuffer buffer) + public static byte[] readBytes(DataInputStream buffer) throws IOException { - long length = buffer.getUnsignedInt(); + long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL; if (length == 0) { return null; @@ -940,19 +941,19 @@ public class EncodingUtils else { byte[] dataBytes = new byte[(int)length]; - buffer.get(dataBytes, 0, (int)length); + buffer.read(dataBytes, 0, (int) length); return dataBytes; } } - public static void writeBytes(ByteBuffer buffer, byte[] data) + public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException { if (data != null) { // TODO: check length fits in an unsigned byte writeUnsignedInteger(buffer, (long)data.length); - buffer.put(data); + buffer.write(data); } else { @@ -968,35 +969,35 @@ public class EncodingUtils return encodedByteLength(); } - public static char readChar(ByteBuffer buffer) + public static char readChar(DataInputStream buffer) throws IOException { // This is valid as we know that the Character is ASCII 0..127 - return (char) buffer.get(); + return (char) buffer.read(); } - public static void writeChar(ByteBuffer buffer, char character) + public static void writeChar(DataOutputStream buffer, char character) throws IOException { // This is valid as we know that the Character is ASCII 0..127 writeByte(buffer, (byte) character); } - public static long readLongAsShortString(ByteBuffer buffer) + public static long readLongAsShortString(DataInputStream buffer) throws IOException { - short length = buffer.getUnsigned(); + short length = (short) buffer.readUnsignedByte(); short pos = 0; if (length == 0) { return 0L; } - byte digit = buffer.get(); + byte digit = buffer.readByte(); boolean isNegative; long result = 0; if (digit == (byte) '-') { isNegative = true; pos++; - digit = buffer.get(); + digit = buffer.readByte(); } else { @@ -1009,7 +1010,7 @@ public class EncodingUtils while (pos < length) { pos++; - digit = buffer.get(); + digit = buffer.readByte(); result = (result << 3) + (result << 1); result += digit - (byte) '0'; } @@ -1017,15 +1018,15 @@ public class EncodingUtils return result; } - public static long readUnsignedInteger(ByteBuffer buffer) + public static long readUnsignedInteger(DataInputStream buffer) throws IOException { - long l = 0xFF & buffer.get(); + long l = 0xFF & buffer.readByte(); l <<= 8; - l = l | (0xFF & buffer.get()); + l = l | (0xFF & buffer.readByte()); l <<= 8; - l = l | (0xFF & buffer.get()); + l = l | (0xFF & buffer.readByte()); l <<= 8; - l = l | (0xFF & buffer.get()); + l = l | (0xFF & buffer.readByte()); return l; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 22205d49f8..721c821bab 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -20,12 +20,16 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQPInvalidClassException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.math.BigDecimal; import java.util.Collections; import java.util.Enumeration; @@ -43,8 +47,8 @@ public class FieldTable private static final String STRICT_AMQP = "STRICT_AMQP"; private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false")); - private ByteBuffer _encodedForm; - private LinkedHashMap<AMQShortString, AMQTypedValue> _properties; + private byte[] _encodedForm; + private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null; private long _encodedSize; private static final int INITIAL_HASHMAP_CAPACITY = 16; private static final int INITIAL_ENCODED_FORM_SIZE = 256; @@ -52,9 +56,6 @@ public class FieldTable public FieldTable() { super(); - // _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE); - // _encodedForm.setAutoExpand(true); - // _encodedForm.limit(0); } /** @@ -63,16 +64,12 @@ public class FieldTable * @param buffer the buffer from which to read data. The length byte must be read already * @param length the length of the field table. Must be > 0. */ - public FieldTable(ByteBuffer buffer, long length) + public FieldTable(DataInputStream buffer, long length) throws IOException { this(); - ByteBuffer encodedForm = buffer.slice(); - encodedForm.limit((int) length); - _encodedForm = ByteBuffer.allocate((int)length); - _encodedForm.put(encodedForm); - _encodedForm.flip(); + _encodedForm = new byte[(int) length]; + buffer.read(_encodedForm); _encodedSize = length; - buffer.skip((int) length); } public AMQTypedValue getProperty(AMQShortString string) @@ -108,13 +105,19 @@ public class FieldTable { try { - setFromBuffer(_encodedForm, _encodedSize); + setFromBuffer(); } catch (AMQFrameDecodingException e) { _logger.error("Error decoding FieldTable in deferred decoding mode ", e); throw new IllegalArgumentException(e); } + catch (IOException e) + { + _logger.error("Unexpected IO exception decoding field table"); + throw new IllegalArgumentException(e); + + } } private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) @@ -766,7 +769,7 @@ public class FieldTable // ************************* Byte Buffer Processing - public void writeToBuffer(ByteBuffer buffer) + public void writeToBuffer(DataOutputStream buffer) throws IOException { final boolean trace = _logger.isDebugEnabled(); @@ -786,17 +789,21 @@ public class FieldTable public byte[] getDataAsBytes() { - final int encodedSize = (int) getEncodedSize(); - final ByteBuffer buffer = ByteBuffer.allocate(encodedSize); // FIXME XXX: Is cast a problem? - - putDataInBuffer(buffer); - - final byte[] result = new byte[encodedSize]; - buffer.flip(); - buffer.get(result); - buffer.release(); + if(_encodedForm == null) + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try + { + putDataInBuffer(new DataOutputStream(baos)); + return baos.toByteArray(); + } + catch (IOException e) + { + throw new IllegalArgumentException("IO Exception should never be thrown here"); + } - return result; + } + return _encodedForm.clone(); } public long getEncodedSize() @@ -926,15 +933,8 @@ public class FieldTable public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator() { - if(_encodedForm != null) - { - return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize); - } - else - { - initMapIfNecessary(); - return _properties.entrySet().iterator(); - } + initMapIfNecessary(); + return _properties.entrySet().iterator(); } public Object get(String key) @@ -1002,26 +1002,12 @@ public class FieldTable return _properties.keySet(); } - private void putDataInBuffer(ByteBuffer buffer) + private void putDataInBuffer(DataOutputStream buffer) throws IOException { if (_encodedForm != null) { - if(buffer.isDirect() || buffer.isReadOnly()) - { - ByteBuffer encodedForm = _encodedForm.duplicate(); - - if (encodedForm.position() != 0) - { - encodedForm.flip(); - } - - buffer.put(encodedForm); - } - else - { - buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize); - } + buffer.write(_encodedForm); } else if (_properties != null) { @@ -1035,41 +1021,27 @@ public class FieldTable final Map.Entry<AMQShortString, AMQTypedValue> me = it.next(); try { - if (_logger.isDebugEnabled()) - { - _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" - + me.getValue().getValue()); - _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); - } - // Write the actual parameter name EncodingUtils.writeShortStringBytes(buffer, me.getKey()); me.getValue().writeToBuffer(buffer); } catch (Exception e) { - if (_logger.isDebugEnabled()) - { - _logger.debug("Exception thrown:" + e); - _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" - + me.getValue().getValue()); - _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); - } - throw new RuntimeException(e); } } } } - private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException + private void setFromBuffer() throws AMQFrameDecodingException, IOException { + final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm); + DataInputStream buffer = new DataInputStream(in); final boolean trace = _logger.isDebugEnabled(); - if (length > 0) + if (_encodedSize > 0) { - final int expectedRemaining = buffer.remaining() - (int) length; _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY); @@ -1077,121 +1049,16 @@ public class FieldTable { final AMQShortString key = EncodingUtils.readAMQShortString(buffer); - - _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key); - AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); - - if (trace) - { - _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() - + "', key '" + key + "', value '" + value.getValue() + "'"); - } - _properties.put(key, value); } - while (buffer.remaining() > expectedRemaining); - - } - - _encodedSize = length; - - if (trace) - { - _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); - } - } - - private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue> - { - private final AMQTypedValue _value; - private final AMQShortString _key; - - public FieldTableEntry(final AMQShortString key, final AMQTypedValue value) - { - _key = key; - _value = value; - } - - public AMQShortString getKey() - { - return _key; - } - - public AMQTypedValue getValue() - { - return _value; - } - - public AMQTypedValue setValue(final AMQTypedValue value) - { - throw new UnsupportedOperationException(); - } - - public boolean equals(Object o) - { - if(o instanceof FieldTableEntry) - { - FieldTableEntry other = (FieldTableEntry) o; - return (_key == null ? other._key == null : _key.equals(other._key)) - && (_value == null ? other._value == null : _value.equals(other._value)); - } - else - { - return false; - } - } - - public int hashCode() - { - return (getKey()==null ? 0 : getKey().hashCode()) - ^ (getValue()==null ? 0 : getValue().hashCode()); - } - - } - - - private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>> - { + while (in.available() > 0); - private final ByteBuffer _buffer; - private int _expectedRemaining; - - public FieldTableIterator(ByteBuffer buffer, int length) - { - _buffer = buffer; - _expectedRemaining = buffer.remaining() - length; - } - - public boolean hasNext() - { - return (_buffer.remaining() > _expectedRemaining); } - public Map.Entry<AMQShortString, AMQTypedValue> next() - { - if(hasNext()) - { - final AMQShortString key = EncodingUtils.readAMQShortString(_buffer); - AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer); - return new FieldTableEntry(key, value); - } - else - { - return null; - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } } - - - public int hashCode() { initMapIfNecessary(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java index e9d75137ef..438a46f28b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; public class FieldTableFactory { @@ -29,7 +30,7 @@ public class FieldTableFactory return new FieldTable(); } - public static FieldTable newFieldTable(ByteBuffer byteBuffer, long length) throws AMQFrameDecodingException + public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException { return new FieldTable(byteBuffer, length); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 18ab05ffa1..a6ce721a50 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -34,12 +37,12 @@ public class HeartbeatBody implements AMQBody } - public HeartbeatBody(ByteBuffer buffer, long size) + public HeartbeatBody(DataInputStream buffer, long size) throws IOException { if(size > 0) { //allow other implementations to have a payload, but ignore it: - buffer.skip((int) size); + buffer.skip(size); } } @@ -53,7 +56,7 @@ public class HeartbeatBody implements AMQBody return 0;//heartbeats we generate have no payload } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) { } @@ -63,12 +66,12 @@ public class HeartbeatBody implements AMQBody session.heartbeatBodyReceived(channelId, this); } - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { if(size > 0) { //allow other implementations to have a payload, but ignore it: - buffer.skip((int) size); + buffer.skip(size); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index c7ada708dc..dfc49c6167 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index fb3dd89717..8c018316f0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -22,6 +22,10 @@ package org.apache.qpid.framing; import org.apache.qpid.AMQException; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -62,35 +66,30 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) + public ProtocolInitiation(DataInputStream in) throws IOException { _protocolHeader = new byte[4]; - in.get(_protocolHeader); + in.read(_protocolHeader); - _protocolClass = in.get(); - _protocolInstance = in.get(); - _protocolMajor = in.get(); - _protocolMinor = in.get(); + _protocolClass = in.readByte(); + _protocolInstance = in.readByte(); + _protocolMajor = in.readByte(); + _protocolMinor = in.readByte(); } - public void writePayload(org.apache.mina.common.ByteBuffer buffer) - { - writePayload(buffer.buf()); - } - public long getSize() { return 4 + 1 + 1 + 1 + 1; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { - buffer.put(_protocolHeader); - buffer.put(_protocolClass); - buffer.put(_protocolInstance); - buffer.put(_protocolMajor); - buffer.put(_protocolMinor); + buffer.write(_protocolHeader); + buffer.write(_protocolClass); + buffer.write(_protocolInstance); + buffer.write(_protocolMajor); + buffer.write(_protocolMinor); } public boolean equals(Object o) @@ -144,9 +143,9 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(ByteBuffer in) + public boolean decodable(DataInputStream in) throws IOException { - return (in.remaining() >= 8); + return (in.available() >= 8); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java index bd763599b0..d2925d13a8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -21,7 +21,8 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataOutputStream; +import java.io.IOException; public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { @@ -68,7 +69,7 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl return frameSize; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { if (_firstFrame != null) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 76c154581d..ed9136f7c9 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -144,7 +145,7 @@ public class VersionSpecificRegistry } - public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException + public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException { AMQMethodBodyInstanceFactory bodyFactory; try diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java index 0695349f76..470b7b05e3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java @@ -21,12 +21,10 @@ package org.apache.qpid.framing.abstraction; -import org.apache.mina.common.ByteBuffer; - public interface ContentChunk { int getSize(); - ByteBuffer getData(); + byte[] getData(); void reduceToFit(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java index 7544d9b7e7..d1e53d6907 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -23,8 +23,6 @@ package org.apache.qpid.framing.abstraction; import org.apache.qpid.framing.AMQBody; -import java.nio.ByteBuffer; - public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter { AMQBody convertToBody(ContentChunk contentBody); @@ -32,5 +30,5 @@ public interface ProtocolVersionMethodConverter extends MessagePublishInfoConver void configure(); - AMQBody convertToBody(ByteBuffer buf); + AMQBody convertToBody(byte[] input); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index 1c4a29b106..90a730d6f7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -21,16 +21,13 @@ package org.apache.qpid.framing.amqp_0_9; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.*; -import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl; + public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { @@ -72,9 +69,9 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot } - public AMQBody convertToBody(java.nio.ByteBuffer buf) + public AMQBody convertToBody(byte[] data) { - return new ContentBody(ByteBuffer.wrap(buf)); + return new ContentBody(data); } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) @@ -116,9 +113,9 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot return _contentBodyChunk.getSize(); } - public ByteBuffer getData() + public byte[] getData() { - return _contentBodyChunk.payload; + return _contentBodyChunk._payload; } public void reduceToFit() diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java index 6e330574bc..3b0cc3cebc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java @@ -21,8 +21,6 @@ package org.apache.qpid.framing.amqp_0_91; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; @@ -70,9 +68,9 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro } - public AMQBody convertToBody(java.nio.ByteBuffer buf) + public AMQBody convertToBody(byte[] data) { - return new ContentBody(ByteBuffer.wrap(buf)); + return new ContentBody(data); } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) @@ -114,9 +112,9 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro return _contentBodyChunk.getSize(); } - public ByteBuffer getData() + public byte[] getData() { - return _contentBodyChunk.payload; + return _contentBodyChunk._payload; } public void reduceToFit() diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java index c87820b9b2..e6d0482f0d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -26,11 +26,8 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; import org.apache.qpid.framing.*; -import org.apache.mina.common.ByteBuffer; - public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { private int _basicPublishClassId; @@ -60,9 +57,9 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot return contentBodyChunk.getSize(); } - public ByteBuffer getData() + public byte[] getData() { - return contentBodyChunk.payload; + return contentBodyChunk._payload; } public void reduceToFit() @@ -81,9 +78,9 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot } - public AMQBody convertToBody(java.nio.ByteBuffer buf) + public AMQBody convertToBody(byte[] data) { - return new ContentBody(ByteBuffer.wrap(buf)); + return new ContentBody(data); } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) diff --git a/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java deleted file mode 100644 index 2c7652abeb..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.qpid.transport; - -import org.apache.mina.common.IoConnector; - -public interface SocketConnectorFactory -{ - IoConnector newConnector(); -}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 7f0f04f9c4..e1d1596ec5 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -37,13 +37,6 @@ import org.apache.qpid.transport.util.Logger; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport { - static - { - org.apache.mina.common.ByteBuffer.setAllocator - (new org.apache.mina.common.SimpleByteBufferAllocator()); - org.apache.mina.common.ByteBuffer.setUseDirectBuffers - (Boolean.getBoolean("amqj.enableDirectBuffers")); - } private static final Logger LOGGER = Logger.get(IoNetworkTransport.class); diff --git a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java index 62e25e7d79..272eb75800 100644 --- a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java +++ b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -21,6 +21,9 @@ package org.apache.qpid.codec; */ +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -46,9 +49,16 @@ public class AMQDecoderTest extends TestCase } - public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + private ByteBuffer getHeartbeatBodyBuffer() throws IOException { - ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + HeartbeatBody.FRAME.writePayload(new DataOutputStream(baos)); + return ByteBuffer.wrap(baos.toByteArray()); + } + + public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException + { + ByteBuffer msg = getHeartbeatBodyBuffer(); ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); if (frames.get(0) instanceof AMQFrame) { @@ -60,9 +70,9 @@ public class AMQDecoderTest extends TestCase } } - public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msg = getHeartbeatBodyBuffer(); ByteBuffer msgA = msg.slice(); int msgbPos = msg.remaining() / 2; int msgaLimit = msg.remaining() - msgbPos; @@ -83,10 +93,10 @@ public class AMQDecoderTest extends TestCase } } - public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = getHeartbeatBodyBuffer(); + ByteBuffer msgB = getHeartbeatBodyBuffer(); ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining()); msg.put(msgA); msg.put(msgB); @@ -106,11 +116,11 @@ public class AMQDecoderTest extends TestCase } } - public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = getHeartbeatBodyBuffer(); + ByteBuffer msgB = getHeartbeatBodyBuffer(); + ByteBuffer msgC = getHeartbeatBodyBuffer(); ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2); sliceA.put(msgA); diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index 4fd1f60d69..5e7783f492 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import junit.framework.TestCase; +import java.io.*; + public class BasicContentHeaderPropertiesTest extends TestCase { @@ -76,15 +76,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase assertEquals(99, _testProperties.getPropertyFlags()); } - public void testWritePropertyListPayload() + public void testWritePropertyListPayload() throws IOException { - ByteBuffer buf = ByteBuffer.allocate(300); - _testProperties.writePropertyListPayload(buf); + _testProperties.writePropertyListPayload(new DataOutputStream(new ByteArrayOutputStream(300))); } public void testPopulatePropertiesFromBuffer() throws Exception { - ByteBuffer buf = ByteBuffer.allocate(300); + DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300])); _testProperties.populatePropertiesFromBuffer(buf, 99, 99); } diff --git a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index d4691ba097..bb4c9c3884 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -23,14 +23,14 @@ package org.apache.qpid.framing; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQPInvalidClassException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.*; + public class PropertyFieldTableTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class); @@ -441,7 +441,7 @@ public class PropertyFieldTableTest extends TestCase } /** Check that a nested field table parameter correctly encodes and decodes to a byte buffer. */ - public void testNestedFieldTable() + public void testNestedFieldTable() throws IOException { byte[] testBytes = new byte[] { 0, 1, 2, 3, 4, 5 }; @@ -465,14 +465,16 @@ public class PropertyFieldTableTest extends TestCase outerTable.setFieldTable("innerTable", innerTable); // Write the outer table into the buffer. - final ByteBuffer buffer = ByteBuffer.allocate((int) outerTable.getEncodedSize() + 4); - outerTable.writeToBuffer(buffer); - buffer.flip(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + outerTable.writeToBuffer(new DataOutputStream(baos)); + + byte[] data = baos.toByteArray(); // Extract the table back from the buffer again. try { - FieldTable extractedOuterTable = EncodingUtils.readFieldTable(buffer); + FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data))); FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable"); @@ -567,7 +569,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals("Hello", table.getObject("object-string")); } - public void testwriteBuffer() + public void testwriteBuffer() throws IOException { byte[] bytes = { 99, 98, 97, 96, 95 }; @@ -585,15 +587,17 @@ public class PropertyFieldTableTest extends TestCase table.setString("string", "hello"); table.setString("null-string", null); - final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem? - table.writeToBuffer(buffer); + ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4); + table.writeToBuffer(new DataOutputStream(baos)); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); - buffer.flip(); - long length = buffer.getUnsignedInt(); + long length = dis.readInt() & 0xFFFFFFFFL; - FieldTable table2 = new FieldTable(buffer, length); + FieldTable table2 = new FieldTable(dis, length); Assert.assertEquals((Boolean) true, table2.getBoolean("bool")); Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte")); diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index ac3380e0c0..bde8bc68ad 100644 --- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -24,17 +24,16 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.util.*; import junit.framework.TestCase; import junit.framework.TestResult; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.apache.mina.util.AvailablePortFinder; + public class QpidTestCase extends TestCase { @@ -140,9 +139,85 @@ public class QpidTestCase extends TestCase return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } + + public static final int MIN_PORT_NUMBER = 1; + public static final int MAX_PORT_NUMBER = 49151; + + + /** + * Gets the next available port starting at a port. + * + * @param fromPort the port to scan for availability + * @throws NoSuchElementException if there are no ports available + */ + protected int getNextAvailable(int fromPort) + { + if ((fromPort < MIN_PORT_NUMBER) || (fromPort > MAX_PORT_NUMBER)) + { + throw new IllegalArgumentException("Invalid start port: " + fromPort); + } + + for (int i = fromPort; i <= MAX_PORT_NUMBER; i++) + { + if (available(i)) { + return i; + } + } + + throw new NoSuchElementException("Could not find an available port above " + fromPort); + } + + /** + * Checks to see if a specific port is available. + * + * @param port the port to check for availability + */ + private boolean available(int port) + { + if ((port < MIN_PORT_NUMBER) || (port > MAX_PORT_NUMBER)) + { + throw new IllegalArgumentException("Invalid start port: " + port); + } + + ServerSocket ss = null; + DatagramSocket ds = null; + try + { + ss = new ServerSocket(port); + ss.setReuseAddress(true); + ds = new DatagramSocket(port); + ds.setReuseAddress(true); + return true; + } + catch (IOException e) + { + } + finally + { + if (ds != null) + { + ds.close(); + } + + if (ss != null) + { + try + { + ss.close(); + } + catch (IOException e) + { + /* should not be thrown */ + } + } + } + + return false; + } + public int findFreePort() { - return AvailablePortFinder.getNextAvailable(10000); + return getNextAvailable(10000); } /** diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 375a326654..3cd7dea2b6 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.transport; -import org.apache.mina.util.AvailablePortFinder; - import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.io.IoAcceptor; @@ -58,7 +56,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener { super.setUp(); - port = AvailablePortFinder.getNextAvailable(12000); + port = findFreePort(); } protected void tearDown() throws Exception diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java index 0de1308281..215c6d9931 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -22,16 +22,8 @@ package org.apache.qpid.transport.network.io; import java.net.Socket; import java.nio.ByteBuffer; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLSender; import org.apache.qpid.transport.util.Logger; /** @@ -46,13 +38,6 @@ import org.apache.qpid.transport.util.Logger; public final class IoTransport<E> { - static - { - org.apache.mina.common.ByteBuffer.setAllocator - (new org.apache.mina.common.SimpleByteBufferAllocator()); - org.apache.mina.common.ByteBuffer.setUseDirectBuffers - (Boolean.getBoolean("amqj.enableDirectBuffers")); - } private static final Logger log = Logger.get(IoTransport.class); diff --git a/java/common/templates/method/version/MethodBodyClass.vm b/java/common/templates/method/version/MethodBodyClass.vm index a739110d70..ce8a453eeb 100644 --- a/java/common/templates/method/version/MethodBodyClass.vm +++ b/java/common/templates/method/version/MethodBodyClass.vm @@ -46,9 +46,11 @@ package org.apache.qpid.framing.amqp_$version.getMajor()_$version.getMinor(); +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.HashMap; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.*; import org.apache.qpid.AMQException; @@ -56,7 +58,7 @@ public class ${javaClassName} extends AMQMethodBody_$version.getMajor()_$version { private static final AMQMethodBodyInstanceFactory FACTORY_INSTANCE = new AMQMethodBodyInstanceFactory() { - public AMQMethodBody newInstance(ByteBuffer in, long size) throws AMQFrameDecodingException + public AMQMethodBody newInstance(DataInputStream in, long size) throws AMQFrameDecodingException, IOException { return new ${javaClassName}(in); } @@ -84,7 +86,7 @@ public class ${javaClassName} extends AMQMethodBody_$version.getMajor()_$version // Constructor - public ${javaClassName}(ByteBuffer buffer) throws AMQFrameDecodingException + public ${javaClassName}(DataInputStream buffer) throws AMQFrameDecodingException, IOException { #foreach( $field in $method.ConsolidatedFields ) _$field.Name = read$field.getEncodingType()( buffer ); @@ -169,7 +171,7 @@ public class ${javaClassName} extends AMQMethodBody_$version.getMajor()_$version return size; } - public void writeMethodPayload(ByteBuffer buffer) + public void writeMethodPayload(DataOutputStream buffer) throws IOException { #foreach( $field in $method.ConsolidatedFields ) write$field.getEncodingType()( buffer, _$field.Name ); diff --git a/java/common/templates/model/MethodRegistryClass.vm b/java/common/templates/model/MethodRegistryClass.vm index 759e5e4a42..8258175ce7 100644 --- a/java/common/templates/model/MethodRegistryClass.vm +++ b/java/common/templates/model/MethodRegistryClass.vm @@ -30,7 +30,8 @@ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -53,8 +54,8 @@ public abstract class MethodRegistry #end - public abstract AMQMethodBody convertToBody(ByteBuffer in, long size) - throws AMQFrameDecodingException; + public abstract AMQMethodBody convertToBody(DataInputStream in, long size) + throws AMQFrameDecodingException, IOException; public abstract int getMaxClassId(); @@ -101,4 +102,4 @@ public abstract class MethodRegistry public abstract ProtocolVersionMethodConverter getProtocolVersionMethodConverter(); -}
\ No newline at end of file +} diff --git a/java/common/templates/model/version/MethodRegistryClass.vm b/java/common/templates/model/version/MethodRegistryClass.vm index 277605e34b..79553f7748 100644 --- a/java/common/templates/model/version/MethodRegistryClass.vm +++ b/java/common/templates/model/version/MethodRegistryClass.vm @@ -35,32 +35,33 @@ import org.apache.qpid.protocol.AMQConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.IOException; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodRegistry_$version.getMajor()_$version.getMinor() extends MethodRegistry { - + private static final Logger _log = LoggerFactory.getLogger(MethodRegistry.class); - private ProtocolVersionMethodConverter _protocolVersionConverter = new MethodConverter_$version.getMajor()_$version.getMinor()(); - -#set( $specificModel = $model.asSingleVersionModel() ) - - -#set( $maxClassId = $specificModel.getMaximumClassId()+1 ) - private final AMQMethodBodyInstanceFactory[][] _factories = new AMQMethodBodyInstanceFactory[$maxClassId][]; - - public MethodRegistry_$version.getMajor()_$version.getMinor()() - { - this(new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor())); + private ProtocolVersionMethodConverter _protocolVersionConverter = new MethodConverter_$version.getMajor()_$version.getMinor()(); + +#set( $specificModel = $model.asSingleVersionModel() ) + + +#set( $maxClassId = $specificModel.getMaximumClassId()+1 ) + private final AMQMethodBodyInstanceFactory[][] _factories = new AMQMethodBodyInstanceFactory[$maxClassId][]; + + public MethodRegistry_$version.getMajor()_$version.getMinor()() + { + this(new ProtocolVersion((byte)$version.getMajor(),(byte)$version.getMinor())); } - - public MethodRegistry_$version.getMajor()_$version.getMinor()(ProtocolVersion pv) - { - super(pv); + + public MethodRegistry_$version.getMajor()_$version.getMinor()(ProtocolVersion pv) + { + super(pv); #foreach( $amqpClass in $specificModel.getClassList() ) #set( $amqpClassNameFirstChar = $amqpClass.getName().substring(0,1) ) #set( $amqpClassNameFirstCharU = $amqpClassNameFirstChar.toUpperCase() ) @@ -68,30 +69,30 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth - // Register method body instance factories for the $amqpClassNameUpperCamel class. + // Register method body instance factories for the $amqpClassNameUpperCamel class. -#set( $maxMethodId = $amqpClass.getMaximumMethodId()+1 ) +#set( $maxMethodId = $amqpClass.getMaximumMethodId()+1 ) _factories[$amqpClass.getClassId()] = new AMQMethodBodyInstanceFactory[$maxMethodId]; - + #foreach( $amqpMethod in $amqpClass.getMethodList() ) #set( $amqpMethodNameFirstChar = $amqpMethod.getName().substring(0,1) ) #set( $amqpMethodNameFirstCharU = $amqpMethodNameFirstChar.toUpperCase() ) #set( $amqpMethodNameUpperCamel = "$amqpMethodNameFirstCharU$amqpMethod.getName().substring(1)" ) _factories[$amqpClass.getClassId()][$amqpMethod.getMethodId()] = ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}BodyImpl.getFactory(); -#end - +#end + #end - - - } + + + } - public AMQMethodBody convertToBody(ByteBuffer in, long size) - throws AMQFrameDecodingException + public AMQMethodBody convertToBody(DataInputStream in, long size) + throws AMQFrameDecodingException, IOException { - int classId = in.getUnsignedShort(); - int methodId = in.getUnsignedShort(); - + int classId = in.readUnsignedShort(); + int methodId = in.readUnsignedShort(); + AMQMethodBodyInstanceFactory bodyFactory; try { @@ -137,15 +138,15 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth public int getMaxClassId() - { - return $specificModel.getMaximumClassId(); - } + { + return $specificModel.getMaximumClassId(); + } public int getMaxMethodId(int classId) - { - return _factories[classId].length - 1; - } - + { + return _factories[classId].length - 1; + } + #foreach( $amqpClass in $specificModel.getClassList() ) @@ -153,12 +154,12 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth #set( $amqpClassNameFirstCharU = $amqpClassNameFirstChar.toUpperCase() ) #set( $amqpClassNameUpperCamel = "$amqpClassNameFirstCharU$amqpClass.getName().substring(1)" ) - + #foreach( $amqpMethod in $amqpClass.getMethodList() ) #set( $amqpMethodNameFirstChar = $amqpMethod.getName().substring(0,1) ) #set( $amqpMethodNameFirstCharU = $amqpMethodNameFirstChar.toUpperCase() ) #set( $amqpMethodNameUpperCamel = "$amqpMethodNameFirstCharU$amqpMethod.getName().substring(1)" ) - public ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body create${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body( + public ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body create${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}Body( #foreach( $field in $amqpMethod.FieldList ) #if( $velocityCount == $amqpMethod.getFieldList().size() ) final $field.NativeType $field.Name @@ -166,9 +167,9 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth final $field.NativeType $field.Name, #end #end - ) + ) { - return new ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}BodyImpl( + return new ${amqpClassNameUpperCamel}${amqpMethodNameUpperCamel}BodyImpl( #foreach( $field in $amqpMethod.FieldList ) #if( $velocityCount == $amqpMethod.getFieldList().size() ) $field.Name @@ -176,18 +177,18 @@ public class MethodRegistry_$version.getMajor()_$version.getMinor() extends Meth $field.Name, #end #end - ); + ); } -#end - +#end + #end - - + + public ProtocolVersionMethodConverter getProtocolVersionMethodConverter() { return _protocolVersionConverter; - } + } } |