diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/common/src/main/java/org/apache/qpid/framing | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing')
53 files changed, 8290 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java new file mode 100644 index 0000000000..fe04155bb8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; + +public interface AMQBody +{ + public byte getFrameType(); + + /** + * Get the size of the body + * @return unsigned short + */ + public abstract int getSize(); + + public void writePayload(ByteBuffer buffer); + + void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java new file mode 100644 index 0000000000..a2fc3a03ef --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * A data block represents something that has a size in bytes and the ability to write itself to a byte + * buffer (similar to a byte array). + */ +public abstract class AMQDataBlock implements EncodableAMQDataBlock +{ + /** + * Get the size of buffer needed to store the byte representation of this + * frame. + * @return unsigned integer + */ + public abstract long getSize(); + + /** + * Writes the datablock to the specified buffer. + * @param buffer + */ + public abstract void writePayload(ByteBuffer buffer); + + public ByteBuffer toByteBuffer() + { + final ByteBuffer buffer = ByteBuffer.allocate((int)getSize()); + + writePayload(buffer); + buffer.flip(); + return buffer; + } + + public java.nio.ByteBuffer toNioByteBuffer() + { + final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize()); + + ByteBuffer buf = ByteBuffer.wrap(buffer); + writePayload(buf); + buffer.flip(); + return buffer; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java new file mode 100644 index 0000000000..228867b2b0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; + +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQDataBlockDecoder +{ + private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY"; + + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + + static + { + _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); + _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); + + public AMQDataBlockDecoder() + { } + + public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException + { + final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) + { + return false; + } + + in.position(in.position() + 1 + 2); + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.getInt() & 0xffffffffL; + + return (remainingAfterAttributes >= bodySize); + + } + + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) + throws AMQFrameDecodingException, AMQProtocolVersionException + { + final byte type = in.get(); + + BodyFactory bodyFactory; + if (type == AMQMethodBody.TYPE) + { + bodyFactory = methodBodyFactory; + } + else + { + bodyFactory = _bodiesSupported[type]; + } + + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); + } + + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize, null); + } + + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type, null); + } + + return frame; + } + + public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if (bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + } + + out.write(createAndPopulateFrame(bodyFactory, in)); + } + + public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException + { + return decodable(msg.buf()); + } + + public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException + { + return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java new file mode 100644 index 0000000000..374644b4f2 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.demux.MessageEncoder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; + +public final class AMQDataBlockEncoder implements MessageEncoder +{ + private static final Logger _logger = LoggerFactory.getLogger(AMQDataBlockEncoder.class); + + private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class); + + public AMQDataBlockEncoder() + { } + + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception + { + final AMQDataBlock frame = (AMQDataBlock) message; + + final ByteBuffer buffer = frame.toByteBuffer(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); + } + + out.write(buffer); + } + + public Set getMessageTypes() + { + return _messageTypes; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java new file mode 100644 index 0000000000..02a46f3748 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock +{ + private final int _channel; + + private final AMQBody _bodyFrame; + public static final byte FRAME_END_BYTE = (byte) 0xCE; + + + public AMQFrame(final int channel, final AMQBody bodyFrame) + { + _channel = channel; + _bodyFrame = bodyFrame; + } + + public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException + { + this._channel = channel; + this._bodyFrame = bodyFactory.createBody(in,bodySize); + } + + public long getSize() + { + return 1 + 2 + 4 + _bodyFrame.getSize() + 1; + } + + public static final int getFrameOverhead() + { + return 1 + 2 + 4 + 1; + } + + + public void writePayload(ByteBuffer buffer) + { + buffer.put(_bodyFrame.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, _channel); + EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); + _bodyFrame.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + } + + public final int getChannel() + { + return _channel; + } + + public final AMQBody getBodyFrame() + { + return _bodyFrame; + } + + public String toString() + { + return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); + } + + public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body) + { + buffer.put(body.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body.getSize()); + body.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body3.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body3.getSize()); + body3.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java new file mode 100644 index 0000000000..2373edb478 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; + +/** + * AMQFrameDecodingException indicates that an AMQP frame cannot be decoded because it does not have the correct + * format as defined by the protocol. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents a format error in a protocol frame. + * </table> + */ +public class AMQFrameDecodingException extends AMQException +{ + public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable cause) + { + super(errorCode, message, cause); + } + + public AMQFrameDecodingException(AMQConstant errorCode, String message) + { + super(errorCode, message, null); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java new file mode 100644 index 0000000000..4763b22290 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; + +public interface AMQMethodBody extends AMQBody +{ + public static final byte TYPE = 1; + + /** AMQP version */ + public byte getMajor(); + + public byte getMinor(); + + + + /** @return unsigned short */ + public int getClazz(); + + /** @return unsigned short */ + public int getMethod(); + + public void writeMethodPayload(ByteBuffer buffer); + + + public int getSize(); + + public void writePayload(ByteBuffer buffer); + + //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; + + //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; + + public AMQFrame generateFrame(int channelId); + + public String toString(); + + + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException + */ + public AMQChannelException getChannelNotFoundException(int channelId); + + public AMQChannelException getChannelException(AMQConstant code, String message); + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause); + + public AMQConnectionException getConnectionException(AMQConstant code, String message); + + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause); + + + public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java new file mode 100644 index 0000000000..1a7022c11b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQMethodBodyFactory implements BodyFactory +{ + private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); + + private final AMQVersionAwareProtocolSession _protocolSession; + + public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession) + { + _protocolSession = protocolSession; + } + + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + return _protocolSession.getMethodRegistry().convertToBody(in, bodySize); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java new file mode 100644 index 0000000000..cd3d721065 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -0,0 +1,258 @@ +package org.apache.qpid.framing; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + +public abstract class AMQMethodBodyImpl implements AMQMethodBody +{ + public static final byte TYPE = 1; + + public AMQMethodBodyImpl() + { + } + + public byte getFrameType() + { + return TYPE; + } + + + /** unsigned short */ + abstract protected int getBodySize(); + + + public AMQFrame generateFrame(int channelId) + { + return new AMQFrame(channelId, this); + } + + /** + * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and + * method ids of the body it resulted from). + */ + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException + */ + public AMQChannelException getChannelNotFoundException(int channelId) + { + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); + } + + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null); + } + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null); + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException + { + session.methodFrameReceived(channelId, this); + } + + public int getSize() + { + return 2 + 2 + getBodySize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, getClazz()); + EncodingUtils.writeUnsignedShort(buffer, getMethod()); + writeMethodPayload(buffer); + } + + + protected byte readByte(ByteBuffer buffer) + { + return buffer.get(); + } + + protected AMQShortString readAMQShortString(ByteBuffer buffer) + { + return EncodingUtils.readAMQShortString(buffer); + } + + protected int getSizeOf(AMQShortString string) + { + return EncodingUtils.encodedShortStringLength(string); + } + + protected void writeByte(ByteBuffer buffer, byte b) + { + buffer.put(b); + } + + protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string) + { + EncodingUtils.writeShortStringBytes(buffer, string); + } + + protected int readInt(ByteBuffer buffer) + { + return buffer.getInt(); + } + + protected void writeInt(ByteBuffer buffer, int i) + { + buffer.putInt(i); + } + + protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + return EncodingUtils.readFieldTable(buffer); + } + + protected int getSizeOf(FieldTable table) + { + return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeFieldTable(ByteBuffer buffer, FieldTable table) + { + EncodingUtils.writeFieldTableBytes(buffer, table); + } + + protected long readLong(ByteBuffer buffer) + { + return buffer.getLong(); + } + + protected void writeLong(ByteBuffer buffer, long l) + { + buffer.putLong(l); + } + + protected int getSizeOf(byte[] response) + { + return (response == null) ? 4 : response.length + 4; + } + + protected void writeBytes(ByteBuffer buffer, byte[] data) + { + EncodingUtils.writeBytes(buffer,data); + } + + protected byte[] readBytes(ByteBuffer buffer) + { + return EncodingUtils.readBytes(buffer); + } + + protected short readShort(ByteBuffer buffer) + { + return EncodingUtils.readShort(buffer); + } + + protected void writeShort(ByteBuffer buffer, short s) + { + EncodingUtils.writeShort(buffer, s); + } + + protected Content readContent(ByteBuffer buffer) + { + return null; //To change body of created methods use File | Settings | File Templates. + } + + protected int getSizeOf(Content body) + { + return 0; //To change body of created methods use File | Settings | File Templates. + } + + protected void writeContent(ByteBuffer buffer, Content body) + { + //To change body of created methods use File | Settings | File Templates. + } + + protected byte readBitfield(ByteBuffer buffer) + { + return readByte(buffer); //To change body of created methods use File | Settings | File Templates. + } + + protected int readUnsignedShort(ByteBuffer buffer) + { + return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates. + } + + protected void writeBitfield(ByteBuffer buffer, byte bitfield0) + { + buffer.put(bitfield0); + } + + protected void writeUnsignedShort(ByteBuffer buffer, int s) + { + EncodingUtils.writeUnsignedShort(buffer, s); + } + + protected long readUnsignedInteger(ByteBuffer buffer) + { + return buffer.getUnsignedInt(); + } + protected void writeUnsignedInteger(ByteBuffer buffer, long i) + { + EncodingUtils.writeUnsignedInteger(buffer, i); + } + + + protected short readUnsignedByte(ByteBuffer buffer) + { + return buffer.getUnsigned(); + } + + protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte) + { + EncodingUtils.writeUnsignedByte(buffer, unsignedByte); + } + + protected long readTimestamp(ByteBuffer buffer) + { + return EncodingUtils.readTimestamp(buffer); + } + + protected void writeTimestamp(ByteBuffer buffer, long t) + { + EncodingUtils.writeTimestamp(buffer, t); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java new file mode 100644 index 0000000000..0c61d9db3c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + + +public abstract interface AMQMethodBodyInstanceFactory +{ + public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java new file mode 100644 index 0000000000..bfcc38ad60 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + + +public interface AMQMethodFactory +{ + + // Connection Methods + + ConnectionCloseBody createConnectionClose(); + + // Access Methods + + AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write); + + + // Tx Methods + + TxSelectBody createTxSelect(); + + TxCommitBody createTxCommit(); + + TxRollbackBody createTxRollback(); + + // Channel Methods + + ChannelOpenBody createChannelOpen(); + + ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText); + + ChannelFlowBody createChannelFlow(boolean active); + + + // Exchange Methods + + + ExchangeBoundBody createExchangeBound(AMQShortString exchangeName, + AMQShortString queueName, + AMQShortString routingKey); + + ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket); + + + // Queue Methods + + QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket); + + QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket); + + QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket); + + + // Message Methods + + // In different versions of the protocol we change the class used for message transfer + // abstract this out so the appropriate methods are created + AMQMethodBody createRecover(boolean requeue); + + AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket); + + AMQMethodBody createConsumerCancel(AMQShortString consumerTag); + + AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple); + + AMQMethodBody createRejectBody(long deliveryTag, boolean requeue); + + AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java new file mode 100644 index 0000000000..ab09c1de6d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +/** + * AMQProtocolInstanceException indicates that the protocol class is incorrect in a header. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent incorrect protocol class in frame header. + * </table> + * + * @todo Not an AMQP exception as no status code. + */ +public class AMQProtocolClassException extends AMQProtocolHeaderException +{ + public AMQProtocolClassException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java new file mode 100644 index 0000000000..6b819364da --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.qpid.AMQException; + +/** + * AMQProtocolHeaderException indicates a format error in an AMQP frame header. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent format error in frame header. + * </table> + * + * @todo Not an AMQP exception as no status code. + */ +public class AMQProtocolHeaderException extends AMQException +{ + public AMQProtocolHeaderException(String message, Throwable cause) + { + super(null, message, cause); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java new file mode 100644 index 0000000000..3165c373a9 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +/** + * AMQProtocolInstanceException indicates that the protocol instance is incorrect in a header. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent incorrect protocol instance in frame header. + * </table> + * + * @todo Not an AMQP exception as no status code. + */ +public class AMQProtocolInstanceException extends AMQProtocolHeaderException +{ + public AMQProtocolInstanceException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java new file mode 100644 index 0000000000..c9b0973ea6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +/** + * AMQProtocolInstanceException indicates that the client and server differ on expected protocol version in a header. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent incorrect protocol version in frame header. + * </table> + * + * @todo Not an AMQP exception as no status code. + */ +public class AMQProtocolVersionException extends AMQProtocolHeaderException +{ + public AMQProtocolVersionException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java new file mode 100644 index 0000000000..39a9beb9e8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -0,0 +1,779 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.lang.ref.WeakReference; + +/** + * A short string is a representation of an AMQ Short String + * Short strings differ from the Java String class by being limited to on ASCII characters (0-127) + * and thus can be held more effectively in a byte buffer. + * + */ +public final class AMQShortString implements CharSequence, Comparable<AMQShortString> +{ + private static final byte MINUS = (byte)'-'; + private static final byte ZERO = (byte) '0'; + + private final class TokenizerImpl implements AMQShortStringTokenizer + { + private final byte _delim; + private int _count = -1; + private int _pos = 0; + + public TokenizerImpl(final byte delim) + { + _delim = delim; + } + + public int countTokens() + { + if(_count == -1) + { + _count = 1 + AMQShortString.this.occurences(_delim); + } + return _count; + } + + public AMQShortString nextToken() + { + if(_pos <= AMQShortString.this.length()) + { + int nextDelim = AMQShortString.this.indexOf(_delim, _pos); + if(nextDelim == -1) + { + nextDelim = AMQShortString.this.length(); + } + + AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++); + _pos = nextDelim; + return nextToken; + } + else + { + return null; + } + } + + public boolean hasMoreTokens() + { + return _pos <= AMQShortString.this.length(); + } + } + + private AMQShortString substring(final int from, final int to) + { + return new AMQShortString(_data, from+_offset, to+_offset); + } + + + private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap = + new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>() + { + protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue() + { + return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>(); + }; + }; + + private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap = + new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>(); + + private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class); + + private final byte[] _data; + private final int _offset; + private int _hashCode; + private String _asString = null; + + private final int _length; + private static final char[] EMPTY_CHAR_ARRAY = new char[0]; + + public static final AMQShortString EMPTY_STRING = new AMQShortString((String)null); + + public AMQShortString(byte[] data) + { + + _data = data.clone(); + _length = data.length; + _offset = 0; + } + + public AMQShortString(byte[] data, int pos) + { + final int size = data[pos++]; + final byte[] dataCopy = new byte[size]; + System.arraycopy(data,pos,dataCopy,0,size); + _length = size; + _data = dataCopy; + _offset = 0; + } + + public AMQShortString(String data) + { + this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray()); + _asString = data; + } + + public AMQShortString(char[] data) + { + if (data == null) + { + throw new NullPointerException("Cannot create AMQShortString with null char[]"); + } + + final int length = data.length; + final byte[] stringBytes = new byte[length]; + int hash = 0; + for (int i = 0; i < length; i++) + { + stringBytes[i] = (byte) (0xFF & data[i]); + hash = (31 * hash) + stringBytes[i]; + } + _hashCode = hash; + _data = stringBytes; + + _length = length; + _offset = 0; + + } + + public AMQShortString(CharSequence charSequence) + { + final int length = charSequence.length(); + final byte[] stringBytes = new byte[length]; + int hash = 0; + for (int i = 0; i < length; i++) + { + stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i))); + hash = (31 * hash) + stringBytes[i]; + + } + + _data = stringBytes; + _hashCode = hash; + _length = length; + _offset = 0; + + } + + private AMQShortString(ByteBuffer data, final int length) + { + if(data.isDirect() || data.isReadOnly()) + { + byte[] dataBytes = new byte[length]; + data.get(dataBytes); + _data = dataBytes; + _offset = 0; + } + else + { + + _data = data.array(); + _offset = data.arrayOffset() + data.position(); + data.skip(length); + + } + _length = length; + + } + + private AMQShortString(final byte[] data, final int from, final int to) + { + _offset = from; + _length = to - from; + _data = data; + } + + public AMQShortString shrink() + { + if(_data.length != _length) + { + byte[] dataBytes = new byte[_length]; + System.arraycopy(_data,_offset,dataBytes,0,_length); + return new AMQShortString(dataBytes,0,_length); + } + else + { + return this; + } + } + + /** + * Get the length of the short string + * @return length of the underlying byte array + */ + public int length() + { + return _length; + } + + public char charAt(int index) + { + + return (char) _data[_offset + index]; + + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start, end); + } + + public int writeToByteArray(byte[] encoding, int pos) + { + final int size = length(); + encoding[pos++] = (byte) size; + System.arraycopy(_data,_offset,encoding,pos,size); + return pos+size; + } + + public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos) + { + + + final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos); + if(shortString.length() == 0) + { + return null; + } + else + { + return shortString; + } + } + + public static AMQShortString readFromBuffer(ByteBuffer buffer) + { + final short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + + return new AMQShortString(buffer, length); + } + } + + public byte[] getBytes() + { + if(_offset == 0 && _length == _data.length) + { + return _data.clone(); + } + else + { + byte[] data = new byte[_length]; + System.arraycopy(_data,_offset,data,0,_length); + return data; + } + } + + public void writeToBuffer(ByteBuffer buffer) + { + + final int size = length(); + //buffer.setAutoExpand(true); + buffer.put((byte) size); + buffer.put(_data, _offset, size); + + } + + public boolean endsWith(String s) + { + return endsWith(new AMQShortString(s)); + } + + + public boolean endsWith(AMQShortString otherString) + { + + if (otherString.length() > length()) + { + return false; + } + + + int thisLength = length(); + int otherLength = otherString.length(); + + for (int i = 1; i <= otherLength; i++) + { + if (charAt(thisLength - i) != otherString.charAt(otherLength - i)) + { + return false; + } + } + return true; + } + + public boolean startsWith(String s) + { + return startsWith(new AMQShortString(s)); + } + + public boolean startsWith(AMQShortString otherString) + { + + if (otherString.length() > length()) + { + return false; + } + + for (int i = 0; i < otherString.length(); i++) + { + if (charAt(i) != otherString.charAt(i)) + { + return false; + } + } + + return true; + + } + + public boolean startsWith(CharSequence otherString) + { + if (otherString.length() > length()) + { + return false; + } + + for (int i = 0; i < otherString.length(); i++) + { + if (charAt(i) != otherString.charAt(i)) + { + return false; + } + } + + return true; + } + + + private final class CharSubSequence implements CharSequence + { + private final int _sequenceOffset; + private final int _end; + + public CharSubSequence(final int offset, final int end) + { + _sequenceOffset = offset; + _end = end; + } + + public int length() + { + return _end - _sequenceOffset; + } + + public char charAt(int index) + { + return AMQShortString.this.charAt(index + _sequenceOffset); + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset); + } + } + + public char[] asChars() + { + final int size = length(); + final char[] chars = new char[size]; + + for (int i = 0; i < size; i++) + { + chars[i] = (char) _data[i + _offset]; + } + + return chars; + } + + + public String asString() + { + if (_asString == null) + { + _asString = new String(asChars()); + } + return _asString; + } + + public boolean equals(Object o) + { + + + if(o instanceof AMQShortString) + { + return equals((AMQShortString)o); + } + if(o instanceof CharSequence) + { + return equals((CharSequence)o); + } + + if (o == null) + { + return false; + } + + if (o == this) + { + return true; + } + + + return false; + + } + + public boolean equals(final AMQShortString otherString) + { + if (otherString == this) + { + return true; + } + + if (otherString == null) + { + return false; + } + + final int hashCode = _hashCode; + + final int otherHashCode = otherString._hashCode; + + if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode)) + { + return false; + } + + final int length = _length; + + if(length != otherString._length) + { + return false; + } + + + final byte[] data = _data; + + final byte[] otherData = otherString._data; + + final int offset = _offset; + + final int otherOffset = otherString._offset; + + if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length) + { + return Arrays.equals(data, otherData); + } + else + { + int thisIdx = offset; + int otherIdx = otherOffset; + for(int i = length; i-- != 0; ) + { + if(!(data[thisIdx++] == otherData[otherIdx++])) + { + return false; + } + } + } + + return true; + + } + + public boolean equals(CharSequence s) + { + if(s instanceof AMQShortString) + { + return equals((AMQShortString)s); + } + + if (s == null) + { + return false; + } + + if (s.length() != length()) + { + return false; + } + + for (int i = 0; i < length(); i++) + { + if (charAt(i) != s.charAt(i)) + { + return false; + } + } + + return true; + } + + public int hashCode() + { + int hash = _hashCode; + if (hash == 0) + { + final int size = length(); + + for (int i = 0; i < size; i++) + { + hash = (31 * hash) + _data[i+_offset]; + } + + _hashCode = hash; + } + + return hash; + } + + public void setDirty() + { + _hashCode = 0; + } + + public String toString() + { + return asString(); + } + + public int compareTo(AMQShortString name) + { + if (name == null) + { + return 1; + } + else + { + + if (name.length() < length()) + { + return -name.compareTo(this); + } + + for (int i = 0; i < length(); i++) + { + final byte d = _data[i+_offset]; + final byte n = name._data[i+name._offset]; + if (d < n) + { + return -1; + } + + if (d > n) + { + return 1; + } + } + + return (length() == name.length()) ? 0 : -1; + } + } + + + public AMQShortStringTokenizer tokenize(byte delim) + { + return new TokenizerImpl(delim); + } + + + public AMQShortString intern() + { + + hashCode(); + + Map<AMQShortString, WeakReference<AMQShortString>> localMap = + _localInternMap.get(); + + WeakReference<AMQShortString> ref = localMap.get(this); + AMQShortString internString; + + if(ref != null) + { + internString = ref.get(); + if(internString != null) + { + return internString; + } + } + + + synchronized(_globalInternMap) + { + + ref = _globalInternMap.get(this); + if((ref == null) || ((internString = ref.get()) == null)) + { + internString = shrink(); + ref = new WeakReference(internString); + _globalInternMap.put(internString, ref); + } + + } + localMap.put(internString, ref); + return internString; + + } + + private int occurences(final byte delim) + { + int count = 0; + final int end = _offset + _length; + for(int i = _offset ; i < end ; i++ ) + { + if(_data[i] == delim) + { + count++; + } + } + return count; + } + + private int indexOf(final byte val, final int pos) + { + + for(int i = pos; i < length(); i++) + { + if(_data[_offset+i] == val) + { + return i; + } + } + return -1; + } + + + public static AMQShortString join(final Collection<AMQShortString> terms, + final AMQShortString delim) + { + if(terms.size() == 0) + { + return EMPTY_STRING; + } + + int size = delim.length() * (terms.size() - 1); + for(AMQShortString term : terms) + { + size += term.length(); + } + + byte[] data = new byte[size]; + int pos = 0; + final byte[] delimData = delim._data; + final int delimOffset = delim._offset; + final int delimLength = delim._length; + + + for(AMQShortString term : terms) + { + + if(pos!=0) + { + System.arraycopy(delimData, delimOffset,data,pos, delimLength); + pos+=delimLength; + } + System.arraycopy(term._data,term._offset,data,pos,term._length); + pos+=term._length; + } + + + + return new AMQShortString(data,0,size); + } + + public int toIntValue() + { + int pos = _offset; + int val = 0; + + + boolean isNegative = (_data[pos] == MINUS); + if(isNegative) + { + pos++; + } + + final int end = _length + _offset; + + while(pos < end) + { + int digit = (int) (_data[pos++] - ZERO); + if((digit < 0) || (digit > 9)) + { + throw new NumberFormatException("\""+toString()+"\" is not a valid number"); + } + val = val * 10; + val += digit; + } + if(isNegative) + { + val = val * -1; + } + return val; + } + + public boolean contains(final byte b) + { + final int end = _length + _offset; + for(int i = _offset; i < end; i++) + { + if(_data[i] == b) + { + return true; + } + } + return false; //To change body of created methods use File | Settings | File Templates. + } + + public static AMQShortString valueOf(Object obj) + { + return obj == null ? null : new AMQShortString(String.valueOf(obj)); + } + + + public static void main(String args[]) + { + AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k"); + AMQShortString s2 = s.substring(2, 7); + + AMQShortStringTokenizer t = s2.tokenize((byte) '.'); + while(t.hasMoreTokens()) + { + System.err.println(t.nextToken()); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java new file mode 100644 index 0000000000..e2db8906a1 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java @@ -0,0 +1,31 @@ +package org.apache.qpid.framing; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public interface AMQShortStringTokenizer +{ + + public int countTokens(); + + public AMQShortString nextToken(); + + boolean hasMoreTokens(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java new file mode 100644 index 0000000000..14fb63da03 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -0,0 +1,795 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import java.math.BigDecimal; + +/** + * AMQType is a type that represents the different possible AMQP field table types. It provides operations for each + * of the types to perform tasks such as calculating the size of an instance of the type, converting types between AMQP + * and Java native types, and reading and writing instances of AMQP types in binary formats to and from byte buffers. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Get the equivalent one byte identifier for a type. + * <tr><td> Calculate the size of an instance of an AMQP parameter type. <td> {@link EncodingUtils} + * <tr><td> Convert an instance of an AMQP parameter into a compatable Java object tagged with its AMQP type. + * <td> {@link AMQTypedValue} + * <tr><td> Write an instance of an AMQP parameter type to a byte buffer. <td> {@link EncodingUtils} + * <tr><td> Read an instance of an AMQP parameter from a byte buffer. <td> {@link EncodingUtils} + * </table> + */ +public enum AMQType +{ + LONG_STRING('S') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedLongStringLength((String) value); + } + + public String toNativeValue(Object value) + { + if (value != null) + { + return value.toString(); + } + else + { + throw new NullPointerException("Cannot convert: null to String."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeLongStringBytes(buffer, (String) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readLongString(buffer); + } + }, + + INTEGER('i') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.unsignedIntegerLength(); + } + + public Long toNativeValue(Object value) + { + if (value instanceof Long) + { + return (Long) value; + } + else if (value instanceof Integer) + { + return ((Integer) value).longValue(); + } + else if (value instanceof Short) + { + return ((Short) value).longValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).longValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Long.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeUnsignedInteger(buffer, (Long) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readUnsignedInteger(buffer); + } + }, + + DECIMAL('D') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedByteLength() + EncodingUtils.encodedIntegerLength(); + } + + public Object toNativeValue(Object value) + { + if (value instanceof BigDecimal) + { + return (BigDecimal) value; + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to BigDecimal."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + BigDecimal bd = (BigDecimal) value; + + byte places = new Integer(bd.scale()).byteValue(); + + int unscaled = bd.intValue(); + + EncodingUtils.writeByte(buffer, places); + + EncodingUtils.writeInteger(buffer, unscaled); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + byte places = EncodingUtils.readByte(buffer); + + int unscaled = EncodingUtils.readInteger(buffer); + + BigDecimal bd = new BigDecimal(unscaled); + + return bd.setScale(places); + } + }, + + TIMESTAMP('T') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedLongLength(); + } + + public Object toNativeValue(Object value) + { + if (value instanceof Long) + { + return (Long) value; + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to timestamp."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeLong(buffer, (Long) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readLong(buffer); + } + }, + + /** + * Implements the field table type. The native value of a field table type will be an instance of + * {@link FieldTable}, which itself may contain name/value pairs encoded as {@link AMQTypedValue}s. + */ + FIELD_TABLE('F') + { + /** + * Calculates the size of an instance of the type in bytes. + * + * @param value An instance of the type. + * + * @return The size of the instance of the type in bytes. + */ + public int getEncodingSize(Object value) + { + // Ensure that the value is a FieldTable. + if (!(value instanceof FieldTable)) + { + throw new IllegalArgumentException("Value is not a FieldTable."); + } + + FieldTable ftValue = (FieldTable) value; + + // Loop over all name/value pairs adding up size of each. FieldTable itself keeps track of its encoded + // size as entries are added, so no need to loop over all explicitly. + // EncodingUtils calculation of the encoded field table lenth, will include 4 bytes for its 'size' field. + return EncodingUtils.encodedFieldTableLength(ftValue); + } + + /** + * Converts an instance of the type to an equivalent Java native representation. + * + * @param value An instance of the type. + * + * @return An equivalent Java native representation. + */ + public Object toNativeValue(Object value) + { + // Ensure that the value is a FieldTable. + if (!(value instanceof FieldTable)) + { + throw new IllegalArgumentException("Value is not a FieldTable."); + } + + return (FieldTable) value; + } + + /** + * Writes an instance of the type to a specified byte buffer. + * + * @param value An instance of the type. + * @param buffer The byte buffer to write it to. + */ + public void writeValueImpl(Object value, ByteBuffer buffer) + { + // Ensure that the value is a FieldTable. + if (!(value instanceof FieldTable)) + { + throw new IllegalArgumentException("Value is not a FieldTable."); + } + + FieldTable ftValue = (FieldTable) value; + + // Loop over all name/values writing out into buffer. + ftValue.writeToBuffer(buffer); + } + + /** + * Reads an instance of the type from a specified byte buffer. + * + * @param buffer The byte buffer to write it to. + * + * @return An instance of the type. + */ + public Object readValueFromBuffer(ByteBuffer buffer) + { + try + { + // Read size of field table then all name/value pairs. + return EncodingUtils.readFieldTable(buffer); + } + catch (AMQFrameDecodingException e) + { + throw new IllegalArgumentException("Unable to read field table from buffer.", e); + } + } + }, + + VOID('V') + { + public int getEncodingSize(Object value) + { + return 0; + } + + public Object toNativeValue(Object value) + { + if (value == null) + { + return null; + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to null String."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return null; + } + }, + + BINARY('x') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedLongstrLength((byte[]) value); + } + + public Object toNativeValue(Object value) + { + if ((value instanceof byte[]) || (value == null)) + { + return value; + } + else + { + throw new IllegalArgumentException("Value: " + value + " (" + value.getClass().getName() + + ") cannot be converted to byte[]"); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeLongstr(buffer, (byte[]) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readLongstr(buffer); + } + }, + + ASCII_STRING('c') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedLongStringLength((String) value); + } + + public String toNativeValue(Object value) + { + if (value != null) + { + return value.toString(); + } + else + { + throw new NullPointerException("Cannot convert: null to String."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeLongStringBytes(buffer, (String) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readLongString(buffer); + } + }, + + WIDE_STRING('C') + { + public int getEncodingSize(Object value) + { + // FIXME: use proper charset encoder + return EncodingUtils.encodedLongStringLength((String) value); + } + + public String toNativeValue(Object value) + { + if (value != null) + { + return value.toString(); + } + else + { + throw new NullPointerException("Cannot convert: null to String."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeLongStringBytes(buffer, (String) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readLongString(buffer); + } + }, + + BOOLEAN('t') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedBooleanLength(); + } + + public Object toNativeValue(Object value) + { + if (value instanceof Boolean) + { + return (Boolean) value; + } + else if ((value instanceof String) || (value == null)) + { + return Boolean.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to boolean."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeBoolean(buffer, (Boolean) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readBoolean(buffer); + } + }, + + ASCII_CHARACTER('k') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedCharLength(); + } + + public Character toNativeValue(Object value) + { + if (value instanceof Character) + { + return (Character) value; + } + else if (value == null) + { + throw new NullPointerException("Cannot convert null into char"); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to char."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeChar(buffer, (Character) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readChar(buffer); + } + }, + + BYTE('b') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedByteLength(); + } + + public Byte toNativeValue(Object value) + { + if (value instanceof Byte) + { + return (Byte) value; + } + else if ((value instanceof String) || (value == null)) + { + return Byte.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to byte."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeByte(buffer, (Byte) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readByte(buffer); + } + }, + + SHORT('s') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedShortLength(); + } + + public Short toNativeValue(Object value) + { + if (value instanceof Short) + { + return (Short) value; + } + else if (value instanceof Byte) + { + return ((Byte) value).shortValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Short.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to short."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeShort(buffer, (Short) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readShort(buffer); + } + }, + + INT('I') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedIntegerLength(); + } + + public Integer toNativeValue(Object value) + { + if (value instanceof Integer) + { + return (Integer) value; + } + else if (value instanceof Short) + { + return ((Short) value).intValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).intValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Integer.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeInteger(buffer, (Integer) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readInteger(buffer); + } + }, + + LONG('l') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedLongLength(); + } + + public Object toNativeValue(Object value) + { + if (value instanceof Long) + { + return (Long) value; + } + else if (value instanceof Integer) + { + return ((Integer) value).longValue(); + } + else if (value instanceof Short) + { + return ((Short) value).longValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).longValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Long.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to long."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeLong(buffer, (Long) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readLong(buffer); + } + }, + + FLOAT('f') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedFloatLength(); + } + + public Float toNativeValue(Object value) + { + if (value instanceof Float) + { + return (Float) value; + } + else if ((value instanceof String) || (value == null)) + { + return Float.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to float."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeFloat(buffer, (Float) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readFloat(buffer); + } + }, + + DOUBLE('d') + { + public int getEncodingSize(Object value) + { + return EncodingUtils.encodedDoubleLength(); + } + + public Double toNativeValue(Object value) + { + if (value instanceof Double) + { + return (Double) value; + } + else if (value instanceof Float) + { + return ((Float) value).doubleValue(); + } + else if ((value instanceof String) || (value == null)) + { + return Double.valueOf((String) value); + } + else + { + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to double."); + } + } + + public void writeValueImpl(Object value, ByteBuffer buffer) + { + EncodingUtils.writeDouble(buffer, (Double) value); + } + + public Object readValueFromBuffer(ByteBuffer buffer) + { + return EncodingUtils.readDouble(buffer); + } + }; + + /** Holds the defined one byte identifier for the type. */ + private final byte _identifier; + + /** + * Creates an instance of an AMQP type from its defined one byte identifier. + * + * @param identifier The one byte identifier for the type. + */ + AMQType(char identifier) + { + _identifier = (byte) identifier; + } + + /** + * Extracts the byte identifier for the typ. + * + * @return The byte identifier for the typ. + */ + public final byte identifier() + { + return _identifier; + } + + /** + * Calculates the size of an instance of the type in bytes. + * + * @param value An instance of the type. + * + * @return The size of the instance of the type in bytes. + */ + public abstract int getEncodingSize(Object value); + + /** + * Converts an instance of the type to an equivalent Java native representation. + * + * @param value An instance of the type. + * + * @return An equivalent Java native representation. + */ + public abstract Object toNativeValue(Object value); + + /** + * Converts an instance of the type to an equivalent Java native representation, packaged as an + * {@link AMQTypedValue} tagged with its AMQP type. + * + * @param value An instance of the type. + * + * @return An equivalent Java native representation, tagged with its AMQP type. + */ + public AMQTypedValue asTypedValue(Object value) + { + return new AMQTypedValue(this, toNativeValue(value)); + } + + /** + * Writes an instance of the type to a specified byte buffer, preceded by its one byte identifier. As the type and + * value are both written, this provides a fully encoded description of a parameters type and value. + * + * @param value An instance of the type. + * @param buffer The byte buffer to write it to. + */ + public void writeToBuffer(Object value, ByteBuffer buffer) + { + buffer.put(identifier()); + writeValueImpl(value, buffer); + } + + /** + * Writes an instance of the type to a specified byte buffer. + * + * @param value An instance of the type. + * @param buffer The byte buffer to write it to. + */ + abstract void writeValueImpl(Object value, ByteBuffer buffer); + + /** + * Reads an instance of the type from a specified byte buffer. + * + * @param buffer The byte buffer to write it to. + * + * @return An instance of the type. + */ + abstract Object readValueFromBuffer(ByteBuffer buffer); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java new file mode 100644 index 0000000000..a07fd78c8c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import java.util.HashMap; +import java.util.Map; + +public class AMQTypeMap +{ + public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<Byte, AMQType>(); + + static + { + for(AMQType type : AMQType.values()) + { + _reverseTypeMap.put(type.identifier(), type); + } + } + + public static AMQType getType(Byte identifier) + { + AMQType result = _reverseTypeMap.get(identifier); + if (result == null) { + throw new IllegalArgumentException + ("no such type code: " + Integer.toHexString(identifier.intValue())); + } + return result; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java new file mode 100644 index 0000000000..647d531476 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Date; +import java.util.Map; +import java.math.BigDecimal; + +/** + * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter + * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides + * the ability to create such parameters from Java native value and a type tag or to extract the native value and type + * from one. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create a fully typed AMQP value from a native type and a type tag. <td> {@link AMQType} + * <tr><td> Create a fully typed AMQP value from a binary representation in a byte buffer. <td> {@link AMQType} + * <tr><td> Write a fully typed AMQP value to a binary representation in a byte buffer. <td> {@link AMQType} + * <tr><td> Extract the type from a fully typed AMQP value. + * <tr><td> Extract the value from a fully typed AMQP value. + * </table> + */ +public class AMQTypedValue +{ + /** The type of the value. */ + private final AMQType _type; + + /** The Java native representation of the AMQP typed value. */ + private final Object _value; + + public AMQTypedValue(AMQType type, Object value) + { + if (type == null) + { + throw new NullPointerException("Cannot create a typed value with null type"); + } + + _type = type; + _value = type.toNativeValue(value); + } + + private AMQTypedValue(AMQType type, ByteBuffer buffer) + { + _type = type; + _value = type.readValueFromBuffer(buffer); + } + + public AMQType getType() + { + return _type; + } + + public Object getValue() + { + return _value; + } + + public void writeToBuffer(ByteBuffer buffer) + { + _type.writeToBuffer(_value, buffer); + } + + public int getEncodingSize() + { + return _type.getEncodingSize(_value); + } + + public static AMQTypedValue readFromBuffer(ByteBuffer buffer) + { + AMQType type = AMQTypeMap.getType(buffer.get()); + + return new AMQTypedValue(type, buffer); + } + + public String toString() + { + return "[" + getType() + ": " + getValue() + "]"; + } + + + public boolean equals(Object o) + { + if(o instanceof AMQTypedValue) + { + AMQTypedValue other = (AMQTypedValue) o; + return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode()); + } + + + public static AMQTypedValue toTypedValue(Object val) + { + if(val == null) + { + return AMQType.VOID.asTypedValue(null); + } + + Class klass = val.getClass(); + if(klass == String.class) + { + return AMQType.ASCII_STRING.asTypedValue(val); + } + else if(klass == Character.class) + { + return AMQType.ASCII_CHARACTER.asTypedValue(val); + } + else if(klass == Integer.class) + { + return AMQType.INT.asTypedValue(val); + } + else if(klass == Long.class) + { + return AMQType.LONG.asTypedValue(val); + } + else if(klass == Float.class) + { + return AMQType.FLOAT.asTypedValue(val); + } + else if(klass == Double.class) + { + return AMQType.DOUBLE.asTypedValue(val); + } + else if(klass == Date.class) + { + return AMQType.TIMESTAMP.asTypedValue(val); + } + else if(klass == Byte.class) + { + return AMQType.BYTE.asTypedValue(val); + } + else if(klass == Boolean.class) + { + return AMQType.BOOLEAN.asTypedValue(val); + } + else if(klass == byte[].class) + { + return AMQType.BINARY.asTypedValue(val); + } + else if(klass == BigDecimal.class) + { + return AMQType.DECIMAL.asTypedValue(val); + } + else if(val instanceof Map) + { + return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val)); + } + return null; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java new file mode 100644 index 0000000000..c7d89a9927 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -0,0 +1,838 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BasicContentHeaderProperties implements CommonContentHeaderProperties +{ + //persistent & non-persistent constants, values as per JMS DeliveryMode + public static final int NON_PERSISTENT = 1; + public static final int PERSISTENT = 2; + + private static final Logger _logger = LoggerFactory.getLogger(BasicContentHeaderProperties.class); + + private static final AMQShortString ZERO_STRING = null; + + /** + * We store the encoded form when we decode the content header so that if we need to write it out without modifying + * it we can do so without incurring the expense of reencoding it + */ + private byte[] _encodedForm; + + /** Flag indicating whether the entire content header has been decoded yet */ + private boolean _decoded = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker for + * routing in some cases so we can decode that separately. + */ + private boolean _decodedHeaders = true; + + /** + * We have some optimisations for partial decoding for maximum performance. The content type is used by all clients + * to determine the message type + */ + private boolean _decodedContentType = true; + + private AMQShortString _contentType; + + private AMQShortString _encoding; + + private FieldTable _headers; + + private byte _deliveryMode; + + private byte _priority; + + private AMQShortString _correlationId; + + private AMQShortString _replyTo; + + private long _expiration; + + private AMQShortString _messageId; + + private long _timestamp; + + private AMQShortString _type; + + private AMQShortString _userId; + + private AMQShortString _appId; + + private AMQShortString _clusterId; + + private int _propertyFlags = 0; + private static final int CONTENT_TYPE_MASK = 1 << 15; + private static final int ENCONDING_MASK = 1 << 14; + private static final int HEADERS_MASK = 1 << 13; + private static final int DELIVERY_MODE_MASK = 1 << 12; + private static final int PROPRITY_MASK = 1 << 11; + private static final int CORRELATION_ID_MASK = 1 << 10; + private static final int REPLY_TO_MASK = 1 << 9; + private static final int EXPIRATION_MASK = 1 << 8; + private static final int MESSAGE_ID_MASK = 1 << 7; + private static final int TIMESTAMP_MASK = 1 << 6; + private static final int TYPE_MASK = 1 << 5; + private static final int USER_ID_MASK = 1 << 4; + private static final int APPLICATION_ID_MASK = 1 << 3; + private static final int CLUSTER_ID_MASK = 1 << 2; + + + /** + * This is 0_10 specific. We use this property to check if some message properties have been changed. + */ + private boolean _hasBeenUpdated = false; + + public boolean reset() + { + boolean result = _hasBeenUpdated; + _hasBeenUpdated = false; + return result; + } + + public void updated() + { + _hasBeenUpdated = true; + } + + public BasicContentHeaderProperties() + { } + + public int getPropertyListSize() + { + if (_encodedForm != null) + { + return _encodedForm.length; + } + else + { + int size = 0; + + if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0) + { + size += EncodingUtils.encodedShortStringLength(_contentType); + } + + if ((_propertyFlags & ENCONDING_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_encoding); + } + + if ((_propertyFlags & HEADERS_MASK) > 0) + { + size += EncodingUtils.encodedFieldTableLength(_headers); + } + + if ((_propertyFlags & DELIVERY_MODE_MASK) > 0) + { + size += 1; + } + + if ((_propertyFlags & PROPRITY_MASK) > 0) + { + size += 1; + } + + if ((_propertyFlags & CORRELATION_ID_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_correlationId); + } + + if ((_propertyFlags & REPLY_TO_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_replyTo); + } + + if ((_propertyFlags & EXPIRATION_MASK) > 0) + { + if (_expiration == 0L) + { + size += EncodingUtils.encodedShortStringLength(ZERO_STRING); + } + else + { + size += EncodingUtils.encodedShortStringLength(_expiration); + } + } + + if ((_propertyFlags & MESSAGE_ID_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_messageId); + } + + if ((_propertyFlags & TIMESTAMP_MASK) > 0) + { + size += 8; + } + + if ((_propertyFlags & TYPE_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_type); + } + + if ((_propertyFlags & USER_ID_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_userId); + } + + if ((_propertyFlags & APPLICATION_ID_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_appId); + } + + if ((_propertyFlags & CLUSTER_ID_MASK) > 0) + { + size += EncodingUtils.encodedShortStringLength(_clusterId); + } + + return size; + } + } + + private void clearEncodedForm() + { + if (!_decoded && (_encodedForm != null)) + { + // decode(); + } + + _encodedForm = null; + } + + public void setPropertyFlags(int propertyFlags) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags = propertyFlags; + } + + public int getPropertyFlags() + { + return _propertyFlags; + } + + public void writePropertyListPayload(ByteBuffer buffer) + { + if (_encodedForm != null) + { + buffer.put(_encodedForm); + } + else + { + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _contentType); + } + + if ((_propertyFlags & ENCONDING_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _encoding); + } + + if ((_propertyFlags & HEADERS_MASK) != 0) + { + EncodingUtils.writeFieldTableBytes(buffer, _headers); + } + + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + buffer.put(_deliveryMode); + } + + if ((_propertyFlags & PROPRITY_MASK) != 0) + { + buffer.put(_priority); + } + + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _correlationId); + } + + if ((_propertyFlags & REPLY_TO_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _replyTo); + } + + if ((_propertyFlags & EXPIRATION_MASK) != 0) + { + if (_expiration == 0L) + { + EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING); + } + else + { + EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration)); + } + } + + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _messageId); + } + + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + EncodingUtils.writeTimestamp(buffer, _timestamp); + } + + if ((_propertyFlags & TYPE_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _type); + } + + if ((_propertyFlags & USER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _userId); + } + + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _appId); + } + + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + EncodingUtils.writeShortStringBytes(buffer, _clusterId); + } + } + } + + public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException + { + _propertyFlags = propertyFlags; + + if (_logger.isDebugEnabled()) + { + _logger.debug("Property flags: " + _propertyFlags); + } + + decode(buffer); + /*_encodedForm = new byte[size]; + buffer.get(_encodedForm, 0, size); + _decoded = false; + _decodedHeaders = false; + _decodedContentType = false;*/ + } + + private void decode(ByteBuffer buffer) + { + // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int pos = buffer.position(); + try + { + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + _contentType = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & ENCONDING_MASK) != 0) + { + _encoding = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & HEADERS_MASK) != 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + } + + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + _deliveryMode = buffer.get(); + } + + if ((_propertyFlags & PROPRITY_MASK) != 0) + { + _priority = buffer.get(); + } + + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + _correlationId = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & REPLY_TO_MASK) != 0) + { + _replyTo = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & EXPIRATION_MASK) != 0) + { + _expiration = EncodingUtils.readLongAsShortString(buffer); + } + + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + _messageId = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + _timestamp = EncodingUtils.readTimestamp(buffer); + } + + if ((_propertyFlags & TYPE_MASK) != 0) + { + _type = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & USER_ID_MASK) != 0) + { + _userId = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + _appId = EncodingUtils.readAMQShortString(buffer); + } + + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + _clusterId = EncodingUtils.readAMQShortString(buffer); + } + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e, e); + } + + final int endPos = buffer.position(); + buffer.position(pos); + final int len = endPos - pos; + _encodedForm = new byte[len]; + final int limit = buffer.limit(); + buffer.limit(endPos); + buffer.get(_encodedForm, 0, len); + buffer.limit(limit); + buffer.position(endPos); + _decoded = true; + } + + private void decodeUpToHeaders() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + try + { + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + + if ((_propertyFlags & ENCONDING_MASK) != 0) + { + byte length = buffer.get(); + buffer.skip(length); + } + + if ((_propertyFlags & HEADERS_MASK) != 0) + { + _headers = EncodingUtils.readFieldTable(buffer); + + } + + _decodedHeaders = true; + } + catch (AMQFrameDecodingException e) + { + throw new RuntimeException("Error in content header data: " + e, e); + } + } + + private void decodeUpToContentType() + { + ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + _contentType = EncodingUtils.readAMQShortString(buffer); + } + + _decodedContentType = true; + } + + private void decodeIfNecessary() + { + if (!_decoded) + { + // decode(); + } + } + + private void decodeHeadersIfNecessary() + { + if (!_decoded && !_decodedHeaders) + { + decodeUpToHeaders(); + } + } + + private void decodeContentTypeIfNecessary() + { + if (!_decoded && !_decodedContentType) + { + decodeUpToContentType(); + } + } + + public AMQShortString getContentType() + { + decodeContentTypeIfNecessary(); + + return _contentType; + } + + public String getContentTypeAsString() + { + decodeContentTypeIfNecessary(); + + return (_contentType == null) ? null : _contentType.toString(); + } + + public void setContentType(AMQShortString contentType) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= (CONTENT_TYPE_MASK); + _contentType = contentType; + } + + public void setContentType(String contentType) + { + _hasBeenUpdated = true; + setContentType((contentType == null) ? null : new AMQShortString(contentType)); + } + + public String getEncodingAsString() + { + + return (getEncoding() == null) ? null : getEncoding().toString(); + } + + public AMQShortString getEncoding() + { + decodeIfNecessary(); + + return _encoding; + } + + public void setEncoding(String encoding) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= ENCONDING_MASK; + _encoding = (encoding == null) ? null : new AMQShortString(encoding); + } + + public void setEncoding(AMQShortString encoding) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= ENCONDING_MASK; + _encoding = encoding; + } + + public FieldTable getHeaders() + { + decodeHeadersIfNecessary(); + + if (_headers == null) + { + setHeaders(FieldTableFactory.newFieldTable()); + } + + return _headers; + } + + public void setHeaders(FieldTable headers) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= HEADERS_MASK; + _headers = headers; + } + + public byte getDeliveryMode() + { + decodeIfNecessary(); + + return _deliveryMode; + } + + public void setDeliveryMode(byte deliveryMode) + { + clearEncodedForm(); + _propertyFlags |= DELIVERY_MODE_MASK; + _deliveryMode = deliveryMode; + } + + public byte getPriority() + { + decodeIfNecessary(); + + return _priority; + } + + public void setPriority(byte priority) + { + clearEncodedForm(); + _propertyFlags |= PROPRITY_MASK; + _priority = priority; + } + + public AMQShortString getCorrelationId() + { + decodeIfNecessary(); + + return _correlationId; + } + + public String getCorrelationIdAsString() + { + decodeIfNecessary(); + + return (_correlationId == null) ? null : _correlationId.toString(); + } + + public void setCorrelationId(String correlationId) + { + _hasBeenUpdated = true; + setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId)); + } + + public void setCorrelationId(AMQShortString correlationId) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= CORRELATION_ID_MASK; + _correlationId = correlationId; + } + + public String getReplyToAsString() + { + decodeIfNecessary(); + + return (_replyTo == null) ? null : _replyTo.toString(); + } + + public AMQShortString getReplyTo() + { + decodeIfNecessary(); + + return _replyTo; + } + + public void setReplyTo(String replyTo) + { + _hasBeenUpdated = true; + setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo)); + } + + public void setReplyTo(AMQShortString replyTo) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= REPLY_TO_MASK; + _replyTo = replyTo; + } + + public long getExpiration() + { + decodeIfNecessary(); + return _expiration; + } + + public void setExpiration(long expiration) + { + clearEncodedForm(); + _propertyFlags |= EXPIRATION_MASK; + _expiration = expiration; + } + + public AMQShortString getMessageId() + { + decodeIfNecessary(); + + return _messageId; + } + + public String getMessageIdAsString() + { + decodeIfNecessary(); + + return (_messageId == null) ? null : _messageId.toString(); + } + + public void setMessageId(String messageId) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= MESSAGE_ID_MASK; + _messageId = (messageId == null) ? null : new AMQShortString(messageId); + } + + public void setMessageId(AMQShortString messageId) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= MESSAGE_ID_MASK; + _messageId = messageId; + } + + public long getTimestamp() + { + decodeIfNecessary(); + return _timestamp; + } + + public void setTimestamp(long timestamp) + { + clearEncodedForm(); + _propertyFlags |= TIMESTAMP_MASK; + _timestamp = timestamp; + } + + public String getTypeAsString() + { + decodeIfNecessary(); + + return (_type == null) ? null : _type.toString(); + } + + public AMQShortString getType() + { + decodeIfNecessary(); + + return _type; + } + + public void setType(String type) + { + _hasBeenUpdated = true; + setType((type == null) ? null : new AMQShortString(type)); + } + + public void setType(AMQShortString type) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= TYPE_MASK; + _type = type; + } + + public String getUserIdAsString() + { + decodeIfNecessary(); + + return (_userId == null) ? null : _userId.toString(); + } + + public AMQShortString getUserId() + { + decodeIfNecessary(); + + return _userId; + } + + public void setUserId(String userId) + { + setUserId((userId == null) ? null : new AMQShortString(userId)); + } + + public void setUserId(AMQShortString userId) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= USER_ID_MASK; + _userId = userId; + } + + public String getAppIdAsString() + { + decodeIfNecessary(); + + return (_appId == null) ? null : _appId.toString(); + } + + public AMQShortString getAppId() + { + decodeIfNecessary(); + + return _appId; + } + + public void setAppId(String appId) + { + _hasBeenUpdated = true; + setAppId((appId == null) ? null : new AMQShortString(appId)); + } + + public void setAppId(AMQShortString appId) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= APPLICATION_ID_MASK; + _appId = appId; + _hasBeenUpdated = true; + } + + public String getClusterIdAsString() + { + _hasBeenUpdated = true; + decodeIfNecessary(); + return (_clusterId == null) ? null : _clusterId.toString(); + } + + public AMQShortString getClusterId() + { + _hasBeenUpdated = true; + decodeIfNecessary(); + return _clusterId; + } + + public void setClusterId(String clusterId) + { + _hasBeenUpdated = true; + setClusterId((clusterId == null) ? null : new AMQShortString(clusterId)); + } + + public void setClusterId(AMQShortString clusterId) + { + _hasBeenUpdated = true; + clearEncodedForm(); + _propertyFlags |= CLUSTER_ID_MASK; + _clusterId = clusterId; + } + + public String toString() + { + return "reply-to = " + _replyTo + ",propertyFlags = " + _propertyFlags + ",ApplicationID = " + _appId + + ",ClusterID = " + _clusterId + ",UserId = " + _userId + ",JMSMessageID = " + _messageId + + ",JMSCorrelationID = " + _correlationId + ",JMSDeliveryMode = " + _deliveryMode + ",JMSExpiration = " + + _expiration + ",JMSPriority = " + _priority + ",JMSTimestamp = " + _timestamp + ",JMSType = " + _type; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java new file mode 100644 index 0000000000..59646577e1 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface. + */ +public interface BodyFactory +{ + AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java new file mode 100644 index 0000000000..7162c37062 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +public interface CommonContentHeaderProperties extends ContentHeaderProperties +{ + AMQShortString getContentType(); + + void setContentType(AMQShortString contentType); + + FieldTable getHeaders(); + + void setHeaders(FieldTable headers); + + byte getDeliveryMode(); + + void setDeliveryMode(byte deliveryMode); + + byte getPriority(); + + void setPriority(byte priority); + + AMQShortString getCorrelationId(); + + void setCorrelationId(AMQShortString correlationId); + + AMQShortString getReplyTo(); + + void setReplyTo(AMQShortString replyTo); + + long getExpiration(); + + void setExpiration(long expiration); + + AMQShortString getMessageId(); + + void setMessageId(AMQShortString messageId); + + long getTimestamp(); + + void setTimestamp(long timestamp); + + AMQShortString getType(); + + void setType(AMQShortString type); + + AMQShortString getUserId(); + + void setUserId(AMQShortString userId); + + AMQShortString getAppId(); + + void setAppId(AMQShortString appId); + + AMQShortString getClusterId(); + + void setClusterId(AMQShortString clusterId); + + AMQShortString getEncoding(); + + void setEncoding(AMQShortString encoding); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java new file mode 100644 index 0000000000..94030f383e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock +{ + + private AMQDataBlock[] _blocks; + + public CompositeAMQDataBlock(AMQDataBlock[] blocks) + { + _blocks = blocks; + } + + + public AMQDataBlock[] getBlocks() + { + return _blocks; + } + + + public long getSize() + { + long frameSize = 0; + for (int i = 0; i < _blocks.length; i++) + { + frameSize += _blocks[i].getSize(); + } + return frameSize; + } + + public void writePayload(ByteBuffer buffer) + { + for (int i = 0; i < _blocks.length; i++) + { + _blocks[i].writePayload(buffer); + } + } + + public String toString() + { + if (_blocks == null) + { + return "No blocks contained in composite frame"; + } + else + { + StringBuilder buf = new StringBuilder(this.getClass().getName()); + buf.append("{"); + for (int i = 0 ; i < _blocks.length; i++) + { + buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); + } + buf.append("}"); + return buf.toString(); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java new file mode 100644 index 0000000000..e5feeec2a4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface Content +{ + // TODO: New Content class required for AMQP 0-9. +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java new file mode 100644 index 0000000000..9d39f8aa86 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; + +public class ContentBody implements AMQBody +{ + public static final byte TYPE = 3; + + public ByteBuffer payload; + + public ContentBody() + { + } + + public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); + } + + } + + + public ContentBody(ByteBuffer payload) + { + this.payload = payload; + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return (payload == null ? 0 : payload.limit()); + } + + public void writePayload(ByteBuffer buffer) + { + if (payload != null) + { + if(payload.isDirect() || payload.isReadOnly()) + { + ByteBuffer copy = payload.duplicate(); + buffer.put(copy.rewind()); + } + else + { + buffer.put(payload.array(),payload.arrayOffset(),payload.limit()); + } + } + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentBodyReceived(channelId, this); + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); + } + + } + + public void reduceBufferToFit() + { + if (payload != null && (payload.remaining() < payload.capacity() / 2)) + { + int size = payload.limit(); + ByteBuffer newPayload = ByteBuffer.allocate(size); + + newPayload.put(payload); + newPayload.flip(); + + //reduce reference count on payload + payload.release(); + + payload = newPayload; + } + } + + + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) + { + final AMQFrame frame = new AMQFrame(channelId, body); + return frame; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java new file mode 100644 index 0000000000..c42995d148 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContentBodyFactory implements BodyFactory +{ + private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); + + private static final ContentBodyFactory _instance = new ContentBodyFactory(); + + public static ContentBodyFactory getInstance() + { + return _instance; + } + + private ContentBodyFactory() + { + _log.debug("Creating content body factory"); + } + + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + return new ContentBody(in, bodySize); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java new file mode 100644 index 0000000000..30db3b8be7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; + +public class ContentHeaderBody implements AMQBody +{ + public static final byte TYPE = 2; + + public int classId; + + public int weight; + + /** unsigned long but java can't handle that anyway when allocating byte array */ + public long bodySize; + + /** must never be null */ + private ContentHeaderProperties properties; + + public ContentHeaderBody() + { + } + + public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + classId = buffer.getUnsignedShort(); + weight = buffer.getUnsignedShort(); + bodySize = buffer.getLong(); + int propertyFlags = buffer.getUnsignedShort(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); + properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + + } + + + public ContentHeaderBody(ContentHeaderProperties props, int classId) + { + properties = props; + this.classId = classId; + } + + public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize) + { + this(props, classId); + this.weight = weight; + this.bodySize = bodySize; + } + + public byte getFrameType() + { + return TYPE; + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException + { + classId = buffer.getUnsignedShort(); + weight = buffer.getUnsignedShort(); + bodySize = buffer.getLong(); + int propertyFlags = buffer.getUnsignedShort(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); + properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + } + + /** + * Helper method that is used currently by the persistence layer (by BDB at the moment). + * @param buffer + * @param size + * @return + * @throws AMQFrameDecodingException + */ + public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) + throws AMQFrameDecodingException, AMQProtocolVersionException + { + ContentHeaderBody body = new ContentHeaderBody(buffer, size); + + return body; + } + + public int getSize() + { + return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); + } + + public void writePayload(ByteBuffer buffer) + { + EncodingUtils.writeUnsignedShort(buffer, classId); + EncodingUtils.writeUnsignedShort(buffer, weight); + buffer.putLong(bodySize); + EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); + properties.writePropertyListPayload(buffer); + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentHeaderReceived(channelId, this); + } + + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + long bodySize) + { + return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize)); + } + + public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) + { + return new AMQFrame(channelId, body); + } + + public ContentHeaderProperties getProperties() + { + return properties; + } + + public void setProperties(ContentHeaderProperties props) + { + properties = props; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java new file mode 100644 index 0000000000..8d5e2f9fb4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContentHeaderBodyFactory implements BodyFactory +{ + private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); + + private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory(); + + public static ContentHeaderBodyFactory getInstance() + { + return _instance; + } + + private ContentHeaderBodyFactory() + { + _log.debug("Creating content header body factory"); + } + + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + // all content headers are the same - it is only the properties that differ. + // the content header body further delegates construction of properties + return new ContentHeaderBody(in, bodySize); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java new file mode 100644 index 0000000000..7ef538cfdc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/** + * There will be an implementation of this interface for each content type. All content types have associated + * header properties and this provides a way to encode and decode them. + */ +public interface ContentHeaderProperties +{ + /** + * Writes the property list to the buffer, in a suitably encoded form. + * @param buffer The buffer to write to + */ + void writePropertyListPayload(ByteBuffer buffer); + + /** + * Populates the properties from buffer. + * @param buffer The buffer to read from. + * @param propertyFlags he property flags. + * @throws AMQFrameDecodingException when the buffer does not contain valid data + */ + void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) + throws AMQFrameDecodingException; + + /** + * @return the size of the encoded property list in bytes. + */ + int getPropertyListSize(); + + /** + * Gets the property flags. Property flags indicate which properties are set in the list. The + * position and meaning of each flag is defined in the protocol specification for the particular + * content type with which these properties are associated. + * @return flags + */ + int getPropertyFlags(); + + void updated(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java new file mode 100644 index 0000000000..46189b63d7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; + +public class ContentHeaderPropertiesFactory +{ + private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); + + public static ContentHeaderPropertiesFactory getInstance() + { + return _instance; + } + + private ContentHeaderPropertiesFactory() + { + } + + public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, + ByteBuffer buffer, int size) + throws AMQFrameDecodingException + { + ContentHeaderProperties properties; + // AMQP version change: "Hardwired" version to major=8, minor=0 + // TODO: Change so that the actual version is obtained from + // the ProtocolInitiation object for this session. + if (classId == BasicConsumeBodyImpl.CLASS_ID) + { + properties = new BasicContentHeaderProperties(); + } + else + { + throw new AMQFrameDecodingException(null, "Unsupport content header class id: " + classId, null); + } + properties.populatePropertiesFromBuffer(buffer, propertyFlags, size); + return properties; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java new file mode 100644 index 0000000000..f6795ff200 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java @@ -0,0 +1,50 @@ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public abstract class DeferredDataBlock extends AMQDataBlock +{ + private AMQDataBlock _underlyingDataBlock; + + + public long getSize() + { + if(_underlyingDataBlock == null) + { + _underlyingDataBlock = createAMQDataBlock(); + } + return _underlyingDataBlock.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + if(_underlyingDataBlock == null) + { + _underlyingDataBlock = createAMQDataBlock(); + } + _underlyingDataBlock.writePayload(buffer); + } + + abstract protected AMQDataBlock createAMQDataBlock(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java new file mode 100644 index 0000000000..9cf96e698c --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +/** + * Marker interface to indicate to MINA that a data block should be encoded with the + * single encoder/decoder that we have defined. + * + * Note that due to a bug in MINA all classes must directly implement this interface, even if + * a superclass implements it. + * TODO: fix MINA so that this is not necessary + * + */ +public interface EncodableAMQDataBlock +{ + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java new file mode 100644 index 0000000000..6425f8c591 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -0,0 +1,1033 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +public class EncodingUtils +{ + private static final Logger _logger = LoggerFactory.getLogger(EncodingUtils.class); + + private static final String STRING_ENCODING = "iso8859-15"; + + private static final Charset _charset = Charset.forName("iso8859-15"); + + public static final int SIZEOF_UNSIGNED_SHORT = 2; + public static final int SIZEOF_UNSIGNED_INT = 4; + private static final boolean[] ALL_FALSE_ARRAY = new boolean[8]; + + public static int encodedShortStringLength(String s) + { + if (s == null) + { + return 1; + } + else + { + return (short) (1 + s.length()); + } + } + + public static int encodedShortStringLength(short s) + { + if (s == 0) + { + return 1 + 1; + } + + int len = 0; + if (s < 0) + { + len = 1; + // sloppy - doesn't work of Integer.MIN_VALUE + s = (short) -s; + } + + if (s > 9999) + { + return 1 + 5; + } + else if (s > 999) + { + return 1 + 4; + } + else if (s > 99) + { + return 1 + 3; + } + else if (s > 9) + { + return 1 + 2; + } + else + { + return 1 + 1; + } + + } + + public static int encodedShortStringLength(int i) + { + if (i == 0) + { + return 1 + 1; + } + + int len = 0; + if (i < 0) + { + len = 1; + // sloppy - doesn't work of Integer.MIN_VALUE + i = -i; + } + + // range is now 1 - 2147483647 + if (i < Short.MAX_VALUE) + { + return len + encodedShortStringLength((short) i); + } + else if (i > 999999) + { + return len + 6 + encodedShortStringLength((short) (i / 1000000)); + } + else // if (i > 99999) + { + return len + 5 + encodedShortStringLength((short) (i / 100000)); + } + + } + + public static int encodedShortStringLength(long l) + { + if (l == 0) + { + return 1 + 1; + } + + int len = 0; + if (l < 0) + { + len = 1; + // sloppy - doesn't work of Long.MIN_VALUE + l = -l; + } + + if (l < Integer.MAX_VALUE) + { + return len + encodedShortStringLength((int) l); + } + else if (l > 9999999999L) + { + return len + 10 + encodedShortStringLength((int) (l / 10000000000L)); + } + else + { + return len + 1 + encodedShortStringLength((int) (l / 10L)); + } + + } + + public static int encodedShortStringLength(AMQShortString s) + { + if (s == null) + { + return 1; + } + else + { + return (1 + s.length()); + } + } + + public static int encodedLongStringLength(String s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length(); + } + } + + public static int encodedLongStringLength(char[] s) + { + if (s == null) + { + return 4; + } + else + { + return 4 + s.length; + } + } + + public static int encodedLongstrLength(byte[] bytes) + { + if (bytes == null) + { + return 4; + } + else + { + return 4 + bytes.length; + } + } + + public static int encodedFieldTableLength(FieldTable table) + { + if (table == null) + { + // length is encoded as 4 octets + return 4; + } + else + { + // length of the table plus 4 octets for the length + return (int) table.getEncodedSize() + 4; + } + } + + public static int encodedContentLength(Content table) + { + // TODO: New Content class required for AMQP 0-9. + return 0; + } + + public static void writeShortStringBytes(ByteBuffer buffer, String s) + { + if (s != null) + { + byte[] encodedString = new byte[s.length()]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + + // TODO: check length fits in an unsigned byte + writeUnsignedByte(buffer, (short)encodedString.length); + buffer.put(encodedString); + + + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + + public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s) + { + if (s != null) + { + + s.writeToBuffer(buffer); + } + else + { + // really writing out unsigned byte + buffer.put((byte) 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, String s) + { + assert (s == null) || (s.length() <= 0xFFFE); + if (s != null) + { + int len = s.length(); + writeUnsignedInteger(buffer, s.length()); + byte[] encodedString = new byte[len]; + char[] cha = s.toCharArray(); + for (int i = 0; i < cha.length; i++) + { + encodedString[i] = (byte) cha[i]; + } + + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, char[] s) + { + assert (s == null) || (s.length <= 0xFFFE); + if (s != null) + { + int len = s.length; + writeUnsignedInteger(buffer, s.length); + byte[] encodedString = new byte[len]; + for (int i = 0; i < s.length; i++) + { + encodedString[i] = (byte) s[i]; + } + + buffer.put(encodedString); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes) + { + assert (bytes == null) || (bytes.length <= 0xFFFE); + if (bytes != null) + { + writeUnsignedInteger(buffer, bytes.length); + buffer.put(bytes); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeUnsignedByte(ByteBuffer buffer, short b) + { + byte bv = (byte) b; + buffer.put(bv); + } + + public static void writeUnsignedShort(ByteBuffer buffer, int s) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (s < Short.MAX_VALUE) + { + buffer.putShort((short) s); + } + else + { + short sv = (short) s; + buffer.put((byte) (0xFF & (sv >> 8))); + buffer.put((byte) (0xFF & sv)); + } + } + + public static int unsignedIntegerLength() + { + return 4; + } + + public static void writeUnsignedInteger(ByteBuffer buffer, long l) + { + // TODO: Is this comparison safe? Do I need to cast RHS to long? + if (l < Integer.MAX_VALUE) + { + buffer.putInt((int) l); + } + else + { + int iv = (int) l; + + // FIXME: This *may* go faster if we build this into a local 4-byte array and then + // put the array in a single call. + buffer.put((byte) (0xFF & (iv >> 24))); + buffer.put((byte) (0xFF & (iv >> 16))); + buffer.put((byte) (0xFF & (iv >> 8))); + buffer.put((byte) (0xFF & iv)); + } + } + + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) + { + if (table != null) + { + table.writeToBuffer(buffer); + } + else + { + EncodingUtils.writeUnsignedInteger(buffer, 0); + } + } + + public static void writeContentBytes(ByteBuffer buffer, Content content) + { + // TODO: New Content class required for AMQP 0-9. + } + + public static void writeBooleans(ByteBuffer buffer, boolean[] values) + { + byte packedValue = 0; + for (int i = 0; i < values.length; i++) + { + if (values[i]) + { + packedValue = (byte) (packedValue | (1 << i)); + } + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value) + { + + buffer.put(value ? (byte) 1 : (byte) 0); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte) (1 << 1)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte) (1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte) (1 << 2)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte) (1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte) (1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte) (1 << 3)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte) (1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte) (1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte) (1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte) (1 << 4)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte) (1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte) (1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte) (1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte) (1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte) (1 << 5)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5, boolean value6) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte) (1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte) (1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte) (1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte) (1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte) (1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte) (1 << 6)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3, + boolean value4, boolean value5, boolean value6, boolean value7) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte) (1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte) (1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte) (1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte) (1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte) (1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte) (1 << 6)); + } + + if (value7) + { + packedValue = (byte) (packedValue | (byte) (1 << 7)); + } + + buffer.put(packedValue); + } + + /** + * This is used for writing longstrs. + * + * @param buffer + * @param data + */ + public static void writeLongstr(ByteBuffer buffer, byte[] data) + { + if (data != null) + { + writeUnsignedInteger(buffer, data.length); + buffer.put(data); + } + else + { + writeUnsignedInteger(buffer, 0); + } + } + + public static void writeTimestamp(ByteBuffer buffer, long timestamp) + { + writeLong(buffer, timestamp); + } + + public static boolean[] readBooleans(ByteBuffer buffer) + { + final byte packedValue = buffer.get(); + if (packedValue == 0) + { + return ALL_FALSE_ARRAY; + } + + final boolean[] result = new boolean[8]; + + result[0] = ((packedValue & 1) != 0); + result[1] = ((packedValue & (1 << 1)) != 0); + result[2] = ((packedValue & (1 << 2)) != 0); + result[3] = ((packedValue & (1 << 3)) != 0); + if ((packedValue & 0xF0) == 0) + { + result[0] = ((packedValue & 1) != 0); + } + + result[4] = ((packedValue & (1 << 4)) != 0); + result[5] = ((packedValue & (1 << 5)) != 0); + result[6] = ((packedValue & (1 << 6)) != 0); + result[7] = ((packedValue & (1 << 7)) != 0); + + return result; + } + + public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + return FieldTableFactory.newFieldTable(buffer, length); + } + } + + public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException + { + // TODO: New Content class required for AMQP 0-9. + return null; + } + + public static AMQShortString readAMQShortString(ByteBuffer buffer) + { + return AMQShortString.readFromBuffer(buffer); + + } + + public static String readShortString(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[length]; + buffer.get(stringBytes, 0, length); + char[] stringChars = new char[length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + + return new String(stringChars); + } + } + + public static String readLongString(ByteBuffer buffer) + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return ""; + } + else + { + // this may seem rather odd to declare two array but testing has shown + // that constructing a string from a byte array is 5 (five) times slower + // than constructing one from a char array. + // this approach here is valid since we know that all the chars are + // ASCII (0-127) + byte[] stringBytes = new byte[(int) length]; + buffer.get(stringBytes, 0, (int) length); + char[] stringChars = new char[(int) length]; + for (int i = 0; i < stringChars.length; i++) + { + stringChars[i] = (char) stringBytes[i]; + } + + return new String(stringChars); + } + } + + public static byte[] readLongstr(ByteBuffer buffer) + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + byte[] result = new byte[(int) length]; + buffer.get(result); + + return result; + } + } + + public static long readTimestamp(ByteBuffer buffer) + { + // Discard msb from AMQ timestamp + // buffer.getUnsignedInt(); + return buffer.getLong(); + } + + static byte[] hexToByteArray(String id) + { + // Should check param for null, long enough for this check, upper-case and trailing char + String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x + + int len = s.length(); + int byte_len = len / 2; + byte[] b = new byte[byte_len]; + + for (int i = 0; i < byte_len; i++) + { + // fixme: refine these repetitive subscript calcs. + int ch = i * 2; + + byte b1 = Byte.parseByte(s.substring(ch, ch + 1), 16); + byte b2 = Byte.parseByte(s.substring(ch + 1, ch + 2), 16); + + b[i] = (byte) ((b1 * 16) + b2); + } + + return (b); + } + + public static char[] convertToHexCharArray(byte[] from) + { + int length = from.length; + char[] result_buff = new char[(length * 2) + 2]; + + result_buff[0] = '0'; + result_buff[1] = 'x'; + + int bite; + int dest = 2; + + for (int i = 0; i < length; i++) + { + bite = from[i]; + + if (bite < 0) + { + bite += 256; + } + + result_buff[dest++] = hex_chars[bite >> 4]; + result_buff[dest++] = hex_chars[bite & 0x0f]; + } + + return (result_buff); + } + + public static String convertToHexString(byte[] from) + { + return (new String(convertToHexCharArray(from))); + } + + public static String convertToHexString(ByteBuffer bb) + { + int size = bb.limit(); + + byte[] from = new byte[size]; + + // Is this not the same. + // bb.get(from, 0, length); + for (int i = 0; i < size; i++) + { + from[i] = bb.get(i); + } + + return (new String(convertToHexCharArray(from))); + } + + private static char[] hex_chars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; + + // **** new methods + + // AMQP_BOOLEAN_PROPERTY_PREFIX + + public static void writeBoolean(ByteBuffer buffer, Boolean aBoolean) + { + buffer.put((byte) (aBoolean ? 1 : 0)); + } + + public static boolean readBoolean(ByteBuffer buffer) + { + byte packedValue = buffer.get(); + + return (packedValue == 1); + } + + public static int encodedBooleanLength() + { + return 1; + } + + // AMQP_BYTE_PROPERTY_PREFIX + public static void writeByte(ByteBuffer buffer, Byte aByte) + { + buffer.put(aByte); + } + + public static byte readByte(ByteBuffer buffer) + { + return buffer.get(); + } + + public static int encodedByteLength() + { + return 1; + } + + // AMQP_SHORT_PROPERTY_PREFIX + public static void writeShort(ByteBuffer buffer, Short aShort) + { + buffer.putShort(aShort); + } + + public static short readShort(ByteBuffer buffer) + { + return buffer.getShort(); + } + + public static int encodedShortLength() + { + return 2; + } + + // INTEGER_PROPERTY_PREFIX + public static void writeInteger(ByteBuffer buffer, Integer aInteger) + { + buffer.putInt(aInteger); + } + + public static int readInteger(ByteBuffer buffer) + { + return buffer.getInt(); + } + + public static int encodedIntegerLength() + { + return 4; + } + + // AMQP_LONG_PROPERTY_PREFIX + public static void writeLong(ByteBuffer buffer, Long aLong) + { + buffer.putLong(aLong); + } + + public static long readLong(ByteBuffer buffer) + { + return buffer.getLong(); + } + + public static int encodedLongLength() + { + return 8; + } + + // Float_PROPERTY_PREFIX + public static void writeFloat(ByteBuffer buffer, Float aFloat) + { + buffer.putFloat(aFloat); + } + + public static float readFloat(ByteBuffer buffer) + { + return buffer.getFloat(); + } + + public static int encodedFloatLength() + { + return 4; + } + + // Double_PROPERTY_PREFIX + public static void writeDouble(ByteBuffer buffer, Double aDouble) + { + buffer.putDouble(aDouble); + } + + public static double readDouble(ByteBuffer buffer) + { + return buffer.getDouble(); + } + + public static int encodedDoubleLength() + { + return 8; + } + + public static byte[] readBytes(ByteBuffer buffer) + { + long length = buffer.getUnsignedInt(); + if (length == 0) + { + return null; + } + else + { + byte[] dataBytes = new byte[(int)length]; + buffer.get(dataBytes, 0, (int)length); + + return dataBytes; + } + } + + public static void writeBytes(ByteBuffer buffer, byte[] data) + { + if (data != null) + { + // TODO: check length fits in an unsigned byte + writeUnsignedInteger(buffer, (long)data.length); + buffer.put(data); + } + else + { + // really writing out unsigned byte + //buffer.put((byte) 0); + writeUnsignedInteger(buffer, 0L); + } + } + + // CHAR_PROPERTY + public static int encodedCharLength() + { + return encodedByteLength(); + } + + public static char readChar(ByteBuffer buffer) + { + // This is valid as we know that the Character is ASCII 0..127 + return (char) buffer.get(); + } + + public static void writeChar(ByteBuffer buffer, char character) + { + // This is valid as we know that the Character is ASCII 0..127 + writeByte(buffer, (byte) character); + } + + public static long readLongAsShortString(ByteBuffer buffer) + { + short length = buffer.getUnsigned(); + short pos = 0; + if (length == 0) + { + return 0L; + } + + byte digit = buffer.get(); + boolean isNegative; + long result = 0; + if (digit == (byte) '-') + { + isNegative = true; + pos++; + digit = buffer.get(); + } + else + { + isNegative = false; + } + + result = digit - (byte) '0'; + pos++; + + while (pos < length) + { + pos++; + digit = buffer.get(); + result = (result << 3) + (result << 1); + result += digit - (byte) '0'; + } + + return result; + } + + public static long readUnsignedInteger(ByteBuffer buffer) + { + long l = 0xFF & buffer.get(); + l <<= 8; + l = l | (0xFF & buffer.get()); + l <<= 8; + l = l | (0xFF & buffer.get()); + l <<= 8; + l = l | (0xFF & buffer.get()); + + return l; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java new file mode 100644 index 0000000000..22205d49f8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -0,0 +1,1246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.AMQPInvalidClassException; + +import java.math.BigDecimal; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +// extends FieldTable +public class FieldTable +{ + private static final Logger _logger = LoggerFactory.getLogger(FieldTable.class); + private static final String STRICT_AMQP = "STRICT_AMQP"; + private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false")); + + private ByteBuffer _encodedForm; + private LinkedHashMap<AMQShortString, AMQTypedValue> _properties; + private long _encodedSize; + private static final int INITIAL_HASHMAP_CAPACITY = 16; + private static final int INITIAL_ENCODED_FORM_SIZE = 256; + + public FieldTable() + { + super(); + // _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE); + // _encodedForm.setAutoExpand(true); + // _encodedForm.limit(0); + } + + /** + * Construct a new field table. + * + * @param buffer the buffer from which to read data. The length byte must be read already + * @param length the length of the field table. Must be > 0. + */ + public FieldTable(ByteBuffer buffer, long length) + { + this(); + ByteBuffer encodedForm = buffer.slice(); + encodedForm.limit((int) length); + _encodedForm = ByteBuffer.allocate((int)length); + _encodedForm.put(encodedForm); + _encodedForm.flip(); + _encodedSize = length; + buffer.skip((int) length); + } + + public AMQTypedValue getProperty(AMQShortString string) + { + checkPropertyName(string); + + synchronized (this) + { + if (_properties == null) + { + if (_encodedForm == null) + { + return null; + } + else + { + populateFromBuffer(); + } + } + } + + if (_properties == null) + { + return null; + } + else + { + return _properties.get(string); + } + } + + private void populateFromBuffer() + { + try + { + setFromBuffer(_encodedForm, _encodedSize); + } + catch (AMQFrameDecodingException e) + { + _logger.error("Error decoding FieldTable in deferred decoding mode ", e); + throw new IllegalArgumentException(e); + } + } + + private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) + { + checkPropertyName(key); + initMapIfNecessary(); + if (_properties.containsKey(key)) + { + _encodedForm = null; + + if (val == null) + { + return removeKey(key); + } + } + else if ((_encodedForm != null) && (val != null)) + { + // We have updated data to store in the buffer + // So clear the _encodedForm to allow it to be rebuilt later + // this is safer than simply appending to any existing buffer. + _encodedForm = null; + } + else if (val == null) + { + return null; + } + + AMQTypedValue oldVal = _properties.put(key, val); + if (oldVal != null) + { + _encodedSize -= oldVal.getEncodingSize(); + } + else + { + _encodedSize += EncodingUtils.encodedShortStringLength(key) + 1; + } + + _encodedSize += val.getEncodingSize(); + + return oldVal; + } + + private void initMapIfNecessary() + { + synchronized (this) + { + if (_properties == null) + { + if ((_encodedForm == null) || (_encodedSize == 0)) + { + _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(); + } + else + { + populateFromBuffer(); + } + } + + } + } + + public Boolean getBoolean(String string) + { + return getBoolean(new AMQShortString(string)); + } + + public Boolean getBoolean(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.BOOLEAN)) + { + return (Boolean) value.getValue(); + } + else + { + return null; + } + } + + public Byte getByte(String string) + { + return getByte(new AMQShortString(string)); + } + + public Byte getByte(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.BYTE)) + { + return (Byte) value.getValue(); + } + else + { + return null; + } + } + + public Short getShort(String string) + { + return getShort(new AMQShortString(string)); + } + + public Short getShort(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.SHORT)) + { + return (Short) value.getValue(); + } + else + { + return null; + } + } + + public Integer getInteger(String string) + { + return getInteger(new AMQShortString(string)); + } + + public Integer getInteger(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.INT)) + { + return (Integer) value.getValue(); + } + else + { + return null; + } + } + + public Long getLong(String string) + { + return getLong(new AMQShortString(string)); + } + + public Long getLong(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.LONG)) + { + return (Long) value.getValue(); + } + else + { + return null; + } + } + + public Float getFloat(String string) + { + return getFloat(new AMQShortString(string)); + } + + public Float getFloat(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.FLOAT)) + { + return (Float) value.getValue(); + } + else + { + return null; + } + } + + public Double getDouble(String string) + { + return getDouble(new AMQShortString(string)); + } + + public Double getDouble(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.DOUBLE)) + { + return (Double) value.getValue(); + } + else + { + return null; + } + } + + public String getString(String string) + { + return getString(new AMQShortString(string)); + } + + public String getString(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && ((value.getType() == AMQType.WIDE_STRING) || (value.getType() == AMQType.ASCII_STRING))) + { + return (String) value.getValue(); + } + else if ((value != null) && (value.getValue() != null) && !(value.getValue() instanceof byte[])) + { + return String.valueOf(value.getValue()); + } + else + { + return null; + } + + } + + public Character getCharacter(String string) + { + return getCharacter(new AMQShortString(string)); + } + + public Character getCharacter(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.ASCII_CHARACTER)) + { + return (Character) value.getValue(); + } + else + { + return null; + } + } + + public byte[] getBytes(String string) + { + return getBytes(new AMQShortString(string)); + } + + public byte[] getBytes(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if ((value != null) && (value.getType() == AMQType.BINARY)) + { + return (byte[]) value.getValue(); + } + else + { + return null; + } + } + + /** + * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name. + * + * @param string The name of the parameter to get the associated FieldTable value for. + * + * @return The associated FieldTable value, or <tt>null</tt> if the associated value is not of FieldTable type or + * not present in the field table at all. + */ + public FieldTable getFieldTable(String string) + { + return getFieldTable(new AMQShortString(string)); + } + + /** + * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name. + * + * @param string The name of the parameter to get the associated FieldTable value for. + * + * @return The associated FieldTable value, or <tt>null</tt> if the associated value is not of FieldTable type or + * not present in the field table at all. + */ + public FieldTable getFieldTable(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + + if ((value != null) && (value.getType() == AMQType.FIELD_TABLE)) + { + return (FieldTable) value.getValue(); + } + else + { + return null; + } + } + + public Object getObject(String string) + { + return getObject(new AMQShortString(string)); + } + + public Object getObject(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + if (value != null) + { + return value.getValue(); + } + else + { + return value; + } + + } + + public Long getTimestamp(AMQShortString name) + { + AMQTypedValue value = getProperty(name); + if ((value != null) && (value.getType() == AMQType.TIMESTAMP)) + { + return (Long) value.getValue(); + } + else + { + return null; + } + } + + public BigDecimal getDecimal(AMQShortString propertyName) + { + AMQTypedValue value = getProperty(propertyName); + if ((value != null) && (value.getType() == AMQType.DECIMAL)) + { + return (BigDecimal) value.getValue(); + } + else + { + return null; + } + } + + // ************ Setters + public Object setBoolean(String string, Boolean b) + { + return setBoolean(new AMQShortString(string), b); + } + + public Object setBoolean(AMQShortString string, Boolean b) + { + return setProperty(string, AMQType.BOOLEAN.asTypedValue(b)); + } + + public Object setByte(String string, Byte b) + { + return setByte(new AMQShortString(string), b); + } + + public Object setByte(AMQShortString string, Byte b) + { + return setProperty(string, AMQType.BYTE.asTypedValue(b)); + } + + public Object setShort(String string, Short i) + { + return setShort(new AMQShortString(string), i); + } + + public Object setShort(AMQShortString string, Short i) + { + return setProperty(string, AMQType.SHORT.asTypedValue(i)); + } + + public Object setInteger(String string, Integer i) + { + return setInteger(new AMQShortString(string), i); + } + + public Object setInteger(AMQShortString string, Integer i) + { + return setProperty(string, AMQType.INT.asTypedValue(i)); + } + + public Object setLong(String string, Long l) + { + return setLong(new AMQShortString(string), l); + } + + public Object setLong(AMQShortString string, Long l) + { + return setProperty(string, AMQType.LONG.asTypedValue(l)); + } + + public Object setFloat(String string, Float f) + { + return setFloat(new AMQShortString(string), f); + } + + public Object setFloat(AMQShortString string, Float v) + { + return setProperty(string, AMQType.FLOAT.asTypedValue(v)); + } + + public Object setDouble(String string, Double d) + { + return setDouble(new AMQShortString(string), d); + } + + public Object setDouble(AMQShortString string, Double v) + { + return setProperty(string, AMQType.DOUBLE.asTypedValue(v)); + } + + public Object setString(String string, String s) + { + return setString(new AMQShortString(string), s); + } + + public Object setAsciiString(AMQShortString string, String value) + { + if (value == null) + { + return setProperty(string, AMQType.VOID.asTypedValue(null)); + } + else + { + return setProperty(string, AMQType.ASCII_STRING.asTypedValue(value)); + } + } + + public Object setString(AMQShortString string, String value) + { + if (value == null) + { + return setProperty(string, AMQType.VOID.asTypedValue(null)); + } + else + { + return setProperty(string, AMQType.LONG_STRING.asTypedValue(value)); + } + } + + public Object setChar(String string, char c) + { + return setChar(new AMQShortString(string), c); + } + + public Object setChar(AMQShortString string, char c) + { + return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c)); + } + + public Object setBytes(String string, byte[] b) + { + return setBytes(new AMQShortString(string), b); + } + + public Object setBytes(AMQShortString string, byte[] bytes) + { + return setProperty(string, AMQType.BINARY.asTypedValue(bytes)); + } + + public Object setBytes(String string, byte[] bytes, int start, int length) + { + return setBytes(new AMQShortString(string), bytes, start, length); + } + + public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) + { + byte[] newBytes = new byte[length]; + System.arraycopy(bytes, start, newBytes, 0, length); + + return setBytes(string, bytes); + } + + public Object setObject(String string, Object o) + { + return setObject(new AMQShortString(string), o); + } + + public Object setTimestamp(AMQShortString string, long datetime) + { + return setProperty(string, AMQType.TIMESTAMP.asTypedValue(datetime)); + } + + public Object setDecimal(AMQShortString string, BigDecimal decimal) + { + if (decimal.longValue() > Integer.MAX_VALUE) + { + throw new UnsupportedOperationException("AMQP doesnot support decimals larger than " + Integer.MAX_VALUE); + } + + if (decimal.scale() > Byte.MAX_VALUE) + { + throw new UnsupportedOperationException("AMQP doesnot support decimal scales larger than " + Byte.MAX_VALUE); + } + + return setProperty(string, AMQType.DECIMAL.asTypedValue(decimal)); + } + + public Object setVoid(AMQShortString string) + { + return setProperty(string, AMQType.VOID.asTypedValue(null)); + } + + /** + * Associates a nested field table with the specified parameter name. + * + * @param string The name of the parameter to store in the table. + * @param ftValue The field table value to associate with the parameter name. + * + * @return The stored value. + */ + public Object setFieldTable(String string, FieldTable ftValue) + { + return setFieldTable(new AMQShortString(string), ftValue); + } + + /** + * Associates a nested field table with the specified parameter name. + * + * @param string The name of the parameter to store in the table. + * @param ftValue The field table value to associate with the parameter name. + * + * @return The stored value. + */ + public Object setFieldTable(AMQShortString string, FieldTable ftValue) + { + return setProperty(string, AMQType.FIELD_TABLE.asTypedValue(ftValue)); + } + + public Object setObject(AMQShortString string, Object object) + { + if (object instanceof Boolean) + { + return setBoolean(string, (Boolean) object); + } + else if (object instanceof Byte) + { + return setByte(string, (Byte) object); + } + else if (object instanceof Short) + { + return setShort(string, (Short) object); + } + else if (object instanceof Integer) + { + return setInteger(string, (Integer) object); + } + else if (object instanceof Long) + { + return setLong(string, (Long) object); + } + else if (object instanceof Float) + { + return setFloat(string, (Float) object); + } + else if (object instanceof Double) + { + return setDouble(string, (Double) object); + } + else if (object instanceof String) + { + return setString(string, (String) object); + } + else if (object instanceof Character) + { + return setChar(string, (Character) object); + } + else if (object instanceof byte[]) + { + return setBytes(string, (byte[]) object); + } + + throw new AMQPInvalidClassException(AMQPInvalidClassException.INVALID_OBJECT_MSG + (object == null ? "null" : object.getClass())); + } + + public boolean isNullStringValue(String name) + { + AMQTypedValue value = getProperty(new AMQShortString(name)); + + return (value != null) && (value.getType() == AMQType.VOID); + } + + // ***** Methods + + public Enumeration getPropertyNames() + { + return Collections.enumeration(keys()); + } + + public boolean propertyExists(AMQShortString propertyName) + { + return itemExists(propertyName); + } + + public boolean propertyExists(String propertyName) + { + return itemExists(propertyName); + } + + public boolean itemExists(AMQShortString propertyName) + { + checkPropertyName(propertyName); + initMapIfNecessary(); + + return _properties.containsKey(propertyName); + } + + public boolean itemExists(String string) + { + return itemExists(new AMQShortString(string)); + } + + public String toString() + { + initMapIfNecessary(); + + return _properties.toString(); + } + + private void checkPropertyName(AMQShortString propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + if (_strictAMQP) + { + checkIdentiferFormat(propertyName); + } + } + + protected static void checkIdentiferFormat(AMQShortString propertyName) + { + // AMQP Spec: 4.2.5.5 Field Tables + // Guidelines for implementers: + // * Field names MUST start with a letter, '$' or '#' and may continue with + // letters, '$' or '#', digits, or underlines, to a maximum length of 128 + // characters. + // * The server SHOULD validate field names and upon receiving an invalid + // field name, it SHOULD signal a connection exception with reply code + // 503 (syntax error). Conformance test: amq_wlp_table_01. + // * A peer MUST handle duplicate fields by using only the first instance. + + // AMQP length limit + if (propertyName.length() > 128) + { + throw new IllegalArgumentException("AMQP limits property names to 128 characters"); + } + + // AMQ start character + if (!(Character.isLetter(propertyName.charAt(0)) || (propertyName.charAt(0) == '$') + || (propertyName.charAt(0) == '#') || (propertyName.charAt(0) == '_'))) // Not official AMQP added for JMS. + { + throw new IllegalArgumentException("Identifier '" + propertyName + + "' does not start with a valid AMQP start character"); + } + } + + // ************************* Byte Buffer Processing + + public void writeToBuffer(ByteBuffer buffer) + { + final boolean trace = _logger.isDebugEnabled(); + + if (trace) + { + _logger.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); + if (_properties != null) + { + _logger.debug(_properties.toString()); + } + } + + EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize()); + + putDataInBuffer(buffer); + } + + public byte[] getDataAsBytes() + { + final int encodedSize = (int) getEncodedSize(); + final ByteBuffer buffer = ByteBuffer.allocate(encodedSize); // FIXME XXX: Is cast a problem? + + putDataInBuffer(buffer); + + final byte[] result = new byte[encodedSize]; + buffer.flip(); + buffer.get(result); + buffer.release(); + + return result; + } + + public long getEncodedSize() + { + return _encodedSize; + } + + private void recalculateEncodedSize() + { + + int encodedSize = 0; + if (_properties != null) + { + for (Map.Entry<AMQShortString, AMQTypedValue> e : _properties.entrySet()) + { + encodedSize += EncodingUtils.encodedShortStringLength(e.getKey()); + encodedSize++; // the byte for the encoding Type + encodedSize += e.getValue().getEncodingSize(); + + } + } + + _encodedSize = encodedSize; + } + + public void addAll(FieldTable fieldTable) + { + initMapIfNecessary(); + _encodedForm = null; + _properties.putAll(fieldTable._properties); + recalculateEncodedSize(); + } + + public static Map<String, Object> convertToMap(final FieldTable fieldTable) + { + final Map<String, Object> map = new HashMap<String, Object>(); + + if(fieldTable != null) + { + fieldTable.processOverElements( + new FieldTableElementProcessor() + { + + public boolean processElement(String propertyName, AMQTypedValue value) + { + Object val = value.getValue(); + if(val instanceof AMQShortString) + { + val = val.toString(); + } + map.put(propertyName, val); + return true; + } + + public Object getResult() + { + return map; + } + }); + } + return map; + } + + + public static interface FieldTableElementProcessor + { + public boolean processElement(String propertyName, AMQTypedValue value); + + public Object getResult(); + } + + public Object processOverElements(FieldTableElementProcessor processor) + { + initMapIfNecessary(); + if (_properties != null) + { + for (Map.Entry<AMQShortString, AMQTypedValue> e : _properties.entrySet()) + { + boolean result = processor.processElement(e.getKey().toString(), e.getValue()); + if (!result) + { + break; + } + } + } + + return processor.getResult(); + + } + + public int size() + { + initMapIfNecessary(); + + return _properties.size(); + + } + + public boolean isEmpty() + { + return size() == 0; + } + + public boolean containsKey(AMQShortString key) + { + initMapIfNecessary(); + + return _properties.containsKey(key); + } + + public boolean containsKey(String key) + { + return containsKey(new AMQShortString(key)); + } + + public Set<String> keys() + { + initMapIfNecessary(); + Set<String> keys = new LinkedHashSet<String>(); + for (AMQShortString key : _properties.keySet()) + { + keys.add(key.toString()); + } + + return keys; + } + + public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator() + { + if(_encodedForm != null) + { + return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize); + } + else + { + initMapIfNecessary(); + return _properties.entrySet().iterator(); + } + } + + public Object get(String key) + { + return get(new AMQShortString(key)); + } + + public Object get(AMQShortString key) + { + return getObject(key); + } + + public Object put(AMQShortString key, Object value) + { + return setObject(key, value); + } + + public Object remove(String key) + { + + return remove(new AMQShortString(key)); + + } + + public Object remove(AMQShortString key) + { + AMQTypedValue val = removeKey(key); + + return (val == null) ? null : val.getValue(); + + } + + public AMQTypedValue removeKey(AMQShortString key) + { + initMapIfNecessary(); + _encodedForm = null; + AMQTypedValue value = _properties.remove(key); + if (value == null) + { + return null; + } + else + { + _encodedSize -= EncodingUtils.encodedShortStringLength(key); + _encodedSize--; + _encodedSize -= value.getEncodingSize(); + + return value; + } + + } + + public void clear() + { + initMapIfNecessary(); + _encodedForm = null; + _properties.clear(); + _encodedSize = 0; + } + + public Set<AMQShortString> keySet() + { + initMapIfNecessary(); + + return _properties.keySet(); + } + + private void putDataInBuffer(ByteBuffer buffer) + { + + if (_encodedForm != null) + { + if(buffer.isDirect() || buffer.isReadOnly()) + { + ByteBuffer encodedForm = _encodedForm.duplicate(); + + if (encodedForm.position() != 0) + { + encodedForm.flip(); + } + + buffer.put(encodedForm); + } + else + { + buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize); + } + } + else if (_properties != null) + { + final Iterator<Map.Entry<AMQShortString, AMQTypedValue>> it = _properties.entrySet().iterator(); + + // If there are values then write out the encoded Size... could check _encodedSize != 0 + // write out the total length, which we have kept up to date as data is added + + while (it.hasNext()) + { + final Map.Entry<AMQShortString, AMQTypedValue> me = it.next(); + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + + me.getValue().getValue()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + } + + // Write the actual parameter name + EncodingUtils.writeShortStringBytes(buffer, me.getKey()); + me.getValue().writeToBuffer(buffer); + } + catch (Exception e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Exception thrown:" + e); + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + + me.getValue().getValue()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + } + + throw new RuntimeException(e); + } + } + } + } + + private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException + { + + final boolean trace = _logger.isDebugEnabled(); + if (length > 0) + { + + final int expectedRemaining = buffer.remaining() - (int) length; + + _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY); + + do + { + + final AMQShortString key = EncodingUtils.readAMQShortString(buffer); + + _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key); + + AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer); + + if (trace) + { + _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + + "', key '" + key + "', value '" + value.getValue() + "'"); + } + + _properties.put(key, value); + + } + while (buffer.remaining() > expectedRemaining); + + } + + _encodedSize = length; + + if (trace) + { + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); + } + } + + private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue> + { + private final AMQTypedValue _value; + private final AMQShortString _key; + + public FieldTableEntry(final AMQShortString key, final AMQTypedValue value) + { + _key = key; + _value = value; + } + + public AMQShortString getKey() + { + return _key; + } + + public AMQTypedValue getValue() + { + return _value; + } + + public AMQTypedValue setValue(final AMQTypedValue value) + { + throw new UnsupportedOperationException(); + } + + public boolean equals(Object o) + { + if(o instanceof FieldTableEntry) + { + FieldTableEntry other = (FieldTableEntry) o; + return (_key == null ? other._key == null : _key.equals(other._key)) + && (_value == null ? other._value == null : _value.equals(other._value)); + } + else + { + return false; + } + } + + public int hashCode() + { + return (getKey()==null ? 0 : getKey().hashCode()) + ^ (getValue()==null ? 0 : getValue().hashCode()); + } + + } + + + private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>> + { + + private final ByteBuffer _buffer; + private int _expectedRemaining; + + public FieldTableIterator(ByteBuffer buffer, int length) + { + _buffer = buffer; + _expectedRemaining = buffer.remaining() - length; + } + + public boolean hasNext() + { + return (_buffer.remaining() > _expectedRemaining); + } + + public Map.Entry<AMQShortString, AMQTypedValue> next() + { + if(hasNext()) + { + final AMQShortString key = EncodingUtils.readAMQShortString(_buffer); + AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer); + return new FieldTableEntry(key, value); + } + else + { + return null; + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + } + + + + + public int hashCode() + { + initMapIfNecessary(); + + return _properties.hashCode(); + } + + public boolean equals(Object o) + { + if (o == this) + { + return true; + } + + if (o == null) + { + return false; + } + + if (!(o instanceof FieldTable)) + { + return false; + } + + initMapIfNecessary(); + + FieldTable f = (FieldTable) o; + f.initMapIfNecessary(); + + return _properties.equals(f._properties); + } + + public static FieldTable convertToFieldTable(Map<String, Object> map) + { + if (map != null) + { + FieldTable table = new FieldTable(); + for(Map.Entry<String,Object> entry : map.entrySet()) + { + table.put(new AMQShortString(entry.getKey()), entry.getValue()); + } + + return table; + } + else + { + return null; + } + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java new file mode 100644 index 0000000000..e9d75137ef --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class FieldTableFactory +{ + public static FieldTable newFieldTable() + { + return new FieldTable(); + } + + public static FieldTable newFieldTable(ByteBuffer byteBuffer, long length) throws AMQFrameDecodingException + { + return new FieldTable(byteBuffer, length); + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java new file mode 100644 index 0000000000..18ab05ffa1 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; + +public class HeartbeatBody implements AMQBody +{ + public static final byte TYPE = 8; + public static final AMQFrame FRAME = new HeartbeatBody().toFrame(); + + public HeartbeatBody() + { + + } + + public HeartbeatBody(ByteBuffer buffer, long size) + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + + public byte getFrameType() + { + return TYPE; + } + + public int getSize() + { + return 0;//heartbeats we generate have no payload + } + + public void writePayload(ByteBuffer buffer) + { + } + + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.heartbeatBodyReceived(channelId, this); + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + + public AMQFrame toFrame() + { + return new AMQFrame(0, this); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java new file mode 100644 index 0000000000..c7ada708dc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class HeartbeatBodyFactory implements BodyFactory +{ + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException + { + return new HeartbeatBody(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java new file mode 100644 index 0000000000..fb3dd89717 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -0,0 +1,223 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.qpid.AMQException; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock +{ + + // TODO: generate these constants automatically from the xml protocol spec file + private static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'}; + + private static final byte CURRENT_PROTOCOL_CLASS = 1; + private static final byte TCP_PROTOCOL_INSTANCE = 1; + + public final byte[] _protocolHeader; + public final byte _protocolClass; + public final byte _protocolInstance; + public final byte _protocolMajor; + public final byte _protocolMinor; + + +// public ProtocolInitiation() {} + + public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor) + { + _protocolHeader = protocolHeader; + _protocolClass = protocolClass; + _protocolInstance = protocolInstance; + _protocolMajor = protocolMajor; + _protocolMinor = protocolMinor; + } + + public ProtocolInitiation(ProtocolVersion pv) + { + this(AMQP_HEADER, + pv.equals(ProtocolVersion.v0_91) ? 0 : CURRENT_PROTOCOL_CLASS, + pv.equals(ProtocolVersion.v0_91) ? 0 : TCP_PROTOCOL_INSTANCE, + pv.equals(ProtocolVersion.v0_91) ? 9 : pv.getMajorVersion(), + pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion()); + } + + public ProtocolInitiation(ByteBuffer in) + { + _protocolHeader = new byte[4]; + in.get(_protocolHeader); + + _protocolClass = in.get(); + _protocolInstance = in.get(); + _protocolMajor = in.get(); + _protocolMinor = in.get(); + } + + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + + public long getSize() + { + return 4 + 1 + 1 + 1 + 1; + } + + public void writePayload(ByteBuffer buffer) + { + + buffer.put(_protocolHeader); + buffer.put(_protocolClass); + buffer.put(_protocolInstance); + buffer.put(_protocolMajor); + buffer.put(_protocolMinor); + } + + public boolean equals(Object o) + { + if (!(o instanceof ProtocolInitiation)) + { + return false; + } + + ProtocolInitiation pi = (ProtocolInitiation) o; + if (pi._protocolHeader == null) + { + return false; + } + + if (_protocolHeader.length != pi._protocolHeader.length) + { + return false; + } + + for (int i = 0; i < _protocolHeader.length; i++) + { + if (_protocolHeader[i] != pi._protocolHeader[i]) + { + return false; + } + } + + return (_protocolClass == pi._protocolClass && + _protocolInstance == pi._protocolInstance && + _protocolMajor == pi._protocolMajor && + _protocolMinor == pi._protocolMinor); + } + + @Override + public int hashCode() + { + int result = _protocolHeader != null ? Arrays.hashCode(_protocolHeader) : 0; + result = 31 * result + (int) _protocolClass; + result = 31 * result + (int) _protocolInstance; + result = 31 * result + (int) _protocolMajor; + result = 31 * result + (int) _protocolMinor; + return result; + } + + public static class Decoder //implements MessageDecoder + { + /** + * + * @param in input buffer + * @return true if we have enough data to decode the PI frame fully, false if more + * data is required + */ + public boolean decodable(ByteBuffer in) + { + return (in.remaining() >= 8); + } + + } + + public ProtocolVersion checkVersion() throws AMQException + { + + if(_protocolHeader.length != 4) + { + throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null); + } + for(int i = 0; i < 4; i++) + { + if(_protocolHeader[i] != AMQP_HEADER[i]) + { + try + { + throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"), null); + } + catch (UnsupportedEncodingException e) + { + + } + } + } + + ProtocolVersion pv; + + // Hack for 0-9-1 which changed how the header was defined + if(_protocolInstance == 0 && _protocolMajor == 9 && _protocolMinor == 1) + { + pv = ProtocolVersion.v0_91; + if (_protocolClass != 0) + { + throw new AMQProtocolClassException("Protocol class " + 0 + " was expected; received " + + _protocolClass, null); + } + } + else if (_protocolClass != CURRENT_PROTOCOL_CLASS) + { + throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + + _protocolClass, null); + } + else if (_protocolInstance != TCP_PROTOCOL_INSTANCE) + { + throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " + + _protocolInstance, null); + } + else + { + pv = new ProtocolVersion(_protocolMajor, _protocolMinor); + } + + + if (!pv.isSupported()) + { + // TODO: add list of available versions in list to msg... + throw new AMQProtocolVersionException("Protocol version " + + _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.", null); + } + return pv; + } + + public String toString() + { + StringBuffer buffer = new StringBuffer(new String(_protocolHeader)); + buffer.append(Integer.toHexString(_protocolClass)); + buffer.append(Integer.toHexString(_protocolInstance)); + buffer.append(Integer.toHexString(_protocolMajor)); + buffer.append(Integer.toHexString(_protocolMinor)); + return buffer.toString(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java new file mode 100644 index 0000000000..bd763599b0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock +{ + private AMQDataBlock _firstFrame; + + private AMQDataBlock _block; + + public SmallCompositeAMQDataBlock(AMQDataBlock block) + { + _block = block; + } + + /** + * The encoded block will be logically first before the AMQDataBlocks which are encoded + * into the buffer afterwards. + * @param encodedBlock already-encoded data + * @param block a block to be encoded. + */ + public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block) + { + this(block); + _firstFrame = encodedBlock; + } + + public AMQDataBlock getBlock() + { + return _block; + } + + public AMQDataBlock getFirstFrame() + { + return _firstFrame; + } + + public long getSize() + { + long frameSize = _block.getSize(); + + if (_firstFrame != null) + { + + frameSize += _firstFrame.getSize(); + } + return frameSize; + } + + public void writePayload(ByteBuffer buffer) + { + if (_firstFrame != null) + { + _firstFrame.writePayload(buffer); + } + _block.writePayload(buffer); + + } + + public String toString() + { + if (_block == null) + { + return "No blocks contained in composite frame"; + } + else + { + StringBuilder buf = new StringBuilder(this.getClass().getName()); + buf.append("{encodedBlock=").append(_firstFrame); + + buf.append(" _block=[").append(_block.toString()).append("]"); + + buf.append("}"); + return buf.toString(); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java new file mode 100644 index 0000000000..76c154581d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -0,0 +1,198 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VersionSpecificRegistry +{ + private static final Logger _log = LoggerFactory.getLogger(VersionSpecificRegistry.class); + + private final byte _protocolMajorVersion; + private final byte _protocolMinorVersion; + + private static final int DEFAULT_MAX_CLASS_ID = 200; + private static final int DEFAULT_MAX_METHOD_ID = 50; + + private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][]; + + private ProtocolVersionMethodConverter _protocolVersionConverter; + + public VersionSpecificRegistry(byte major, byte minor) + { + _protocolMajorVersion = major; + _protocolMinorVersion = minor; + + _protocolVersionConverter = loadProtocolVersionConverters(major, minor); + } + + private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, + byte protocolMinorVersion) + { + try + { + Class<ProtocolVersionMethodConverter> versionMethodConverterClass = + (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_" + + protocolMajorVersion + "_" + protocolMinorVersion); + + return versionMethodConverterClass.newInstance(); + + } + catch (ClassNotFoundException e) + { + _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion); + if (protocolMinorVersion != 0) + { + protocolMinorVersion--; + + return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); + } + else if (protocolMajorVersion != 0) + { + protocolMajorVersion--; + + return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); + } + else + { + return null; + } + + } + catch (IllegalAccessException e) + { + throw new IllegalStateException("Unable to load protocol version converter: ", e); + } + catch (InstantiationException e) + { + throw new IllegalStateException("Unable to load protocol version converter: ", e); + } + } + + public byte getProtocolMajorVersion() + { + return _protocolMajorVersion; + } + + public byte getProtocolMinorVersion() + { + return _protocolMinorVersion; + } + + public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID) + { + try + { + return _registry[classID][methodID]; + } + catch (IndexOutOfBoundsException e) + { + return null; + } + catch (NullPointerException e) + { + return null; + } + } + + public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory) + { + if (_registry.length <= classID) + { + AMQMethodBodyInstanceFactory[][] oldRegistry = _registry; + _registry = new AMQMethodBodyInstanceFactory[classID + 1][]; + System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length); + } + + if (_registry[classID] == null) + { + _registry[classID] = + new AMQMethodBodyInstanceFactory[(methodID > DEFAULT_MAX_METHOD_ID) ? (methodID + 1) + : (DEFAULT_MAX_METHOD_ID + 1)]; + } + else if (_registry[classID].length <= methodID) + { + AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID]; + _registry[classID] = new AMQMethodBodyInstanceFactory[methodID + 1]; + System.arraycopy(oldMethods, 0, _registry[classID], 0, oldMethods.length); + } + + _registry[classID][methodID] = instanceFactory; + + } + + public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException + { + AMQMethodBodyInstanceFactory bodyFactory; + try + { + bodyFactory = _registry[classID][methodID]; + } + catch (NullPointerException e) + { + throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " + + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + + " method " + methodID + ".", e); + } + catch (IndexOutOfBoundsException e) + { + if (classID >= _registry.length) + { + throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " + + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + + " method " + methodID + ".", e); + + } + else + { + throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " + + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + + " method " + methodID + ".", e); + + } + } + + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " + + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID + + " method " + methodID + ".", null); + } + + return bodyFactory.newInstance( in, size); + + } + + public ProtocolVersionMethodConverter getProtocolVersionMethodConverter() + { + return _protocolVersionConverter; + } + + public void configure() + { + _protocolVersionConverter.configure(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java new file mode 100644 index 0000000000..1d7c05e9cc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private final byte _protocolMajorVersion; + + + private final byte _protocolMinorVersion; + + public AbstractMethodConverter(byte major, byte minor) + { + _protocolMajorVersion = major; + _protocolMinorVersion = minor; + } + + + public final byte getProtocolMajorVersion() + { + return _protocolMajorVersion; + } + + public final byte getProtocolMinorVersion() + { + return _protocolMinorVersion; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java new file mode 100644 index 0000000000..0695349f76 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.mina.common.ByteBuffer; + +public interface ContentChunk +{ + int getSize(); + ByteBuffer getData(); + + void reduceToFit(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java new file mode 100644 index 0000000000..a96bdcc171 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQShortString; + +public interface MessagePublishInfo +{ + + public AMQShortString getExchange(); + + public void setExchange(AMQShortString exchange); + + public boolean isImmediate(); + + public boolean isMandatory(); + + public AMQShortString getRoutingKey(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java new file mode 100644 index 0000000000..01d1a8a17b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQMethodBody; + + +public interface MessagePublishInfoConverter +{ + public MessagePublishInfo convertToInfo(AMQMethodBody body); + public AMQMethodBody convertToBody(MessagePublishInfo info); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java new file mode 100644 index 0000000000..e3d5da73da --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.AMQShortString; + +public class MessagePublishInfoImpl implements MessagePublishInfo +{ + private AMQShortString _exchange; + private boolean _immediate; + private boolean _mandatory; + private AMQShortString _routingKey; + + public MessagePublishInfoImpl() + { + } + + public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, + AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public void setExchange(AMQShortString exchange) + { + _exchange = exchange; + } + + public boolean isImmediate() + { + return _immediate; + } + + public void setImmediate(boolean immedate) + { + _immediate = immedate; + } + + public boolean isMandatory() + { + return _mandatory; + } + + public void setMandatory(boolean mandatory) + { + _mandatory = mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public void setRoutingKey(AMQShortString routingKey) + { + _routingKey = routingKey; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java new file mode 100644 index 0000000000..7544d9b7e7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.abstraction; + +import org.apache.qpid.framing.AMQBody; + +import java.nio.ByteBuffer; + +public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter +{ + AMQBody convertToBody(ContentChunk contentBody); + ContentChunk convertToContentChunk(AMQBody body); + + void configure(); + + AMQBody convertToBody(ByteBuffer buf); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java new file mode 100644 index 0000000000..8d51343507 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_0_9; + +public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl +{ + + public byte getMajor() + { + return 0; + } + + public byte getMinor() + { + return 9; + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java new file mode 100644 index 0000000000..1c4a29b106 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_0_9; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.*; +import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl; + +public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_0_9() + { + super((byte)0,(byte)9); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + if(contentChunk instanceof ContentChunk_0_9) + { + return ((ContentChunk_0_9)contentChunk).toBody(); + } + else + { + return new ContentBody(contentChunk.getData()); + } + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk_0_9(contentBodyChunk); + + } + + public void configure() + { + + _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; + + } + + public AMQBody convertToBody(java.nio.ByteBuffer buf) + { + return new ContentBody(ByteBuffer.wrap(buf)); + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody publishBody = ((BasicPublishBody) methodBody); + + final AMQShortString exchange = publishBody.getExchange(); + final AMQShortString routingKey = publishBody.getRoutingKey(); + + return new MessagePublishInfoImpl(exchange, + publishBody.getImmediate(), + publishBody.getMandatory(), + routingKey); + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBodyImpl(0, + info.getExchange(), + info.getRoutingKey(), + info.isMandatory(), + info.isImmediate()) ; + + } + + private static class ContentChunk_0_9 implements ContentChunk + { + private final ContentBody _contentBodyChunk; + + public ContentChunk_0_9(final ContentBody contentBodyChunk) + { + _contentBodyChunk = contentBodyChunk; + } + + public int getSize() + { + return _contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return _contentBodyChunk.payload; + } + + public void reduceToFit() + { + _contentBodyChunk.reduceBufferToFit(); + } + + public AMQBody toBody() + { + return _contentBodyChunk; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java new file mode 100644 index 0000000000..60b8a7e1a6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_0_91; + +public abstract class AMQMethodBody_0_91 extends org.apache.qpid.framing.AMQMethodBodyImpl +{ + + public byte getMajor() + { + return 0; + } + + public byte getMinor() + { + return 91; + } + +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java new file mode 100644 index 0000000000..6e330574bc --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_0_91; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.*; + +public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_0_91() + { + super((byte)0,(byte)9); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + if(contentChunk instanceof ContentChunk_0_9) + { + return ((ContentChunk_0_9)contentChunk).toBody(); + } + else + { + return new ContentBody(contentChunk.getData()); + } + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk_0_9(contentBodyChunk); + + } + + public void configure() + { + + _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; + + } + + public AMQBody convertToBody(java.nio.ByteBuffer buf) + { + return new ContentBody(ByteBuffer.wrap(buf)); + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody publishBody = ((BasicPublishBody) methodBody); + + final AMQShortString exchange = publishBody.getExchange(); + final AMQShortString routingKey = publishBody.getRoutingKey(); + + return new MessagePublishInfoImpl(exchange, + publishBody.getImmediate(), + publishBody.getMandatory(), + routingKey); + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBodyImpl(0, + info.getExchange(), + info.getRoutingKey(), + info.isMandatory(), + info.isImmediate()) ; + + } + + private static class ContentChunk_0_9 implements ContentChunk + { + private final ContentBody _contentBodyChunk; + + public ContentChunk_0_9(final ContentBody contentBodyChunk) + { + _contentBodyChunk = contentBodyChunk; + } + + public int getSize() + { + return _contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return _contentBodyChunk.payload; + } + + public void reduceToFit() + { + _contentBodyChunk.reduceBufferToFit(); + } + + public AMQBody toBody() + { + return _contentBodyChunk; + } + } +}
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java new file mode 100644 index 0000000000..35645854c0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_8_0; + +public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl +{ + + public byte getMajor() + { + return 8; + } + + public byte getMinor() + { + return 0; + } + + + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java new file mode 100644 index 0000000000..c87820b9b2 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing.amqp_8_0; + +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; +import org.apache.qpid.framing.*; + +import org.apache.mina.common.ByteBuffer; + +public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter +{ + private int _basicPublishClassId; + private int _basicPublishMethodId; + + public MethodConverter_8_0() + { + super((byte)8,(byte)0); + + + } + + public AMQBody convertToBody(ContentChunk contentChunk) + { + return new ContentBody(contentChunk.getData()); + } + + public ContentChunk convertToContentChunk(AMQBody body) + { + final ContentBody contentBodyChunk = (ContentBody) body; + + return new ContentChunk() + { + + public int getSize() + { + return contentBodyChunk.getSize(); + } + + public ByteBuffer getData() + { + return contentBodyChunk.payload; + } + + public void reduceToFit() + { + contentBodyChunk.reduceBufferToFit(); + } + }; + + } + + public void configure() + { + + _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; + _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; + + } + + public AMQBody convertToBody(java.nio.ByteBuffer buf) + { + return new ContentBody(ByteBuffer.wrap(buf)); + } + + public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) + { + final BasicPublishBody publishBody = ((BasicPublishBody) methodBody); + + final AMQShortString exchange = publishBody.getExchange(); + final AMQShortString routingKey = publishBody.getRoutingKey(); + + return new MessagePublishInfoImpl(exchange == null ? null : exchange.intern(), + publishBody.getImmediate(), + publishBody.getMandatory(), + routingKey == null ? null : routingKey.intern()); + + } + + public AMQMethodBody convertToBody(MessagePublishInfo info) + { + + return new BasicPublishBodyImpl(0, + info.getExchange(), + info.getRoutingKey(), + info.isMandatory(), + info.isImmediate()) ; + + } +} |