summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/framing
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/common/src/main/java/org/apache/qpid/framing
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java131
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java61
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java125
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java47
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java83
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java45
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java258
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java90
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java41
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java779
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java31
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java795
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java179
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java838
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java31
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java81
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java78
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java121
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java141
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java50
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java60
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java59
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java50
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java35
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java1033
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java1246
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java79
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java31
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java223
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java98
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java198
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java47
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java32
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java85
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java36
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java37
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java134
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java37
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java132
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java113
53 files changed, 8290 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
new file mode 100644
index 0000000000..fe04155bb8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
+
+public interface AMQBody
+{
+ public byte getFrameType();
+
+ /**
+ * Get the size of the body
+ * @return unsigned short
+ */
+ public abstract int getSize();
+
+ public void writePayload(ByteBuffer buffer);
+
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
new file mode 100644
index 0000000000..a2fc3a03ef
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A data block represents something that has a size in bytes and the ability to write itself to a byte
+ * buffer (similar to a byte array).
+ */
+public abstract class AMQDataBlock implements EncodableAMQDataBlock
+{
+ /**
+ * Get the size of buffer needed to store the byte representation of this
+ * frame.
+ * @return unsigned integer
+ */
+ public abstract long getSize();
+
+ /**
+ * Writes the datablock to the specified buffer.
+ * @param buffer
+ */
+ public abstract void writePayload(ByteBuffer buffer);
+
+ public ByteBuffer toByteBuffer()
+ {
+ final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
+
+ writePayload(buffer);
+ buffer.flip();
+ return buffer;
+ }
+
+ public java.nio.ByteBuffer toNioByteBuffer()
+ {
+ final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+ ByteBuffer buf = ByteBuffer.wrap(buffer);
+ writePayload(buf);
+ buffer.flip();
+ return buffer;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
new file mode 100644
index 0000000000..228867b2b0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQDataBlockDecoder
+{
+ private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
+
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+ static
+ {
+ _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
+ Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
+
+ public AMQDataBlockDecoder()
+ { }
+
+ public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
+ {
+ final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
+ // type, channel, body length and end byte
+ if (remainingAfterAttributes < 0)
+ {
+ return false;
+ }
+
+ in.position(in.position() + 1 + 2);
+ // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+ final long bodySize = in.getInt() & 0xffffffffL;
+
+ return (remainingAfterAttributes >= bodySize);
+
+ }
+
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ final byte type = in.get();
+
+ BodyFactory bodyFactory;
+ if (type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = methodBodyFactory;
+ }
+ else
+ {
+ bodyFactory = _bodiesSupported[type];
+ }
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ }
+
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize, null);
+ }
+
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type, null);
+ }
+
+ return frame;
+ }
+
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if (bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+ }
+
+ out.write(createAndPopulateFrame(bodyFactory, in));
+ }
+
+ public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+ {
+ return decodable(msg.buf());
+ }
+
+ public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
new file mode 100644
index 0000000000..374644b4f2
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Set;
+
+public final class AMQDataBlockEncoder implements MessageEncoder
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQDataBlockEncoder.class);
+
+ private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class);
+
+ public AMQDataBlockEncoder()
+ { }
+
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+ {
+ final AMQDataBlock frame = (AMQDataBlock) message;
+
+ final ByteBuffer buffer = frame.toByteBuffer();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
+ }
+
+ out.write(buffer);
+ }
+
+ public Set getMessageTypes()
+ {
+ return _messageTypes;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
new file mode 100644
index 0000000000..02a46f3748
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private final int _channel;
+
+ private final AMQBody _bodyFrame;
+ public static final byte FRAME_END_BYTE = (byte) 0xCE;
+
+
+ public AMQFrame(final int channel, final AMQBody bodyFrame)
+ {
+ _channel = channel;
+ _bodyFrame = bodyFrame;
+ }
+
+ public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
+ {
+ this._channel = channel;
+ this._bodyFrame = bodyFactory.createBody(in,bodySize);
+ }
+
+ public long getSize()
+ {
+ return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
+ }
+
+ public static final int getFrameOverhead()
+ {
+ return 1 + 2 + 4 + 1;
+ }
+
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ buffer.put(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, _channel);
+ EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+ _bodyFrame.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ }
+
+ public final int getChannel()
+ {
+ return _channel;
+ }
+
+ public final AMQBody getBodyFrame()
+ {
+ return _bodyFrame;
+ }
+
+ public String toString()
+ {
+ return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
+ }
+
+ public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+ {
+ buffer.put(body.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+ body.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body3.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
+ body3.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
new file mode 100644
index 0000000000..2373edb478
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQFrameDecodingException indicates that an AMQP frame cannot be decoded because it does not have the correct
+ * format as defined by the protocol.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents a format error in a protocol frame.
+ * </table>
+ */
+public class AMQFrameDecodingException extends AMQException
+{
+ public AMQFrameDecodingException(AMQConstant errorCode, String message, Throwable cause)
+ {
+ super(errorCode, message, cause);
+ }
+
+ public AMQFrameDecodingException(AMQConstant errorCode, String message)
+ {
+ super(errorCode, message, null);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
new file mode 100644
index 0000000000..4763b22290
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public interface AMQMethodBody extends AMQBody
+{
+ public static final byte TYPE = 1;
+
+ /** AMQP version */
+ public byte getMajor();
+
+ public byte getMinor();
+
+
+
+ /** @return unsigned short */
+ public int getClazz();
+
+ /** @return unsigned short */
+ public int getMethod();
+
+ public void writeMethodPayload(ByteBuffer buffer);
+
+
+ public int getSize();
+
+ public void writePayload(ByteBuffer buffer);
+
+ //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
+
+ //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+
+ public AMQFrame generateFrame(int channelId);
+
+ public String toString();
+
+
+
+ /**
+ * Convenience Method to create a channel not found exception
+ *
+ * @param channelId The channel id that is not found
+ *
+ * @return new AMQChannelException
+ */
+ public AMQChannelException getChannelNotFoundException(int channelId);
+
+ public AMQChannelException getChannelException(AMQConstant code, String message);
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause);
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message);
+
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause);
+
+
+ public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
new file mode 100644
index 0000000000..1a7022c11b
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQMethodBodyFactory implements BodyFactory
+{
+ private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
+
+ private final AMQVersionAwareProtocolSession _protocolSession;
+
+ public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+ {
+ _protocolSession = protocolSession;
+ }
+
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ {
+ return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
new file mode 100644
index 0000000000..cd3d721065
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -0,0 +1,258 @@
+package org.apache.qpid.framing;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+public abstract class AMQMethodBodyImpl implements AMQMethodBody
+{
+ public static final byte TYPE = 1;
+
+ public AMQMethodBodyImpl()
+ {
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+
+ /** unsigned short */
+ abstract protected int getBodySize();
+
+
+ public AMQFrame generateFrame(int channelId)
+ {
+ return new AMQFrame(channelId, this);
+ }
+
+ /**
+ * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and
+ * method ids of the body it resulted from).
+ */
+
+ /**
+ * Convenience Method to create a channel not found exception
+ *
+ * @param channelId The channel id that is not found
+ *
+ * @return new AMQChannelException
+ */
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId);
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null);
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), null);
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+ {
+ session.methodFrameReceived(channelId, this);
+ }
+
+ public int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+
+ protected byte readByte(ByteBuffer buffer)
+ {
+ return buffer.get();
+ }
+
+ protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return EncodingUtils.readAMQShortString(buffer);
+ }
+
+ protected int getSizeOf(AMQShortString string)
+ {
+ return EncodingUtils.encodedShortStringLength(string);
+ }
+
+ protected void writeByte(ByteBuffer buffer, byte b)
+ {
+ buffer.put(b);
+ }
+
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, string);
+ }
+
+ protected int readInt(ByteBuffer buffer)
+ {
+ return buffer.getInt();
+ }
+
+ protected void writeInt(ByteBuffer buffer, int i)
+ {
+ buffer.putInt(i);
+ }
+
+ protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ return EncodingUtils.readFieldTable(buffer);
+ }
+
+ protected int getSizeOf(FieldTable table)
+ {
+ return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, table);
+ }
+
+ protected long readLong(ByteBuffer buffer)
+ {
+ return buffer.getLong();
+ }
+
+ protected void writeLong(ByteBuffer buffer, long l)
+ {
+ buffer.putLong(l);
+ }
+
+ protected int getSizeOf(byte[] response)
+ {
+ return (response == null) ? 4 : response.length + 4;
+ }
+
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
+ {
+ EncodingUtils.writeBytes(buffer,data);
+ }
+
+ protected byte[] readBytes(ByteBuffer buffer)
+ {
+ return EncodingUtils.readBytes(buffer);
+ }
+
+ protected short readShort(ByteBuffer buffer)
+ {
+ return EncodingUtils.readShort(buffer);
+ }
+
+ protected void writeShort(ByteBuffer buffer, short s)
+ {
+ EncodingUtils.writeShort(buffer, s);
+ }
+
+ protected Content readContent(ByteBuffer buffer)
+ {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int getSizeOf(Content body)
+ {
+ return 0; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeContent(ByteBuffer buffer, Content body)
+ {
+ //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected byte readBitfield(ByteBuffer buffer)
+ {
+ return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int readUnsignedShort(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ {
+ buffer.put(bitfield0);
+ }
+
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, s);
+ }
+
+ protected long readUnsignedInteger(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedInt();
+ }
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, i);
+ }
+
+
+ protected short readUnsignedByte(ByteBuffer buffer)
+ {
+ return buffer.getUnsigned();
+ }
+
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ {
+ EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
+ }
+
+ protected long readTimestamp(ByteBuffer buffer)
+ {
+ return EncodingUtils.readTimestamp(buffer);
+ }
+
+ protected void writeTimestamp(ByteBuffer buffer, long t)
+ {
+ EncodingUtils.writeTimestamp(buffer, t);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
new file mode 100644
index 0000000000..0c61d9db3c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public abstract interface AMQMethodBodyInstanceFactory
+{
+ public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java
new file mode 100644
index 0000000000..bfcc38ad60
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodFactory.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public interface AMQMethodFactory
+{
+
+ // Connection Methods
+
+ ConnectionCloseBody createConnectionClose();
+
+ // Access Methods
+
+ AccessRequestBody createAccessRequest(boolean active, boolean exclusive, boolean passive, boolean read, AMQShortString realm, boolean write);
+
+
+ // Tx Methods
+
+ TxSelectBody createTxSelect();
+
+ TxCommitBody createTxCommit();
+
+ TxRollbackBody createTxRollback();
+
+ // Channel Methods
+
+ ChannelOpenBody createChannelOpen();
+
+ ChannelCloseBody createChannelClose(int replyCode, AMQShortString replyText);
+
+ ChannelFlowBody createChannelFlow(boolean active);
+
+
+ // Exchange Methods
+
+
+ ExchangeBoundBody createExchangeBound(AMQShortString exchangeName,
+ AMQShortString queueName,
+ AMQShortString routingKey);
+
+ ExchangeDeclareBody createExchangeDeclare(AMQShortString name, AMQShortString type, int ticket);
+
+
+ // Queue Methods
+
+ QueueDeclareBody createQueueDeclare(AMQShortString name, FieldTable arguments, boolean autoDelete, boolean durable, boolean exclusive, boolean passive, int ticket);
+
+ QueueBindBody createQueueBind(AMQShortString queueName, AMQShortString exchangeName, AMQShortString routingKey, FieldTable arguments, int ticket);
+
+ QueueDeleteBody createQueueDelete(AMQShortString queueName, boolean ifEmpty, boolean ifUnused, int ticket);
+
+
+ // Message Methods
+
+ // In different versions of the protocol we change the class used for message transfer
+ // abstract this out so the appropriate methods are created
+ AMQMethodBody createRecover(boolean requeue);
+
+ AMQMethodBody createConsumer(AMQShortString tag, AMQShortString queueName, FieldTable arguments, boolean noAck, boolean exclusive, boolean noLocal, int ticket);
+
+ AMQMethodBody createConsumerCancel(AMQShortString consumerTag);
+
+ AMQMethodBody createAcknowledge(long deliveryTag, boolean multiple);
+
+ AMQMethodBody createRejectBody(long deliveryTag, boolean requeue);
+
+ AMQMethodBody createMessageQos(int prefetchCount, int prefetchSize);
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
new file mode 100644
index 0000000000..ab09c1de6d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * AMQProtocolInstanceException indicates that the protocol class is incorrect in a header.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent incorrect protocol class in frame header.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public class AMQProtocolClassException extends AMQProtocolHeaderException
+{
+ public AMQProtocolClassException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
new file mode 100644
index 0000000000..6b819364da
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * AMQProtocolHeaderException indicates a format error in an AMQP frame header.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent format error in frame header.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public class AMQProtocolHeaderException extends AMQException
+{
+ public AMQProtocolHeaderException(String message, Throwable cause)
+ {
+ super(null, message, cause);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
new file mode 100644
index 0000000000..3165c373a9
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * AMQProtocolInstanceException indicates that the protocol instance is incorrect in a header.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent incorrect protocol instance in frame header.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public class AMQProtocolInstanceException extends AMQProtocolHeaderException
+{
+ public AMQProtocolInstanceException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
new file mode 100644
index 0000000000..c9b0973ea6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * AMQProtocolInstanceException indicates that the client and server differ on expected protocol version in a header.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent incorrect protocol version in frame header.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public class AMQProtocolVersionException extends AMQProtocolHeaderException
+{
+ public AMQProtocolVersionException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
new file mode 100644
index 0000000000..39a9beb9e8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -0,0 +1,779 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.lang.ref.WeakReference;
+
+/**
+ * A short string is a representation of an AMQ Short String
+ * Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
+ * and thus can be held more effectively in a byte buffer.
+ *
+ */
+public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
+{
+ private static final byte MINUS = (byte)'-';
+ private static final byte ZERO = (byte) '0';
+
+ private final class TokenizerImpl implements AMQShortStringTokenizer
+ {
+ private final byte _delim;
+ private int _count = -1;
+ private int _pos = 0;
+
+ public TokenizerImpl(final byte delim)
+ {
+ _delim = delim;
+ }
+
+ public int countTokens()
+ {
+ if(_count == -1)
+ {
+ _count = 1 + AMQShortString.this.occurences(_delim);
+ }
+ return _count;
+ }
+
+ public AMQShortString nextToken()
+ {
+ if(_pos <= AMQShortString.this.length())
+ {
+ int nextDelim = AMQShortString.this.indexOf(_delim, _pos);
+ if(nextDelim == -1)
+ {
+ nextDelim = AMQShortString.this.length();
+ }
+
+ AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++);
+ _pos = nextDelim;
+ return nextToken;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean hasMoreTokens()
+ {
+ return _pos <= AMQShortString.this.length();
+ }
+ }
+
+ private AMQShortString substring(final int from, final int to)
+ {
+ return new AMQShortString(_data, from+_offset, to+_offset);
+ }
+
+
+ private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
+ new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
+ {
+ protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue()
+ {
+ return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+ };
+ };
+
+ private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap =
+ new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+
+ private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
+
+ private final byte[] _data;
+ private final int _offset;
+ private int _hashCode;
+ private String _asString = null;
+
+ private final int _length;
+ private static final char[] EMPTY_CHAR_ARRAY = new char[0];
+
+ public static final AMQShortString EMPTY_STRING = new AMQShortString((String)null);
+
+ public AMQShortString(byte[] data)
+ {
+
+ _data = data.clone();
+ _length = data.length;
+ _offset = 0;
+ }
+
+ public AMQShortString(byte[] data, int pos)
+ {
+ final int size = data[pos++];
+ final byte[] dataCopy = new byte[size];
+ System.arraycopy(data,pos,dataCopy,0,size);
+ _length = size;
+ _data = dataCopy;
+ _offset = 0;
+ }
+
+ public AMQShortString(String data)
+ {
+ this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
+ _asString = data;
+ }
+
+ public AMQShortString(char[] data)
+ {
+ if (data == null)
+ {
+ throw new NullPointerException("Cannot create AMQShortString with null char[]");
+ }
+
+ final int length = data.length;
+ final byte[] stringBytes = new byte[length];
+ int hash = 0;
+ for (int i = 0; i < length; i++)
+ {
+ stringBytes[i] = (byte) (0xFF & data[i]);
+ hash = (31 * hash) + stringBytes[i];
+ }
+ _hashCode = hash;
+ _data = stringBytes;
+
+ _length = length;
+ _offset = 0;
+
+ }
+
+ public AMQShortString(CharSequence charSequence)
+ {
+ final int length = charSequence.length();
+ final byte[] stringBytes = new byte[length];
+ int hash = 0;
+ for (int i = 0; i < length; i++)
+ {
+ stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i)));
+ hash = (31 * hash) + stringBytes[i];
+
+ }
+
+ _data = stringBytes;
+ _hashCode = hash;
+ _length = length;
+ _offset = 0;
+
+ }
+
+ private AMQShortString(ByteBuffer data, final int length)
+ {
+ if(data.isDirect() || data.isReadOnly())
+ {
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _offset = 0;
+ }
+ else
+ {
+
+ _data = data.array();
+ _offset = data.arrayOffset() + data.position();
+ data.skip(length);
+
+ }
+ _length = length;
+
+ }
+
+ private AMQShortString(final byte[] data, final int from, final int to)
+ {
+ _offset = from;
+ _length = to - from;
+ _data = data;
+ }
+
+ public AMQShortString shrink()
+ {
+ if(_data.length != _length)
+ {
+ byte[] dataBytes = new byte[_length];
+ System.arraycopy(_data,_offset,dataBytes,0,_length);
+ return new AMQShortString(dataBytes,0,_length);
+ }
+ else
+ {
+ return this;
+ }
+ }
+
+ /**
+ * Get the length of the short string
+ * @return length of the underlying byte array
+ */
+ public int length()
+ {
+ return _length;
+ }
+
+ public char charAt(int index)
+ {
+
+ return (char) _data[_offset + index];
+
+ }
+
+ public CharSequence subSequence(int start, int end)
+ {
+ return new CharSubSequence(start, end);
+ }
+
+ public int writeToByteArray(byte[] encoding, int pos)
+ {
+ final int size = length();
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
+ }
+
+ public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
+ {
+
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return shortString;
+ }
+ }
+
+ public static AMQShortString readFromBuffer(ByteBuffer buffer)
+ {
+ final short length = buffer.getUnsigned();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+
+ return new AMQShortString(buffer, length);
+ }
+ }
+
+ public byte[] getBytes()
+ {
+ if(_offset == 0 && _length == _data.length)
+ {
+ return _data.clone();
+ }
+ else
+ {
+ byte[] data = new byte[_length];
+ System.arraycopy(_data,_offset,data,0,_length);
+ return data;
+ }
+ }
+
+ public void writeToBuffer(ByteBuffer buffer)
+ {
+
+ final int size = length();
+ //buffer.setAutoExpand(true);
+ buffer.put((byte) size);
+ buffer.put(_data, _offset, size);
+
+ }
+
+ public boolean endsWith(String s)
+ {
+ return endsWith(new AMQShortString(s));
+ }
+
+
+ public boolean endsWith(AMQShortString otherString)
+ {
+
+ if (otherString.length() > length())
+ {
+ return false;
+ }
+
+
+ int thisLength = length();
+ int otherLength = otherString.length();
+
+ for (int i = 1; i <= otherLength; i++)
+ {
+ if (charAt(thisLength - i) != otherString.charAt(otherLength - i))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean startsWith(String s)
+ {
+ return startsWith(new AMQShortString(s));
+ }
+
+ public boolean startsWith(AMQShortString otherString)
+ {
+
+ if (otherString.length() > length())
+ {
+ return false;
+ }
+
+ for (int i = 0; i < otherString.length(); i++)
+ {
+ if (charAt(i) != otherString.charAt(i))
+ {
+ return false;
+ }
+ }
+
+ return true;
+
+ }
+
+ public boolean startsWith(CharSequence otherString)
+ {
+ if (otherString.length() > length())
+ {
+ return false;
+ }
+
+ for (int i = 0; i < otherString.length(); i++)
+ {
+ if (charAt(i) != otherString.charAt(i))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+ private final class CharSubSequence implements CharSequence
+ {
+ private final int _sequenceOffset;
+ private final int _end;
+
+ public CharSubSequence(final int offset, final int end)
+ {
+ _sequenceOffset = offset;
+ _end = end;
+ }
+
+ public int length()
+ {
+ return _end - _sequenceOffset;
+ }
+
+ public char charAt(int index)
+ {
+ return AMQShortString.this.charAt(index + _sequenceOffset);
+ }
+
+ public CharSequence subSequence(int start, int end)
+ {
+ return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
+ }
+ }
+
+ public char[] asChars()
+ {
+ final int size = length();
+ final char[] chars = new char[size];
+
+ for (int i = 0; i < size; i++)
+ {
+ chars[i] = (char) _data[i + _offset];
+ }
+
+ return chars;
+ }
+
+
+ public String asString()
+ {
+ if (_asString == null)
+ {
+ _asString = new String(asChars());
+ }
+ return _asString;
+ }
+
+ public boolean equals(Object o)
+ {
+
+
+ if(o instanceof AMQShortString)
+ {
+ return equals((AMQShortString)o);
+ }
+ if(o instanceof CharSequence)
+ {
+ return equals((CharSequence)o);
+ }
+
+ if (o == null)
+ {
+ return false;
+ }
+
+ if (o == this)
+ {
+ return true;
+ }
+
+
+ return false;
+
+ }
+
+ public boolean equals(final AMQShortString otherString)
+ {
+ if (otherString == this)
+ {
+ return true;
+ }
+
+ if (otherString == null)
+ {
+ return false;
+ }
+
+ final int hashCode = _hashCode;
+
+ final int otherHashCode = otherString._hashCode;
+
+ if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
+ {
+ return false;
+ }
+
+ final int length = _length;
+
+ if(length != otherString._length)
+ {
+ return false;
+ }
+
+
+ final byte[] data = _data;
+
+ final byte[] otherData = otherString._data;
+
+ final int offset = _offset;
+
+ final int otherOffset = otherString._offset;
+
+ if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+ {
+ return Arrays.equals(data, otherData);
+ }
+ else
+ {
+ int thisIdx = offset;
+ int otherIdx = otherOffset;
+ for(int i = length; i-- != 0; )
+ {
+ if(!(data[thisIdx++] == otherData[otherIdx++]))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
+
+ }
+
+ public boolean equals(CharSequence s)
+ {
+ if(s instanceof AMQShortString)
+ {
+ return equals((AMQShortString)s);
+ }
+
+ if (s == null)
+ {
+ return false;
+ }
+
+ if (s.length() != length())
+ {
+ return false;
+ }
+
+ for (int i = 0; i < length(); i++)
+ {
+ if (charAt(i) != s.charAt(i))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public int hashCode()
+ {
+ int hash = _hashCode;
+ if (hash == 0)
+ {
+ final int size = length();
+
+ for (int i = 0; i < size; i++)
+ {
+ hash = (31 * hash) + _data[i+_offset];
+ }
+
+ _hashCode = hash;
+ }
+
+ return hash;
+ }
+
+ public void setDirty()
+ {
+ _hashCode = 0;
+ }
+
+ public String toString()
+ {
+ return asString();
+ }
+
+ public int compareTo(AMQShortString name)
+ {
+ if (name == null)
+ {
+ return 1;
+ }
+ else
+ {
+
+ if (name.length() < length())
+ {
+ return -name.compareTo(this);
+ }
+
+ for (int i = 0; i < length(); i++)
+ {
+ final byte d = _data[i+_offset];
+ final byte n = name._data[i+name._offset];
+ if (d < n)
+ {
+ return -1;
+ }
+
+ if (d > n)
+ {
+ return 1;
+ }
+ }
+
+ return (length() == name.length()) ? 0 : -1;
+ }
+ }
+
+
+ public AMQShortStringTokenizer tokenize(byte delim)
+ {
+ return new TokenizerImpl(delim);
+ }
+
+
+ public AMQShortString intern()
+ {
+
+ hashCode();
+
+ Map<AMQShortString, WeakReference<AMQShortString>> localMap =
+ _localInternMap.get();
+
+ WeakReference<AMQShortString> ref = localMap.get(this);
+ AMQShortString internString;
+
+ if(ref != null)
+ {
+ internString = ref.get();
+ if(internString != null)
+ {
+ return internString;
+ }
+ }
+
+
+ synchronized(_globalInternMap)
+ {
+
+ ref = _globalInternMap.get(this);
+ if((ref == null) || ((internString = ref.get()) == null))
+ {
+ internString = shrink();
+ ref = new WeakReference(internString);
+ _globalInternMap.put(internString, ref);
+ }
+
+ }
+ localMap.put(internString, ref);
+ return internString;
+
+ }
+
+ private int occurences(final byte delim)
+ {
+ int count = 0;
+ final int end = _offset + _length;
+ for(int i = _offset ; i < end ; i++ )
+ {
+ if(_data[i] == delim)
+ {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private int indexOf(final byte val, final int pos)
+ {
+
+ for(int i = pos; i < length(); i++)
+ {
+ if(_data[_offset+i] == val)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+
+ public static AMQShortString join(final Collection<AMQShortString> terms,
+ final AMQShortString delim)
+ {
+ if(terms.size() == 0)
+ {
+ return EMPTY_STRING;
+ }
+
+ int size = delim.length() * (terms.size() - 1);
+ for(AMQShortString term : terms)
+ {
+ size += term.length();
+ }
+
+ byte[] data = new byte[size];
+ int pos = 0;
+ final byte[] delimData = delim._data;
+ final int delimOffset = delim._offset;
+ final int delimLength = delim._length;
+
+
+ for(AMQShortString term : terms)
+ {
+
+ if(pos!=0)
+ {
+ System.arraycopy(delimData, delimOffset,data,pos, delimLength);
+ pos+=delimLength;
+ }
+ System.arraycopy(term._data,term._offset,data,pos,term._length);
+ pos+=term._length;
+ }
+
+
+
+ return new AMQShortString(data,0,size);
+ }
+
+ public int toIntValue()
+ {
+ int pos = _offset;
+ int val = 0;
+
+
+ boolean isNegative = (_data[pos] == MINUS);
+ if(isNegative)
+ {
+ pos++;
+ }
+
+ final int end = _length + _offset;
+
+ while(pos < end)
+ {
+ int digit = (int) (_data[pos++] - ZERO);
+ if((digit < 0) || (digit > 9))
+ {
+ throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+ }
+ val = val * 10;
+ val += digit;
+ }
+ if(isNegative)
+ {
+ val = val * -1;
+ }
+ return val;
+ }
+
+ public boolean contains(final byte b)
+ {
+ final int end = _length + _offset;
+ for(int i = _offset; i < end; i++)
+ {
+ if(_data[i] == b)
+ {
+ return true;
+ }
+ }
+ return false; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ public static AMQShortString valueOf(Object obj)
+ {
+ return obj == null ? null : new AMQShortString(String.valueOf(obj));
+ }
+
+
+ public static void main(String args[])
+ {
+ AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k");
+ AMQShortString s2 = s.substring(2, 7);
+
+ AMQShortStringTokenizer t = s2.tokenize((byte) '.');
+ while(t.hasMoreTokens())
+ {
+ System.err.println(t.nextToken());
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
new file mode 100644
index 0000000000..e2db8906a1
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java
@@ -0,0 +1,31 @@
+package org.apache.qpid.framing;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface AMQShortStringTokenizer
+{
+
+ public int countTokens();
+
+ public AMQShortString nextToken();
+
+ boolean hasMoreTokens();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
new file mode 100644
index 0000000000..14fb63da03
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
@@ -0,0 +1,795 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.math.BigDecimal;
+
+/**
+ * AMQType is a type that represents the different possible AMQP field table types. It provides operations for each
+ * of the types to perform tasks such as calculating the size of an instance of the type, converting types between AMQP
+ * and Java native types, and reading and writing instances of AMQP types in binary formats to and from byte buffers.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Get the equivalent one byte identifier for a type.
+ * <tr><td> Calculate the size of an instance of an AMQP parameter type. <td> {@link EncodingUtils}
+ * <tr><td> Convert an instance of an AMQP parameter into a compatable Java object tagged with its AMQP type.
+ * <td> {@link AMQTypedValue}
+ * <tr><td> Write an instance of an AMQP parameter type to a byte buffer. <td> {@link EncodingUtils}
+ * <tr><td> Read an instance of an AMQP parameter from a byte buffer. <td> {@link EncodingUtils}
+ * </table>
+ */
+public enum AMQType
+{
+ LONG_STRING('S')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedLongStringLength((String) value);
+ }
+
+ public String toNativeValue(Object value)
+ {
+ if (value != null)
+ {
+ return value.toString();
+ }
+ else
+ {
+ throw new NullPointerException("Cannot convert: null to String.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeLongStringBytes(buffer, (String) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readLongString(buffer);
+ }
+ },
+
+ INTEGER('i')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.unsignedIntegerLength();
+ }
+
+ public Long toNativeValue(Object value)
+ {
+ if (value instanceof Long)
+ {
+ return (Long) value;
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readUnsignedInteger(buffer);
+ }
+ },
+
+ DECIMAL('D')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedByteLength() + EncodingUtils.encodedIntegerLength();
+ }
+
+ public Object toNativeValue(Object value)
+ {
+ if (value instanceof BigDecimal)
+ {
+ return (BigDecimal) value;
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to BigDecimal.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ BigDecimal bd = (BigDecimal) value;
+
+ byte places = new Integer(bd.scale()).byteValue();
+
+ int unscaled = bd.intValue();
+
+ EncodingUtils.writeByte(buffer, places);
+
+ EncodingUtils.writeInteger(buffer, unscaled);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ byte places = EncodingUtils.readByte(buffer);
+
+ int unscaled = EncodingUtils.readInteger(buffer);
+
+ BigDecimal bd = new BigDecimal(unscaled);
+
+ return bd.setScale(places);
+ }
+ },
+
+ TIMESTAMP('T')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedLongLength();
+ }
+
+ public Object toNativeValue(Object value)
+ {
+ if (value instanceof Long)
+ {
+ return (Long) value;
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to timestamp.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeLong(buffer, (Long) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readLong(buffer);
+ }
+ },
+
+ /**
+ * Implements the field table type. The native value of a field table type will be an instance of
+ * {@link FieldTable}, which itself may contain name/value pairs encoded as {@link AMQTypedValue}s.
+ */
+ FIELD_TABLE('F')
+ {
+ /**
+ * Calculates the size of an instance of the type in bytes.
+ *
+ * @param value An instance of the type.
+ *
+ * @return The size of the instance of the type in bytes.
+ */
+ public int getEncodingSize(Object value)
+ {
+ // Ensure that the value is a FieldTable.
+ if (!(value instanceof FieldTable))
+ {
+ throw new IllegalArgumentException("Value is not a FieldTable.");
+ }
+
+ FieldTable ftValue = (FieldTable) value;
+
+ // Loop over all name/value pairs adding up size of each. FieldTable itself keeps track of its encoded
+ // size as entries are added, so no need to loop over all explicitly.
+ // EncodingUtils calculation of the encoded field table lenth, will include 4 bytes for its 'size' field.
+ return EncodingUtils.encodedFieldTableLength(ftValue);
+ }
+
+ /**
+ * Converts an instance of the type to an equivalent Java native representation.
+ *
+ * @param value An instance of the type.
+ *
+ * @return An equivalent Java native representation.
+ */
+ public Object toNativeValue(Object value)
+ {
+ // Ensure that the value is a FieldTable.
+ if (!(value instanceof FieldTable))
+ {
+ throw new IllegalArgumentException("Value is not a FieldTable.");
+ }
+
+ return (FieldTable) value;
+ }
+
+ /**
+ * Writes an instance of the type to a specified byte buffer.
+ *
+ * @param value An instance of the type.
+ * @param buffer The byte buffer to write it to.
+ */
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ // Ensure that the value is a FieldTable.
+ if (!(value instanceof FieldTable))
+ {
+ throw new IllegalArgumentException("Value is not a FieldTable.");
+ }
+
+ FieldTable ftValue = (FieldTable) value;
+
+ // Loop over all name/values writing out into buffer.
+ ftValue.writeToBuffer(buffer);
+ }
+
+ /**
+ * Reads an instance of the type from a specified byte buffer.
+ *
+ * @param buffer The byte buffer to write it to.
+ *
+ * @return An instance of the type.
+ */
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ try
+ {
+ // Read size of field table then all name/value pairs.
+ return EncodingUtils.readFieldTable(buffer);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new IllegalArgumentException("Unable to read field table from buffer.", e);
+ }
+ }
+ },
+
+ VOID('V')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return 0;
+ }
+
+ public Object toNativeValue(Object value)
+ {
+ if (value == null)
+ {
+ return null;
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to null String.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ { }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return null;
+ }
+ },
+
+ BINARY('x')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedLongstrLength((byte[]) value);
+ }
+
+ public Object toNativeValue(Object value)
+ {
+ if ((value instanceof byte[]) || (value == null))
+ {
+ return value;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Value: " + value + " (" + value.getClass().getName()
+ + ") cannot be converted to byte[]");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeLongstr(buffer, (byte[]) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readLongstr(buffer);
+ }
+ },
+
+ ASCII_STRING('c')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedLongStringLength((String) value);
+ }
+
+ public String toNativeValue(Object value)
+ {
+ if (value != null)
+ {
+ return value.toString();
+ }
+ else
+ {
+ throw new NullPointerException("Cannot convert: null to String.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeLongStringBytes(buffer, (String) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readLongString(buffer);
+ }
+ },
+
+ WIDE_STRING('C')
+ {
+ public int getEncodingSize(Object value)
+ {
+ // FIXME: use proper charset encoder
+ return EncodingUtils.encodedLongStringLength((String) value);
+ }
+
+ public String toNativeValue(Object value)
+ {
+ if (value != null)
+ {
+ return value.toString();
+ }
+ else
+ {
+ throw new NullPointerException("Cannot convert: null to String.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeLongStringBytes(buffer, (String) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readLongString(buffer);
+ }
+ },
+
+ BOOLEAN('t')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedBooleanLength();
+ }
+
+ public Object toNativeValue(Object value)
+ {
+ if (value instanceof Boolean)
+ {
+ return (Boolean) value;
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Boolean.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to boolean.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeBoolean(buffer, (Boolean) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readBoolean(buffer);
+ }
+ },
+
+ ASCII_CHARACTER('k')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedCharLength();
+ }
+
+ public Character toNativeValue(Object value)
+ {
+ if (value instanceof Character)
+ {
+ return (Character) value;
+ }
+ else if (value == null)
+ {
+ throw new NullPointerException("Cannot convert null into char");
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to char.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeChar(buffer, (Character) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readChar(buffer);
+ }
+ },
+
+ BYTE('b')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedByteLength();
+ }
+
+ public Byte toNativeValue(Object value)
+ {
+ if (value instanceof Byte)
+ {
+ return (Byte) value;
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Byte.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to byte.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeByte(buffer, (Byte) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readByte(buffer);
+ }
+ },
+
+ SHORT('s')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedShortLength();
+ }
+
+ public Short toNativeValue(Object value)
+ {
+ if (value instanceof Short)
+ {
+ return (Short) value;
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).shortValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Short.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to short.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeShort(buffer, (Short) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readShort(buffer);
+ }
+ },
+
+ INT('I')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedIntegerLength();
+ }
+
+ public Integer toNativeValue(Object value)
+ {
+ if (value instanceof Integer)
+ {
+ return (Integer) value;
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).intValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).intValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Integer.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeInteger(buffer, (Integer) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readInteger(buffer);
+ }
+ },
+
+ LONG('l')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedLongLength();
+ }
+
+ public Object toNativeValue(Object value)
+ {
+ if (value instanceof Long)
+ {
+ return (Long) value;
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to long.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeLong(buffer, (Long) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readLong(buffer);
+ }
+ },
+
+ FLOAT('f')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedFloatLength();
+ }
+
+ public Float toNativeValue(Object value)
+ {
+ if (value instanceof Float)
+ {
+ return (Float) value;
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Float.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to float.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeFloat(buffer, (Float) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readFloat(buffer);
+ }
+ },
+
+ DOUBLE('d')
+ {
+ public int getEncodingSize(Object value)
+ {
+ return EncodingUtils.encodedDoubleLength();
+ }
+
+ public Double toNativeValue(Object value)
+ {
+ if (value instanceof Double)
+ {
+ return (Double) value;
+ }
+ else if (value instanceof Float)
+ {
+ return ((Float) value).doubleValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Double.valueOf((String) value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName()
+ + ") to double.");
+ }
+ }
+
+ public void writeValueImpl(Object value, ByteBuffer buffer)
+ {
+ EncodingUtils.writeDouble(buffer, (Double) value);
+ }
+
+ public Object readValueFromBuffer(ByteBuffer buffer)
+ {
+ return EncodingUtils.readDouble(buffer);
+ }
+ };
+
+ /** Holds the defined one byte identifier for the type. */
+ private final byte _identifier;
+
+ /**
+ * Creates an instance of an AMQP type from its defined one byte identifier.
+ *
+ * @param identifier The one byte identifier for the type.
+ */
+ AMQType(char identifier)
+ {
+ _identifier = (byte) identifier;
+ }
+
+ /**
+ * Extracts the byte identifier for the typ.
+ *
+ * @return The byte identifier for the typ.
+ */
+ public final byte identifier()
+ {
+ return _identifier;
+ }
+
+ /**
+ * Calculates the size of an instance of the type in bytes.
+ *
+ * @param value An instance of the type.
+ *
+ * @return The size of the instance of the type in bytes.
+ */
+ public abstract int getEncodingSize(Object value);
+
+ /**
+ * Converts an instance of the type to an equivalent Java native representation.
+ *
+ * @param value An instance of the type.
+ *
+ * @return An equivalent Java native representation.
+ */
+ public abstract Object toNativeValue(Object value);
+
+ /**
+ * Converts an instance of the type to an equivalent Java native representation, packaged as an
+ * {@link AMQTypedValue} tagged with its AMQP type.
+ *
+ * @param value An instance of the type.
+ *
+ * @return An equivalent Java native representation, tagged with its AMQP type.
+ */
+ public AMQTypedValue asTypedValue(Object value)
+ {
+ return new AMQTypedValue(this, toNativeValue(value));
+ }
+
+ /**
+ * Writes an instance of the type to a specified byte buffer, preceded by its one byte identifier. As the type and
+ * value are both written, this provides a fully encoded description of a parameters type and value.
+ *
+ * @param value An instance of the type.
+ * @param buffer The byte buffer to write it to.
+ */
+ public void writeToBuffer(Object value, ByteBuffer buffer)
+ {
+ buffer.put(identifier());
+ writeValueImpl(value, buffer);
+ }
+
+ /**
+ * Writes an instance of the type to a specified byte buffer.
+ *
+ * @param value An instance of the type.
+ * @param buffer The byte buffer to write it to.
+ */
+ abstract void writeValueImpl(Object value, ByteBuffer buffer);
+
+ /**
+ * Reads an instance of the type from a specified byte buffer.
+ *
+ * @param buffer The byte buffer to write it to.
+ *
+ * @return An instance of the type.
+ */
+ abstract Object readValueFromBuffer(ByteBuffer buffer);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java
new file mode 100644
index 0000000000..a07fd78c8c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AMQTypeMap
+{
+ public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<Byte, AMQType>();
+
+ static
+ {
+ for(AMQType type : AMQType.values())
+ {
+ _reverseTypeMap.put(type.identifier(), type);
+ }
+ }
+
+ public static AMQType getType(Byte identifier)
+ {
+ AMQType result = _reverseTypeMap.get(identifier);
+ if (result == null) {
+ throw new IllegalArgumentException
+ ("no such type code: " + Integer.toHexString(identifier.intValue()));
+ }
+ return result;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
new file mode 100644
index 0000000000..647d531476
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Date;
+import java.util.Map;
+import java.math.BigDecimal;
+
+/**
+ * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
+ * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
+ * the ability to create such parameters from Java native value and a type tag or to extract the native value and type
+ * from one.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create a fully typed AMQP value from a native type and a type tag. <td> {@link AMQType}
+ * <tr><td> Create a fully typed AMQP value from a binary representation in a byte buffer. <td> {@link AMQType}
+ * <tr><td> Write a fully typed AMQP value to a binary representation in a byte buffer. <td> {@link AMQType}
+ * <tr><td> Extract the type from a fully typed AMQP value.
+ * <tr><td> Extract the value from a fully typed AMQP value.
+ * </table>
+ */
+public class AMQTypedValue
+{
+ /** The type of the value. */
+ private final AMQType _type;
+
+ /** The Java native representation of the AMQP typed value. */
+ private final Object _value;
+
+ public AMQTypedValue(AMQType type, Object value)
+ {
+ if (type == null)
+ {
+ throw new NullPointerException("Cannot create a typed value with null type");
+ }
+
+ _type = type;
+ _value = type.toNativeValue(value);
+ }
+
+ private AMQTypedValue(AMQType type, ByteBuffer buffer)
+ {
+ _type = type;
+ _value = type.readValueFromBuffer(buffer);
+ }
+
+ public AMQType getType()
+ {
+ return _type;
+ }
+
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ public void writeToBuffer(ByteBuffer buffer)
+ {
+ _type.writeToBuffer(_value, buffer);
+ }
+
+ public int getEncodingSize()
+ {
+ return _type.getEncodingSize(_value);
+ }
+
+ public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
+ {
+ AMQType type = AMQTypeMap.getType(buffer.get());
+
+ return new AMQTypedValue(type, buffer);
+ }
+
+ public String toString()
+ {
+ return "[" + getType() + ": " + getValue() + "]";
+ }
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof AMQTypedValue)
+ {
+ AMQTypedValue other = (AMQTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
+
+
+ public static AMQTypedValue toTypedValue(Object val)
+ {
+ if(val == null)
+ {
+ return AMQType.VOID.asTypedValue(null);
+ }
+
+ Class klass = val.getClass();
+ if(klass == String.class)
+ {
+ return AMQType.ASCII_STRING.asTypedValue(val);
+ }
+ else if(klass == Character.class)
+ {
+ return AMQType.ASCII_CHARACTER.asTypedValue(val);
+ }
+ else if(klass == Integer.class)
+ {
+ return AMQType.INT.asTypedValue(val);
+ }
+ else if(klass == Long.class)
+ {
+ return AMQType.LONG.asTypedValue(val);
+ }
+ else if(klass == Float.class)
+ {
+ return AMQType.FLOAT.asTypedValue(val);
+ }
+ else if(klass == Double.class)
+ {
+ return AMQType.DOUBLE.asTypedValue(val);
+ }
+ else if(klass == Date.class)
+ {
+ return AMQType.TIMESTAMP.asTypedValue(val);
+ }
+ else if(klass == Byte.class)
+ {
+ return AMQType.BYTE.asTypedValue(val);
+ }
+ else if(klass == Boolean.class)
+ {
+ return AMQType.BOOLEAN.asTypedValue(val);
+ }
+ else if(klass == byte[].class)
+ {
+ return AMQType.BINARY.asTypedValue(val);
+ }
+ else if(klass == BigDecimal.class)
+ {
+ return AMQType.DECIMAL.asTypedValue(val);
+ }
+ else if(val instanceof Map)
+ {
+ return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val));
+ }
+ return null;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
new file mode 100644
index 0000000000..c7d89a9927
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -0,0 +1,838 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BasicContentHeaderProperties implements CommonContentHeaderProperties
+{
+ //persistent & non-persistent constants, values as per JMS DeliveryMode
+ public static final int NON_PERSISTENT = 1;
+ public static final int PERSISTENT = 2;
+
+ private static final Logger _logger = LoggerFactory.getLogger(BasicContentHeaderProperties.class);
+
+ private static final AMQShortString ZERO_STRING = null;
+
+ /**
+ * We store the encoded form when we decode the content header so that if we need to write it out without modifying
+ * it we can do so without incurring the expense of reencoding it
+ */
+ private byte[] _encodedForm;
+
+ /** Flag indicating whether the entire content header has been decoded yet */
+ private boolean _decoded = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker for
+ * routing in some cases so we can decode that separately.
+ */
+ private boolean _decodedHeaders = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The content type is used by all clients
+ * to determine the message type
+ */
+ private boolean _decodedContentType = true;
+
+ private AMQShortString _contentType;
+
+ private AMQShortString _encoding;
+
+ private FieldTable _headers;
+
+ private byte _deliveryMode;
+
+ private byte _priority;
+
+ private AMQShortString _correlationId;
+
+ private AMQShortString _replyTo;
+
+ private long _expiration;
+
+ private AMQShortString _messageId;
+
+ private long _timestamp;
+
+ private AMQShortString _type;
+
+ private AMQShortString _userId;
+
+ private AMQShortString _appId;
+
+ private AMQShortString _clusterId;
+
+ private int _propertyFlags = 0;
+ private static final int CONTENT_TYPE_MASK = 1 << 15;
+ private static final int ENCONDING_MASK = 1 << 14;
+ private static final int HEADERS_MASK = 1 << 13;
+ private static final int DELIVERY_MODE_MASK = 1 << 12;
+ private static final int PROPRITY_MASK = 1 << 11;
+ private static final int CORRELATION_ID_MASK = 1 << 10;
+ private static final int REPLY_TO_MASK = 1 << 9;
+ private static final int EXPIRATION_MASK = 1 << 8;
+ private static final int MESSAGE_ID_MASK = 1 << 7;
+ private static final int TIMESTAMP_MASK = 1 << 6;
+ private static final int TYPE_MASK = 1 << 5;
+ private static final int USER_ID_MASK = 1 << 4;
+ private static final int APPLICATION_ID_MASK = 1 << 3;
+ private static final int CLUSTER_ID_MASK = 1 << 2;
+
+
+ /**
+ * This is 0_10 specific. We use this property to check if some message properties have been changed.
+ */
+ private boolean _hasBeenUpdated = false;
+
+ public boolean reset()
+ {
+ boolean result = _hasBeenUpdated;
+ _hasBeenUpdated = false;
+ return result;
+ }
+
+ public void updated()
+ {
+ _hasBeenUpdated = true;
+ }
+
+ public BasicContentHeaderProperties()
+ { }
+
+ public int getPropertyListSize()
+ {
+ if (_encodedForm != null)
+ {
+ return _encodedForm.length;
+ }
+ else
+ {
+ int size = 0;
+
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_contentType);
+ }
+
+ if ((_propertyFlags & ENCONDING_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_encoding);
+ }
+
+ if ((_propertyFlags & HEADERS_MASK) > 0)
+ {
+ size += EncodingUtils.encodedFieldTableLength(_headers);
+ }
+
+ if ((_propertyFlags & DELIVERY_MODE_MASK) > 0)
+ {
+ size += 1;
+ }
+
+ if ((_propertyFlags & PROPRITY_MASK) > 0)
+ {
+ size += 1;
+ }
+
+ if ((_propertyFlags & CORRELATION_ID_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_correlationId);
+ }
+
+ if ((_propertyFlags & REPLY_TO_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_replyTo);
+ }
+
+ if ((_propertyFlags & EXPIRATION_MASK) > 0)
+ {
+ if (_expiration == 0L)
+ {
+ size += EncodingUtils.encodedShortStringLength(ZERO_STRING);
+ }
+ else
+ {
+ size += EncodingUtils.encodedShortStringLength(_expiration);
+ }
+ }
+
+ if ((_propertyFlags & MESSAGE_ID_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_messageId);
+ }
+
+ if ((_propertyFlags & TIMESTAMP_MASK) > 0)
+ {
+ size += 8;
+ }
+
+ if ((_propertyFlags & TYPE_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_type);
+ }
+
+ if ((_propertyFlags & USER_ID_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_userId);
+ }
+
+ if ((_propertyFlags & APPLICATION_ID_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_appId);
+ }
+
+ if ((_propertyFlags & CLUSTER_ID_MASK) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_clusterId);
+ }
+
+ return size;
+ }
+ }
+
+ private void clearEncodedForm()
+ {
+ if (!_decoded && (_encodedForm != null))
+ {
+ // decode();
+ }
+
+ _encodedForm = null;
+ }
+
+ public void setPropertyFlags(int propertyFlags)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags = propertyFlags;
+ }
+
+ public int getPropertyFlags()
+ {
+ return _propertyFlags;
+ }
+
+ public void writePropertyListPayload(ByteBuffer buffer)
+ {
+ if (_encodedForm != null)
+ {
+ buffer.put(_encodedForm);
+ }
+ else
+ {
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ }
+
+ if ((_propertyFlags & ENCONDING_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _encoding);
+ }
+
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, _headers);
+ }
+
+ if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+ {
+ buffer.put(_deliveryMode);
+ }
+
+ if ((_propertyFlags & PROPRITY_MASK) != 0)
+ {
+ buffer.put(_priority);
+ }
+
+ if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+ }
+
+ if ((_propertyFlags & REPLY_TO_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _replyTo);
+ }
+
+ if ((_propertyFlags & EXPIRATION_MASK) != 0)
+ {
+ if (_expiration == 0L)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+ }
+ else
+ {
+ EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ }
+ }
+
+ if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _messageId);
+ }
+
+ if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+ {
+ EncodingUtils.writeTimestamp(buffer, _timestamp);
+ }
+
+ if ((_propertyFlags & TYPE_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _type);
+ }
+
+ if ((_propertyFlags & USER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _userId);
+ }
+
+ if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _appId);
+ }
+
+ if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ }
+ }
+ }
+
+ public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
+ {
+ _propertyFlags = propertyFlags;
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Property flags: " + _propertyFlags);
+ }
+
+ decode(buffer);
+ /*_encodedForm = new byte[size];
+ buffer.get(_encodedForm, 0, size);
+ _decoded = false;
+ _decodedHeaders = false;
+ _decodedContentType = false;*/
+ }
+
+ private void decode(ByteBuffer buffer)
+ {
+ // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ int pos = buffer.position();
+ try
+ {
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ _contentType = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & ENCONDING_MASK) != 0)
+ {
+ _encoding = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ _headers = EncodingUtils.readFieldTable(buffer);
+ }
+
+ if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+ {
+ _deliveryMode = buffer.get();
+ }
+
+ if ((_propertyFlags & PROPRITY_MASK) != 0)
+ {
+ _priority = buffer.get();
+ }
+
+ if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+ {
+ _correlationId = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & REPLY_TO_MASK) != 0)
+ {
+ _replyTo = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & EXPIRATION_MASK) != 0)
+ {
+ _expiration = EncodingUtils.readLongAsShortString(buffer);
+ }
+
+ if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+ {
+ _messageId = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+ {
+ _timestamp = EncodingUtils.readTimestamp(buffer);
+ }
+
+ if ((_propertyFlags & TYPE_MASK) != 0)
+ {
+ _type = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & USER_ID_MASK) != 0)
+ {
+ _userId = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+ {
+ _appId = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+ {
+ _clusterId = EncodingUtils.readAMQShortString(buffer);
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e, e);
+ }
+
+ final int endPos = buffer.position();
+ buffer.position(pos);
+ final int len = endPos - pos;
+ _encodedForm = new byte[len];
+ final int limit = buffer.limit();
+ buffer.limit(endPos);
+ buffer.get(_encodedForm, 0, len);
+ buffer.limit(limit);
+ buffer.position(endPos);
+ _decoded = true;
+ }
+
+ private void decodeUpToHeaders()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ try
+ {
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+
+ if ((_propertyFlags & ENCONDING_MASK) != 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ _headers = EncodingUtils.readFieldTable(buffer);
+
+ }
+
+ _decodedHeaders = true;
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e, e);
+ }
+ }
+
+ private void decodeUpToContentType()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ _contentType = EncodingUtils.readAMQShortString(buffer);
+ }
+
+ _decodedContentType = true;
+ }
+
+ private void decodeIfNecessary()
+ {
+ if (!_decoded)
+ {
+ // decode();
+ }
+ }
+
+ private void decodeHeadersIfNecessary()
+ {
+ if (!_decoded && !_decodedHeaders)
+ {
+ decodeUpToHeaders();
+ }
+ }
+
+ private void decodeContentTypeIfNecessary()
+ {
+ if (!_decoded && !_decodedContentType)
+ {
+ decodeUpToContentType();
+ }
+ }
+
+ public AMQShortString getContentType()
+ {
+ decodeContentTypeIfNecessary();
+
+ return _contentType;
+ }
+
+ public String getContentTypeAsString()
+ {
+ decodeContentTypeIfNecessary();
+
+ return (_contentType == null) ? null : _contentType.toString();
+ }
+
+ public void setContentType(AMQShortString contentType)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= (CONTENT_TYPE_MASK);
+ _contentType = contentType;
+ }
+
+ public void setContentType(String contentType)
+ {
+ _hasBeenUpdated = true;
+ setContentType((contentType == null) ? null : new AMQShortString(contentType));
+ }
+
+ public String getEncodingAsString()
+ {
+
+ return (getEncoding() == null) ? null : getEncoding().toString();
+ }
+
+ public AMQShortString getEncoding()
+ {
+ decodeIfNecessary();
+
+ return _encoding;
+ }
+
+ public void setEncoding(String encoding)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= ENCONDING_MASK;
+ _encoding = (encoding == null) ? null : new AMQShortString(encoding);
+ }
+
+ public void setEncoding(AMQShortString encoding)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= ENCONDING_MASK;
+ _encoding = encoding;
+ }
+
+ public FieldTable getHeaders()
+ {
+ decodeHeadersIfNecessary();
+
+ if (_headers == null)
+ {
+ setHeaders(FieldTableFactory.newFieldTable());
+ }
+
+ return _headers;
+ }
+
+ public void setHeaders(FieldTable headers)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= HEADERS_MASK;
+ _headers = headers;
+ }
+
+ public byte getDeliveryMode()
+ {
+ decodeIfNecessary();
+
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(byte deliveryMode)
+ {
+ clearEncodedForm();
+ _propertyFlags |= DELIVERY_MODE_MASK;
+ _deliveryMode = deliveryMode;
+ }
+
+ public byte getPriority()
+ {
+ decodeIfNecessary();
+
+ return _priority;
+ }
+
+ public void setPriority(byte priority)
+ {
+ clearEncodedForm();
+ _propertyFlags |= PROPRITY_MASK;
+ _priority = priority;
+ }
+
+ public AMQShortString getCorrelationId()
+ {
+ decodeIfNecessary();
+
+ return _correlationId;
+ }
+
+ public String getCorrelationIdAsString()
+ {
+ decodeIfNecessary();
+
+ return (_correlationId == null) ? null : _correlationId.toString();
+ }
+
+ public void setCorrelationId(String correlationId)
+ {
+ _hasBeenUpdated = true;
+ setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId));
+ }
+
+ public void setCorrelationId(AMQShortString correlationId)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= CORRELATION_ID_MASK;
+ _correlationId = correlationId;
+ }
+
+ public String getReplyToAsString()
+ {
+ decodeIfNecessary();
+
+ return (_replyTo == null) ? null : _replyTo.toString();
+ }
+
+ public AMQShortString getReplyTo()
+ {
+ decodeIfNecessary();
+
+ return _replyTo;
+ }
+
+ public void setReplyTo(String replyTo)
+ {
+ _hasBeenUpdated = true;
+ setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo));
+ }
+
+ public void setReplyTo(AMQShortString replyTo)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= REPLY_TO_MASK;
+ _replyTo = replyTo;
+ }
+
+ public long getExpiration()
+ {
+ decodeIfNecessary();
+ return _expiration;
+ }
+
+ public void setExpiration(long expiration)
+ {
+ clearEncodedForm();
+ _propertyFlags |= EXPIRATION_MASK;
+ _expiration = expiration;
+ }
+
+ public AMQShortString getMessageId()
+ {
+ decodeIfNecessary();
+
+ return _messageId;
+ }
+
+ public String getMessageIdAsString()
+ {
+ decodeIfNecessary();
+
+ return (_messageId == null) ? null : _messageId.toString();
+ }
+
+ public void setMessageId(String messageId)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= MESSAGE_ID_MASK;
+ _messageId = (messageId == null) ? null : new AMQShortString(messageId);
+ }
+
+ public void setMessageId(AMQShortString messageId)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= MESSAGE_ID_MASK;
+ _messageId = messageId;
+ }
+
+ public long getTimestamp()
+ {
+ decodeIfNecessary();
+ return _timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ clearEncodedForm();
+ _propertyFlags |= TIMESTAMP_MASK;
+ _timestamp = timestamp;
+ }
+
+ public String getTypeAsString()
+ {
+ decodeIfNecessary();
+
+ return (_type == null) ? null : _type.toString();
+ }
+
+ public AMQShortString getType()
+ {
+ decodeIfNecessary();
+
+ return _type;
+ }
+
+ public void setType(String type)
+ {
+ _hasBeenUpdated = true;
+ setType((type == null) ? null : new AMQShortString(type));
+ }
+
+ public void setType(AMQShortString type)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= TYPE_MASK;
+ _type = type;
+ }
+
+ public String getUserIdAsString()
+ {
+ decodeIfNecessary();
+
+ return (_userId == null) ? null : _userId.toString();
+ }
+
+ public AMQShortString getUserId()
+ {
+ decodeIfNecessary();
+
+ return _userId;
+ }
+
+ public void setUserId(String userId)
+ {
+ setUserId((userId == null) ? null : new AMQShortString(userId));
+ }
+
+ public void setUserId(AMQShortString userId)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= USER_ID_MASK;
+ _userId = userId;
+ }
+
+ public String getAppIdAsString()
+ {
+ decodeIfNecessary();
+
+ return (_appId == null) ? null : _appId.toString();
+ }
+
+ public AMQShortString getAppId()
+ {
+ decodeIfNecessary();
+
+ return _appId;
+ }
+
+ public void setAppId(String appId)
+ {
+ _hasBeenUpdated = true;
+ setAppId((appId == null) ? null : new AMQShortString(appId));
+ }
+
+ public void setAppId(AMQShortString appId)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= APPLICATION_ID_MASK;
+ _appId = appId;
+ _hasBeenUpdated = true;
+ }
+
+ public String getClusterIdAsString()
+ {
+ _hasBeenUpdated = true;
+ decodeIfNecessary();
+ return (_clusterId == null) ? null : _clusterId.toString();
+ }
+
+ public AMQShortString getClusterId()
+ {
+ _hasBeenUpdated = true;
+ decodeIfNecessary();
+ return _clusterId;
+ }
+
+ public void setClusterId(String clusterId)
+ {
+ _hasBeenUpdated = true;
+ setClusterId((clusterId == null) ? null : new AMQShortString(clusterId));
+ }
+
+ public void setClusterId(AMQShortString clusterId)
+ {
+ _hasBeenUpdated = true;
+ clearEncodedForm();
+ _propertyFlags |= CLUSTER_ID_MASK;
+ _clusterId = clusterId;
+ }
+
+ public String toString()
+ {
+ return "reply-to = " + _replyTo + ",propertyFlags = " + _propertyFlags + ",ApplicationID = " + _appId
+ + ",ClusterID = " + _clusterId + ",UserId = " + _userId + ",JMSMessageID = " + _messageId
+ + ",JMSCorrelationID = " + _correlationId + ",JMSDeliveryMode = " + _deliveryMode + ",JMSExpiration = "
+ + _expiration + ",JMSPriority = " + _priority + ",JMSTimestamp = " + _timestamp + ",JMSType = " + _type;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
new file mode 100644
index 0000000000..59646577e1
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface.
+ */
+public interface BodyFactory
+{
+ AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
new file mode 100644
index 0000000000..7162c37062
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing;
+
+public interface CommonContentHeaderProperties extends ContentHeaderProperties
+{
+ AMQShortString getContentType();
+
+ void setContentType(AMQShortString contentType);
+
+ FieldTable getHeaders();
+
+ void setHeaders(FieldTable headers);
+
+ byte getDeliveryMode();
+
+ void setDeliveryMode(byte deliveryMode);
+
+ byte getPriority();
+
+ void setPriority(byte priority);
+
+ AMQShortString getCorrelationId();
+
+ void setCorrelationId(AMQShortString correlationId);
+
+ AMQShortString getReplyTo();
+
+ void setReplyTo(AMQShortString replyTo);
+
+ long getExpiration();
+
+ void setExpiration(long expiration);
+
+ AMQShortString getMessageId();
+
+ void setMessageId(AMQShortString messageId);
+
+ long getTimestamp();
+
+ void setTimestamp(long timestamp);
+
+ AMQShortString getType();
+
+ void setType(AMQShortString type);
+
+ AMQShortString getUserId();
+
+ void setUserId(AMQShortString userId);
+
+ AMQShortString getAppId();
+
+ void setAppId(AMQShortString appId);
+
+ AMQShortString getClusterId();
+
+ void setClusterId(AMQShortString clusterId);
+
+ AMQShortString getEncoding();
+
+ void setEncoding(AMQShortString encoding);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
new file mode 100644
index 0000000000..94030f383e
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+
+ private AMQDataBlock[] _blocks;
+
+ public CompositeAMQDataBlock(AMQDataBlock[] blocks)
+ {
+ _blocks = blocks;
+ }
+
+
+ public AMQDataBlock[] getBlocks()
+ {
+ return _blocks;
+ }
+
+
+ public long getSize()
+ {
+ long frameSize = 0;
+ for (int i = 0; i < _blocks.length; i++)
+ {
+ frameSize += _blocks[i].getSize();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ for (int i = 0; i < _blocks.length; i++)
+ {
+ _blocks[i].writePayload(buffer);
+ }
+ }
+
+ public String toString()
+ {
+ if (_blocks == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{");
+ for (int i = 0 ; i < _blocks.length; i++)
+ {
+ buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");
+ }
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java
new file mode 100644
index 0000000000..e5feeec2a4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/Content.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public interface Content
+{
+ // TODO: New Content class required for AMQP 0-9.
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
new file mode 100644
index 0000000000..9d39f8aa86
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
+
+public class ContentBody implements AMQBody
+{
+ public static final byte TYPE = 3;
+
+ public ByteBuffer payload;
+
+ public ContentBody()
+ {
+ }
+
+ public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ if (size > 0)
+ {
+ payload = buffer.slice();
+ payload.limit((int) size);
+ buffer.skip((int) size);
+ }
+
+ }
+
+
+ public ContentBody(ByteBuffer payload)
+ {
+ this.payload = payload;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return (payload == null ? 0 : payload.limit());
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (payload != null)
+ {
+ if(payload.isDirect() || payload.isReadOnly())
+ {
+ ByteBuffer copy = payload.duplicate();
+ buffer.put(copy.rewind());
+ }
+ else
+ {
+ buffer.put(payload.array(),payload.arrayOffset(),payload.limit());
+ }
+ }
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentBodyReceived(channelId, this);
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ if (size > 0)
+ {
+ payload = buffer.slice();
+ payload.limit((int) size);
+ buffer.skip((int) size);
+ }
+
+ }
+
+ public void reduceBufferToFit()
+ {
+ if (payload != null && (payload.remaining() < payload.capacity() / 2))
+ {
+ int size = payload.limit();
+ ByteBuffer newPayload = ByteBuffer.allocate(size);
+
+ newPayload.put(payload);
+ newPayload.flip();
+
+ //reduce reference count on payload
+ payload.release();
+
+ payload = newPayload;
+ }
+ }
+
+
+
+ public static AMQFrame createAMQFrame(int channelId, ContentBody body)
+ {
+ final AMQFrame frame = new AMQFrame(channelId, body);
+ return frame;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
new file mode 100644
index 0000000000..c42995d148
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ContentBodyFactory implements BodyFactory
+{
+ private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
+
+ private static final ContentBodyFactory _instance = new ContentBodyFactory();
+
+ public static ContentBodyFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private ContentBodyFactory()
+ {
+ _log.debug("Creating content body factory");
+ }
+
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ {
+ return new ContentBody(in, bodySize);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
new file mode 100644
index 0000000000..30db3b8be7
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
+
+public class ContentHeaderBody implements AMQBody
+{
+ public static final byte TYPE = 2;
+
+ public int classId;
+
+ public int weight;
+
+ /** unsigned long but java can't handle that anyway when allocating byte array */
+ public long bodySize;
+
+ /** must never be null */
+ private ContentHeaderProperties properties;
+
+ public ContentHeaderBody()
+ {
+ }
+
+ public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ classId = buffer.getUnsignedShort();
+ weight = buffer.getUnsignedShort();
+ bodySize = buffer.getLong();
+ int propertyFlags = buffer.getUnsignedShort();
+ ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
+ properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14);
+
+ }
+
+
+ public ContentHeaderBody(ContentHeaderProperties props, int classId)
+ {
+ properties = props;
+ this.classId = classId;
+ }
+
+ public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize)
+ {
+ this(props, classId);
+ this.weight = weight;
+ this.bodySize = bodySize;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ classId = buffer.getUnsignedShort();
+ weight = buffer.getUnsignedShort();
+ bodySize = buffer.getLong();
+ int propertyFlags = buffer.getUnsignedShort();
+ ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
+ properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14);
+ }
+
+ /**
+ * Helper method that is used currently by the persistence layer (by BDB at the moment).
+ * @param buffer
+ * @param size
+ * @return
+ * @throws AMQFrameDecodingException
+ */
+ public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ ContentHeaderBody body = new ContentHeaderBody(buffer, size);
+
+ return body;
+ }
+
+ public int getSize()
+ {
+ return 2 + 2 + 8 + 2 + properties.getPropertyListSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, classId);
+ EncodingUtils.writeUnsignedShort(buffer, weight);
+ buffer.putLong(bodySize);
+ EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
+ properties.writePropertyListPayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentHeaderReceived(channelId, this);
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
+ long bodySize)
+ {
+ return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize));
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body)
+ {
+ return new AMQFrame(channelId, body);
+ }
+
+ public ContentHeaderProperties getProperties()
+ {
+ return properties;
+ }
+
+ public void setProperties(ContentHeaderProperties props)
+ {
+ properties = props;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
new file mode 100644
index 0000000000..8d5e2f9fb4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ContentHeaderBodyFactory implements BodyFactory
+{
+ private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
+
+ private static final ContentHeaderBodyFactory _instance = new ContentHeaderBodyFactory();
+
+ public static ContentHeaderBodyFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private ContentHeaderBodyFactory()
+ {
+ _log.debug("Creating content header body factory");
+ }
+
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ {
+ // all content headers are the same - it is only the properties that differ.
+ // the content header body further delegates construction of properties
+ return new ContentHeaderBody(in, bodySize);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
new file mode 100644
index 0000000000..7ef538cfdc
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * There will be an implementation of this interface for each content type. All content types have associated
+ * header properties and this provides a way to encode and decode them.
+ */
+public interface ContentHeaderProperties
+{
+ /**
+ * Writes the property list to the buffer, in a suitably encoded form.
+ * @param buffer The buffer to write to
+ */
+ void writePropertyListPayload(ByteBuffer buffer);
+
+ /**
+ * Populates the properties from buffer.
+ * @param buffer The buffer to read from.
+ * @param propertyFlags he property flags.
+ * @throws AMQFrameDecodingException when the buffer does not contain valid data
+ */
+ void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
+ throws AMQFrameDecodingException;
+
+ /**
+ * @return the size of the encoded property list in bytes.
+ */
+ int getPropertyListSize();
+
+ /**
+ * Gets the property flags. Property flags indicate which properties are set in the list. The
+ * position and meaning of each flag is defined in the protocol specification for the particular
+ * content type with which these properties are associated.
+ * @return flags
+ */
+ int getPropertyFlags();
+
+ void updated();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
new file mode 100644
index 0000000000..46189b63d7
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+
+public class ContentHeaderPropertiesFactory
+{
+ private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory();
+
+ public static ContentHeaderPropertiesFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private ContentHeaderPropertiesFactory()
+ {
+ }
+
+ public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
+ ByteBuffer buffer, int size)
+ throws AMQFrameDecodingException
+ {
+ ContentHeaderProperties properties;
+ // AMQP version change: "Hardwired" version to major=8, minor=0
+ // TODO: Change so that the actual version is obtained from
+ // the ProtocolInitiation object for this session.
+ if (classId == BasicConsumeBodyImpl.CLASS_ID)
+ {
+ properties = new BasicContentHeaderProperties();
+ }
+ else
+ {
+ throw new AMQFrameDecodingException(null, "Unsupport content header class id: " + classId, null);
+ }
+ properties.populatePropertiesFromBuffer(buffer, propertyFlags, size);
+ return properties;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java
new file mode 100644
index 0000000000..f6795ff200
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public abstract class DeferredDataBlock extends AMQDataBlock
+{
+ private AMQDataBlock _underlyingDataBlock;
+
+
+ public long getSize()
+ {
+ if(_underlyingDataBlock == null)
+ {
+ _underlyingDataBlock = createAMQDataBlock();
+ }
+ return _underlyingDataBlock.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingDataBlock == null)
+ {
+ _underlyingDataBlock = createAMQDataBlock();
+ }
+ _underlyingDataBlock.writePayload(buffer);
+ }
+
+ abstract protected AMQDataBlock createAMQDataBlock();
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java
new file mode 100644
index 0000000000..9cf96e698c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodableAMQDataBlock.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * Marker interface to indicate to MINA that a data block should be encoded with the
+ * single encoder/decoder that we have defined.
+ *
+ * Note that due to a bug in MINA all classes must directly implement this interface, even if
+ * a superclass implements it.
+ * TODO: fix MINA so that this is not necessary
+ *
+ */
+public interface EncodableAMQDataBlock
+{
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
new file mode 100644
index 0000000000..6425f8c591
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -0,0 +1,1033 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+public class EncodingUtils
+{
+ private static final Logger _logger = LoggerFactory.getLogger(EncodingUtils.class);
+
+ private static final String STRING_ENCODING = "iso8859-15";
+
+ private static final Charset _charset = Charset.forName("iso8859-15");
+
+ public static final int SIZEOF_UNSIGNED_SHORT = 2;
+ public static final int SIZEOF_UNSIGNED_INT = 4;
+ private static final boolean[] ALL_FALSE_ARRAY = new boolean[8];
+
+ public static int encodedShortStringLength(String s)
+ {
+ if (s == null)
+ {
+ return 1;
+ }
+ else
+ {
+ return (short) (1 + s.length());
+ }
+ }
+
+ public static int encodedShortStringLength(short s)
+ {
+ if (s == 0)
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if (s < 0)
+ {
+ len = 1;
+ // sloppy - doesn't work of Integer.MIN_VALUE
+ s = (short) -s;
+ }
+
+ if (s > 9999)
+ {
+ return 1 + 5;
+ }
+ else if (s > 999)
+ {
+ return 1 + 4;
+ }
+ else if (s > 99)
+ {
+ return 1 + 3;
+ }
+ else if (s > 9)
+ {
+ return 1 + 2;
+ }
+ else
+ {
+ return 1 + 1;
+ }
+
+ }
+
+ public static int encodedShortStringLength(int i)
+ {
+ if (i == 0)
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if (i < 0)
+ {
+ len = 1;
+ // sloppy - doesn't work of Integer.MIN_VALUE
+ i = -i;
+ }
+
+ // range is now 1 - 2147483647
+ if (i < Short.MAX_VALUE)
+ {
+ return len + encodedShortStringLength((short) i);
+ }
+ else if (i > 999999)
+ {
+ return len + 6 + encodedShortStringLength((short) (i / 1000000));
+ }
+ else // if (i > 99999)
+ {
+ return len + 5 + encodedShortStringLength((short) (i / 100000));
+ }
+
+ }
+
+ public static int encodedShortStringLength(long l)
+ {
+ if (l == 0)
+ {
+ return 1 + 1;
+ }
+
+ int len = 0;
+ if (l < 0)
+ {
+ len = 1;
+ // sloppy - doesn't work of Long.MIN_VALUE
+ l = -l;
+ }
+
+ if (l < Integer.MAX_VALUE)
+ {
+ return len + encodedShortStringLength((int) l);
+ }
+ else if (l > 9999999999L)
+ {
+ return len + 10 + encodedShortStringLength((int) (l / 10000000000L));
+ }
+ else
+ {
+ return len + 1 + encodedShortStringLength((int) (l / 10L));
+ }
+
+ }
+
+ public static int encodedShortStringLength(AMQShortString s)
+ {
+ if (s == null)
+ {
+ return 1;
+ }
+ else
+ {
+ return (1 + s.length());
+ }
+ }
+
+ public static int encodedLongStringLength(String s)
+ {
+ if (s == null)
+ {
+ return 4;
+ }
+ else
+ {
+ return 4 + s.length();
+ }
+ }
+
+ public static int encodedLongStringLength(char[] s)
+ {
+ if (s == null)
+ {
+ return 4;
+ }
+ else
+ {
+ return 4 + s.length;
+ }
+ }
+
+ public static int encodedLongstrLength(byte[] bytes)
+ {
+ if (bytes == null)
+ {
+ return 4;
+ }
+ else
+ {
+ return 4 + bytes.length;
+ }
+ }
+
+ public static int encodedFieldTableLength(FieldTable table)
+ {
+ if (table == null)
+ {
+ // length is encoded as 4 octets
+ return 4;
+ }
+ else
+ {
+ // length of the table plus 4 octets for the length
+ return (int) table.getEncodedSize() + 4;
+ }
+ }
+
+ public static int encodedContentLength(Content table)
+ {
+ // TODO: New Content class required for AMQP 0-9.
+ return 0;
+ }
+
+ public static void writeShortStringBytes(ByteBuffer buffer, String s)
+ {
+ if (s != null)
+ {
+ byte[] encodedString = new byte[s.length()];
+ char[] cha = s.toCharArray();
+ for (int i = 0; i < cha.length; i++)
+ {
+ encodedString[i] = (byte) cha[i];
+ }
+
+ // TODO: check length fits in an unsigned byte
+ writeUnsignedByte(buffer, (short)encodedString.length);
+ buffer.put(encodedString);
+
+
+ }
+ else
+ {
+ // really writing out unsigned byte
+ buffer.put((byte) 0);
+ }
+ }
+
+ public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s)
+ {
+ if (s != null)
+ {
+
+ s.writeToBuffer(buffer);
+ }
+ else
+ {
+ // really writing out unsigned byte
+ buffer.put((byte) 0);
+ }
+ }
+
+ public static void writeLongStringBytes(ByteBuffer buffer, String s)
+ {
+ assert (s == null) || (s.length() <= 0xFFFE);
+ if (s != null)
+ {
+ int len = s.length();
+ writeUnsignedInteger(buffer, s.length());
+ byte[] encodedString = new byte[len];
+ char[] cha = s.toCharArray();
+ for (int i = 0; i < cha.length; i++)
+ {
+ encodedString[i] = (byte) cha[i];
+ }
+
+ buffer.put(encodedString);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeLongStringBytes(ByteBuffer buffer, char[] s)
+ {
+ assert (s == null) || (s.length <= 0xFFFE);
+ if (s != null)
+ {
+ int len = s.length;
+ writeUnsignedInteger(buffer, s.length);
+ byte[] encodedString = new byte[len];
+ for (int i = 0; i < s.length; i++)
+ {
+ encodedString[i] = (byte) s[i];
+ }
+
+ buffer.put(encodedString);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes)
+ {
+ assert (bytes == null) || (bytes.length <= 0xFFFE);
+ if (bytes != null)
+ {
+ writeUnsignedInteger(buffer, bytes.length);
+ buffer.put(bytes);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeUnsignedByte(ByteBuffer buffer, short b)
+ {
+ byte bv = (byte) b;
+ buffer.put(bv);
+ }
+
+ public static void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (s < Short.MAX_VALUE)
+ {
+ buffer.putShort((short) s);
+ }
+ else
+ {
+ short sv = (short) s;
+ buffer.put((byte) (0xFF & (sv >> 8)));
+ buffer.put((byte) (0xFF & sv));
+ }
+ }
+
+ public static int unsignedIntegerLength()
+ {
+ return 4;
+ }
+
+ public static void writeUnsignedInteger(ByteBuffer buffer, long l)
+ {
+ // TODO: Is this comparison safe? Do I need to cast RHS to long?
+ if (l < Integer.MAX_VALUE)
+ {
+ buffer.putInt((int) l);
+ }
+ else
+ {
+ int iv = (int) l;
+
+ // FIXME: This *may* go faster if we build this into a local 4-byte array and then
+ // put the array in a single call.
+ buffer.put((byte) (0xFF & (iv >> 24)));
+ buffer.put((byte) (0xFF & (iv >> 16)));
+ buffer.put((byte) (0xFF & (iv >> 8)));
+ buffer.put((byte) (0xFF & iv));
+ }
+ }
+
+ public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
+ {
+ if (table != null)
+ {
+ table.writeToBuffer(buffer);
+ }
+ else
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeContentBytes(ByteBuffer buffer, Content content)
+ {
+ // TODO: New Content class required for AMQP 0-9.
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean[] values)
+ {
+ byte packedValue = 0;
+ for (int i = 0; i < values.length; i++)
+ {
+ if (values[i])
+ {
+ packedValue = (byte) (packedValue | (1 << i));
+ }
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value)
+ {
+
+ buffer.put(value ? (byte) 1 : (byte) 0);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 1));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 2));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 3));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 4));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 5));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5, boolean value6)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 6));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5, boolean value6, boolean value7)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 6));
+ }
+
+ if (value7)
+ {
+ packedValue = (byte) (packedValue | (byte) (1 << 7));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ /**
+ * This is used for writing longstrs.
+ *
+ * @param buffer
+ * @param data
+ */
+ public static void writeLongstr(ByteBuffer buffer, byte[] data)
+ {
+ if (data != null)
+ {
+ writeUnsignedInteger(buffer, data.length);
+ buffer.put(data);
+ }
+ else
+ {
+ writeUnsignedInteger(buffer, 0);
+ }
+ }
+
+ public static void writeTimestamp(ByteBuffer buffer, long timestamp)
+ {
+ writeLong(buffer, timestamp);
+ }
+
+ public static boolean[] readBooleans(ByteBuffer buffer)
+ {
+ final byte packedValue = buffer.get();
+ if (packedValue == 0)
+ {
+ return ALL_FALSE_ARRAY;
+ }
+
+ final boolean[] result = new boolean[8];
+
+ result[0] = ((packedValue & 1) != 0);
+ result[1] = ((packedValue & (1 << 1)) != 0);
+ result[2] = ((packedValue & (1 << 2)) != 0);
+ result[3] = ((packedValue & (1 << 3)) != 0);
+ if ((packedValue & 0xF0) == 0)
+ {
+ result[0] = ((packedValue & 1) != 0);
+ }
+
+ result[4] = ((packedValue & (1 << 4)) != 0);
+ result[5] = ((packedValue & (1 << 5)) != 0);
+ result[6] = ((packedValue & (1 << 6)) != 0);
+ result[7] = ((packedValue & (1 << 7)) != 0);
+
+ return result;
+ }
+
+ public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ long length = buffer.getUnsignedInt();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return FieldTableFactory.newFieldTable(buffer, length);
+ }
+ }
+
+ public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ // TODO: New Content class required for AMQP 0-9.
+ return null;
+ }
+
+ public static AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return AMQShortString.readFromBuffer(buffer);
+
+ }
+
+ public static String readShortString(ByteBuffer buffer)
+ {
+ short length = buffer.getUnsigned();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ // this may seem rather odd to declare two array but testing has shown
+ // that constructing a string from a byte array is 5 (five) times slower
+ // than constructing one from a char array.
+ // this approach here is valid since we know that all the chars are
+ // ASCII (0-127)
+ byte[] stringBytes = new byte[length];
+ buffer.get(stringBytes, 0, length);
+ char[] stringChars = new char[length];
+ for (int i = 0; i < stringChars.length; i++)
+ {
+ stringChars[i] = (char) stringBytes[i];
+ }
+
+ return new String(stringChars);
+ }
+ }
+
+ public static String readLongString(ByteBuffer buffer)
+ {
+ long length = buffer.getUnsignedInt();
+ if (length == 0)
+ {
+ return "";
+ }
+ else
+ {
+ // this may seem rather odd to declare two array but testing has shown
+ // that constructing a string from a byte array is 5 (five) times slower
+ // than constructing one from a char array.
+ // this approach here is valid since we know that all the chars are
+ // ASCII (0-127)
+ byte[] stringBytes = new byte[(int) length];
+ buffer.get(stringBytes, 0, (int) length);
+ char[] stringChars = new char[(int) length];
+ for (int i = 0; i < stringChars.length; i++)
+ {
+ stringChars[i] = (char) stringBytes[i];
+ }
+
+ return new String(stringChars);
+ }
+ }
+
+ public static byte[] readLongstr(ByteBuffer buffer)
+ {
+ long length = buffer.getUnsignedInt();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] result = new byte[(int) length];
+ buffer.get(result);
+
+ return result;
+ }
+ }
+
+ public static long readTimestamp(ByteBuffer buffer)
+ {
+ // Discard msb from AMQ timestamp
+ // buffer.getUnsignedInt();
+ return buffer.getLong();
+ }
+
+ static byte[] hexToByteArray(String id)
+ {
+ // Should check param for null, long enough for this check, upper-case and trailing char
+ String s = (id.charAt(1) == 'x') ? id.substring(2) : id; // strip 0x
+
+ int len = s.length();
+ int byte_len = len / 2;
+ byte[] b = new byte[byte_len];
+
+ for (int i = 0; i < byte_len; i++)
+ {
+ // fixme: refine these repetitive subscript calcs.
+ int ch = i * 2;
+
+ byte b1 = Byte.parseByte(s.substring(ch, ch + 1), 16);
+ byte b2 = Byte.parseByte(s.substring(ch + 1, ch + 2), 16);
+
+ b[i] = (byte) ((b1 * 16) + b2);
+ }
+
+ return (b);
+ }
+
+ public static char[] convertToHexCharArray(byte[] from)
+ {
+ int length = from.length;
+ char[] result_buff = new char[(length * 2) + 2];
+
+ result_buff[0] = '0';
+ result_buff[1] = 'x';
+
+ int bite;
+ int dest = 2;
+
+ for (int i = 0; i < length; i++)
+ {
+ bite = from[i];
+
+ if (bite < 0)
+ {
+ bite += 256;
+ }
+
+ result_buff[dest++] = hex_chars[bite >> 4];
+ result_buff[dest++] = hex_chars[bite & 0x0f];
+ }
+
+ return (result_buff);
+ }
+
+ public static String convertToHexString(byte[] from)
+ {
+ return (new String(convertToHexCharArray(from)));
+ }
+
+ public static String convertToHexString(ByteBuffer bb)
+ {
+ int size = bb.limit();
+
+ byte[] from = new byte[size];
+
+ // Is this not the same.
+ // bb.get(from, 0, length);
+ for (int i = 0; i < size; i++)
+ {
+ from[i] = bb.get(i);
+ }
+
+ return (new String(convertToHexCharArray(from)));
+ }
+
+ private static char[] hex_chars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
+
+ // **** new methods
+
+ // AMQP_BOOLEAN_PROPERTY_PREFIX
+
+ public static void writeBoolean(ByteBuffer buffer, Boolean aBoolean)
+ {
+ buffer.put((byte) (aBoolean ? 1 : 0));
+ }
+
+ public static boolean readBoolean(ByteBuffer buffer)
+ {
+ byte packedValue = buffer.get();
+
+ return (packedValue == 1);
+ }
+
+ public static int encodedBooleanLength()
+ {
+ return 1;
+ }
+
+ // AMQP_BYTE_PROPERTY_PREFIX
+ public static void writeByte(ByteBuffer buffer, Byte aByte)
+ {
+ buffer.put(aByte);
+ }
+
+ public static byte readByte(ByteBuffer buffer)
+ {
+ return buffer.get();
+ }
+
+ public static int encodedByteLength()
+ {
+ return 1;
+ }
+
+ // AMQP_SHORT_PROPERTY_PREFIX
+ public static void writeShort(ByteBuffer buffer, Short aShort)
+ {
+ buffer.putShort(aShort);
+ }
+
+ public static short readShort(ByteBuffer buffer)
+ {
+ return buffer.getShort();
+ }
+
+ public static int encodedShortLength()
+ {
+ return 2;
+ }
+
+ // INTEGER_PROPERTY_PREFIX
+ public static void writeInteger(ByteBuffer buffer, Integer aInteger)
+ {
+ buffer.putInt(aInteger);
+ }
+
+ public static int readInteger(ByteBuffer buffer)
+ {
+ return buffer.getInt();
+ }
+
+ public static int encodedIntegerLength()
+ {
+ return 4;
+ }
+
+ // AMQP_LONG_PROPERTY_PREFIX
+ public static void writeLong(ByteBuffer buffer, Long aLong)
+ {
+ buffer.putLong(aLong);
+ }
+
+ public static long readLong(ByteBuffer buffer)
+ {
+ return buffer.getLong();
+ }
+
+ public static int encodedLongLength()
+ {
+ return 8;
+ }
+
+ // Float_PROPERTY_PREFIX
+ public static void writeFloat(ByteBuffer buffer, Float aFloat)
+ {
+ buffer.putFloat(aFloat);
+ }
+
+ public static float readFloat(ByteBuffer buffer)
+ {
+ return buffer.getFloat();
+ }
+
+ public static int encodedFloatLength()
+ {
+ return 4;
+ }
+
+ // Double_PROPERTY_PREFIX
+ public static void writeDouble(ByteBuffer buffer, Double aDouble)
+ {
+ buffer.putDouble(aDouble);
+ }
+
+ public static double readDouble(ByteBuffer buffer)
+ {
+ return buffer.getDouble();
+ }
+
+ public static int encodedDoubleLength()
+ {
+ return 8;
+ }
+
+ public static byte[] readBytes(ByteBuffer buffer)
+ {
+ long length = buffer.getUnsignedInt();
+ if (length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] dataBytes = new byte[(int)length];
+ buffer.get(dataBytes, 0, (int)length);
+
+ return dataBytes;
+ }
+ }
+
+ public static void writeBytes(ByteBuffer buffer, byte[] data)
+ {
+ if (data != null)
+ {
+ // TODO: check length fits in an unsigned byte
+ writeUnsignedInteger(buffer, (long)data.length);
+ buffer.put(data);
+ }
+ else
+ {
+ // really writing out unsigned byte
+ //buffer.put((byte) 0);
+ writeUnsignedInteger(buffer, 0L);
+ }
+ }
+
+ // CHAR_PROPERTY
+ public static int encodedCharLength()
+ {
+ return encodedByteLength();
+ }
+
+ public static char readChar(ByteBuffer buffer)
+ {
+ // This is valid as we know that the Character is ASCII 0..127
+ return (char) buffer.get();
+ }
+
+ public static void writeChar(ByteBuffer buffer, char character)
+ {
+ // This is valid as we know that the Character is ASCII 0..127
+ writeByte(buffer, (byte) character);
+ }
+
+ public static long readLongAsShortString(ByteBuffer buffer)
+ {
+ short length = buffer.getUnsigned();
+ short pos = 0;
+ if (length == 0)
+ {
+ return 0L;
+ }
+
+ byte digit = buffer.get();
+ boolean isNegative;
+ long result = 0;
+ if (digit == (byte) '-')
+ {
+ isNegative = true;
+ pos++;
+ digit = buffer.get();
+ }
+ else
+ {
+ isNegative = false;
+ }
+
+ result = digit - (byte) '0';
+ pos++;
+
+ while (pos < length)
+ {
+ pos++;
+ digit = buffer.get();
+ result = (result << 3) + (result << 1);
+ result += digit - (byte) '0';
+ }
+
+ return result;
+ }
+
+ public static long readUnsignedInteger(ByteBuffer buffer)
+ {
+ long l = 0xFF & buffer.get();
+ l <<= 8;
+ l = l | (0xFF & buffer.get());
+ l <<= 8;
+ l = l | (0xFF & buffer.get());
+ l <<= 8;
+ l = l | (0xFF & buffer.get());
+
+ return l;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
new file mode 100644
index 0000000000..22205d49f8
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -0,0 +1,1246 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQPInvalidClassException;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+// extends FieldTable
+public class FieldTable
+{
+ private static final Logger _logger = LoggerFactory.getLogger(FieldTable.class);
+ private static final String STRICT_AMQP = "STRICT_AMQP";
+ private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false"));
+
+ private ByteBuffer _encodedForm;
+ private LinkedHashMap<AMQShortString, AMQTypedValue> _properties;
+ private long _encodedSize;
+ private static final int INITIAL_HASHMAP_CAPACITY = 16;
+ private static final int INITIAL_ENCODED_FORM_SIZE = 256;
+
+ public FieldTable()
+ {
+ super();
+ // _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE);
+ // _encodedForm.setAutoExpand(true);
+ // _encodedForm.limit(0);
+ }
+
+ /**
+ * Construct a new field table.
+ *
+ * @param buffer the buffer from which to read data. The length byte must be read already
+ * @param length the length of the field table. Must be > 0.
+ */
+ public FieldTable(ByteBuffer buffer, long length)
+ {
+ this();
+ ByteBuffer encodedForm = buffer.slice();
+ encodedForm.limit((int) length);
+ _encodedForm = ByteBuffer.allocate((int)length);
+ _encodedForm.put(encodedForm);
+ _encodedForm.flip();
+ _encodedSize = length;
+ buffer.skip((int) length);
+ }
+
+ public AMQTypedValue getProperty(AMQShortString string)
+ {
+ checkPropertyName(string);
+
+ synchronized (this)
+ {
+ if (_properties == null)
+ {
+ if (_encodedForm == null)
+ {
+ return null;
+ }
+ else
+ {
+ populateFromBuffer();
+ }
+ }
+ }
+
+ if (_properties == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _properties.get(string);
+ }
+ }
+
+ private void populateFromBuffer()
+ {
+ try
+ {
+ setFromBuffer(_encodedForm, _encodedSize);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ _logger.error("Error decoding FieldTable in deferred decoding mode ", e);
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
+ {
+ checkPropertyName(key);
+ initMapIfNecessary();
+ if (_properties.containsKey(key))
+ {
+ _encodedForm = null;
+
+ if (val == null)
+ {
+ return removeKey(key);
+ }
+ }
+ else if ((_encodedForm != null) && (val != null))
+ {
+ // We have updated data to store in the buffer
+ // So clear the _encodedForm to allow it to be rebuilt later
+ // this is safer than simply appending to any existing buffer.
+ _encodedForm = null;
+ }
+ else if (val == null)
+ {
+ return null;
+ }
+
+ AMQTypedValue oldVal = _properties.put(key, val);
+ if (oldVal != null)
+ {
+ _encodedSize -= oldVal.getEncodingSize();
+ }
+ else
+ {
+ _encodedSize += EncodingUtils.encodedShortStringLength(key) + 1;
+ }
+
+ _encodedSize += val.getEncodingSize();
+
+ return oldVal;
+ }
+
+ private void initMapIfNecessary()
+ {
+ synchronized (this)
+ {
+ if (_properties == null)
+ {
+ if ((_encodedForm == null) || (_encodedSize == 0))
+ {
+ _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>();
+ }
+ else
+ {
+ populateFromBuffer();
+ }
+ }
+
+ }
+ }
+
+ public Boolean getBoolean(String string)
+ {
+ return getBoolean(new AMQShortString(string));
+ }
+
+ public Boolean getBoolean(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.BOOLEAN))
+ {
+ return (Boolean) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Byte getByte(String string)
+ {
+ return getByte(new AMQShortString(string));
+ }
+
+ public Byte getByte(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.BYTE))
+ {
+ return (Byte) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Short getShort(String string)
+ {
+ return getShort(new AMQShortString(string));
+ }
+
+ public Short getShort(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.SHORT))
+ {
+ return (Short) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Integer getInteger(String string)
+ {
+ return getInteger(new AMQShortString(string));
+ }
+
+ public Integer getInteger(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.INT))
+ {
+ return (Integer) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Long getLong(String string)
+ {
+ return getLong(new AMQShortString(string));
+ }
+
+ public Long getLong(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.LONG))
+ {
+ return (Long) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Float getFloat(String string)
+ {
+ return getFloat(new AMQShortString(string));
+ }
+
+ public Float getFloat(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.FLOAT))
+ {
+ return (Float) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Double getDouble(String string)
+ {
+ return getDouble(new AMQShortString(string));
+ }
+
+ public Double getDouble(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.DOUBLE))
+ {
+ return (Double) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public String getString(String string)
+ {
+ return getString(new AMQShortString(string));
+ }
+
+ public String getString(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && ((value.getType() == AMQType.WIDE_STRING) || (value.getType() == AMQType.ASCII_STRING)))
+ {
+ return (String) value.getValue();
+ }
+ else if ((value != null) && (value.getValue() != null) && !(value.getValue() instanceof byte[]))
+ {
+ return String.valueOf(value.getValue());
+ }
+ else
+ {
+ return null;
+ }
+
+ }
+
+ public Character getCharacter(String string)
+ {
+ return getCharacter(new AMQShortString(string));
+ }
+
+ public Character getCharacter(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.ASCII_CHARACTER))
+ {
+ return (Character) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public byte[] getBytes(String string)
+ {
+ return getBytes(new AMQShortString(string));
+ }
+
+ public byte[] getBytes(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if ((value != null) && (value.getType() == AMQType.BINARY))
+ {
+ return (byte[]) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name.
+ *
+ * @param string The name of the parameter to get the associated FieldTable value for.
+ *
+ * @return The associated FieldTable value, or <tt>null</tt> if the associated value is not of FieldTable type or
+ * not present in the field table at all.
+ */
+ public FieldTable getFieldTable(String string)
+ {
+ return getFieldTable(new AMQShortString(string));
+ }
+
+ /**
+ * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name.
+ *
+ * @param string The name of the parameter to get the associated FieldTable value for.
+ *
+ * @return The associated FieldTable value, or <tt>null</tt> if the associated value is not of FieldTable type or
+ * not present in the field table at all.
+ */
+ public FieldTable getFieldTable(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+
+ if ((value != null) && (value.getType() == AMQType.FIELD_TABLE))
+ {
+ return (FieldTable) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Object getObject(String string)
+ {
+ return getObject(new AMQShortString(string));
+ }
+
+ public Object getObject(AMQShortString string)
+ {
+ AMQTypedValue value = getProperty(string);
+ if (value != null)
+ {
+ return value.getValue();
+ }
+ else
+ {
+ return value;
+ }
+
+ }
+
+ public Long getTimestamp(AMQShortString name)
+ {
+ AMQTypedValue value = getProperty(name);
+ if ((value != null) && (value.getType() == AMQType.TIMESTAMP))
+ {
+ return (Long) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public BigDecimal getDecimal(AMQShortString propertyName)
+ {
+ AMQTypedValue value = getProperty(propertyName);
+ if ((value != null) && (value.getType() == AMQType.DECIMAL))
+ {
+ return (BigDecimal) value.getValue();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ // ************ Setters
+ public Object setBoolean(String string, Boolean b)
+ {
+ return setBoolean(new AMQShortString(string), b);
+ }
+
+ public Object setBoolean(AMQShortString string, Boolean b)
+ {
+ return setProperty(string, AMQType.BOOLEAN.asTypedValue(b));
+ }
+
+ public Object setByte(String string, Byte b)
+ {
+ return setByte(new AMQShortString(string), b);
+ }
+
+ public Object setByte(AMQShortString string, Byte b)
+ {
+ return setProperty(string, AMQType.BYTE.asTypedValue(b));
+ }
+
+ public Object setShort(String string, Short i)
+ {
+ return setShort(new AMQShortString(string), i);
+ }
+
+ public Object setShort(AMQShortString string, Short i)
+ {
+ return setProperty(string, AMQType.SHORT.asTypedValue(i));
+ }
+
+ public Object setInteger(String string, Integer i)
+ {
+ return setInteger(new AMQShortString(string), i);
+ }
+
+ public Object setInteger(AMQShortString string, Integer i)
+ {
+ return setProperty(string, AMQType.INT.asTypedValue(i));
+ }
+
+ public Object setLong(String string, Long l)
+ {
+ return setLong(new AMQShortString(string), l);
+ }
+
+ public Object setLong(AMQShortString string, Long l)
+ {
+ return setProperty(string, AMQType.LONG.asTypedValue(l));
+ }
+
+ public Object setFloat(String string, Float f)
+ {
+ return setFloat(new AMQShortString(string), f);
+ }
+
+ public Object setFloat(AMQShortString string, Float v)
+ {
+ return setProperty(string, AMQType.FLOAT.asTypedValue(v));
+ }
+
+ public Object setDouble(String string, Double d)
+ {
+ return setDouble(new AMQShortString(string), d);
+ }
+
+ public Object setDouble(AMQShortString string, Double v)
+ {
+ return setProperty(string, AMQType.DOUBLE.asTypedValue(v));
+ }
+
+ public Object setString(String string, String s)
+ {
+ return setString(new AMQShortString(string), s);
+ }
+
+ public Object setAsciiString(AMQShortString string, String value)
+ {
+ if (value == null)
+ {
+ return setProperty(string, AMQType.VOID.asTypedValue(null));
+ }
+ else
+ {
+ return setProperty(string, AMQType.ASCII_STRING.asTypedValue(value));
+ }
+ }
+
+ public Object setString(AMQShortString string, String value)
+ {
+ if (value == null)
+ {
+ return setProperty(string, AMQType.VOID.asTypedValue(null));
+ }
+ else
+ {
+ return setProperty(string, AMQType.LONG_STRING.asTypedValue(value));
+ }
+ }
+
+ public Object setChar(String string, char c)
+ {
+ return setChar(new AMQShortString(string), c);
+ }
+
+ public Object setChar(AMQShortString string, char c)
+ {
+ return setProperty(string, AMQType.ASCII_CHARACTER.asTypedValue(c));
+ }
+
+ public Object setBytes(String string, byte[] b)
+ {
+ return setBytes(new AMQShortString(string), b);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes)
+ {
+ return setProperty(string, AMQType.BINARY.asTypedValue(bytes));
+ }
+
+ public Object setBytes(String string, byte[] bytes, int start, int length)
+ {
+ return setBytes(new AMQShortString(string), bytes, start, length);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
+ {
+ byte[] newBytes = new byte[length];
+ System.arraycopy(bytes, start, newBytes, 0, length);
+
+ return setBytes(string, bytes);
+ }
+
+ public Object setObject(String string, Object o)
+ {
+ return setObject(new AMQShortString(string), o);
+ }
+
+ public Object setTimestamp(AMQShortString string, long datetime)
+ {
+ return setProperty(string, AMQType.TIMESTAMP.asTypedValue(datetime));
+ }
+
+ public Object setDecimal(AMQShortString string, BigDecimal decimal)
+ {
+ if (decimal.longValue() > Integer.MAX_VALUE)
+ {
+ throw new UnsupportedOperationException("AMQP doesnot support decimals larger than " + Integer.MAX_VALUE);
+ }
+
+ if (decimal.scale() > Byte.MAX_VALUE)
+ {
+ throw new UnsupportedOperationException("AMQP doesnot support decimal scales larger than " + Byte.MAX_VALUE);
+ }
+
+ return setProperty(string, AMQType.DECIMAL.asTypedValue(decimal));
+ }
+
+ public Object setVoid(AMQShortString string)
+ {
+ return setProperty(string, AMQType.VOID.asTypedValue(null));
+ }
+
+ /**
+ * Associates a nested field table with the specified parameter name.
+ *
+ * @param string The name of the parameter to store in the table.
+ * @param ftValue The field table value to associate with the parameter name.
+ *
+ * @return The stored value.
+ */
+ public Object setFieldTable(String string, FieldTable ftValue)
+ {
+ return setFieldTable(new AMQShortString(string), ftValue);
+ }
+
+ /**
+ * Associates a nested field table with the specified parameter name.
+ *
+ * @param string The name of the parameter to store in the table.
+ * @param ftValue The field table value to associate with the parameter name.
+ *
+ * @return The stored value.
+ */
+ public Object setFieldTable(AMQShortString string, FieldTable ftValue)
+ {
+ return setProperty(string, AMQType.FIELD_TABLE.asTypedValue(ftValue));
+ }
+
+ public Object setObject(AMQShortString string, Object object)
+ {
+ if (object instanceof Boolean)
+ {
+ return setBoolean(string, (Boolean) object);
+ }
+ else if (object instanceof Byte)
+ {
+ return setByte(string, (Byte) object);
+ }
+ else if (object instanceof Short)
+ {
+ return setShort(string, (Short) object);
+ }
+ else if (object instanceof Integer)
+ {
+ return setInteger(string, (Integer) object);
+ }
+ else if (object instanceof Long)
+ {
+ return setLong(string, (Long) object);
+ }
+ else if (object instanceof Float)
+ {
+ return setFloat(string, (Float) object);
+ }
+ else if (object instanceof Double)
+ {
+ return setDouble(string, (Double) object);
+ }
+ else if (object instanceof String)
+ {
+ return setString(string, (String) object);
+ }
+ else if (object instanceof Character)
+ {
+ return setChar(string, (Character) object);
+ }
+ else if (object instanceof byte[])
+ {
+ return setBytes(string, (byte[]) object);
+ }
+
+ throw new AMQPInvalidClassException(AMQPInvalidClassException.INVALID_OBJECT_MSG + (object == null ? "null" : object.getClass()));
+ }
+
+ public boolean isNullStringValue(String name)
+ {
+ AMQTypedValue value = getProperty(new AMQShortString(name));
+
+ return (value != null) && (value.getType() == AMQType.VOID);
+ }
+
+ // ***** Methods
+
+ public Enumeration getPropertyNames()
+ {
+ return Collections.enumeration(keys());
+ }
+
+ public boolean propertyExists(AMQShortString propertyName)
+ {
+ return itemExists(propertyName);
+ }
+
+ public boolean propertyExists(String propertyName)
+ {
+ return itemExists(propertyName);
+ }
+
+ public boolean itemExists(AMQShortString propertyName)
+ {
+ checkPropertyName(propertyName);
+ initMapIfNecessary();
+
+ return _properties.containsKey(propertyName);
+ }
+
+ public boolean itemExists(String string)
+ {
+ return itemExists(new AMQShortString(string));
+ }
+
+ public String toString()
+ {
+ initMapIfNecessary();
+
+ return _properties.toString();
+ }
+
+ private void checkPropertyName(AMQShortString propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new IllegalArgumentException("Property name must not be null");
+ }
+ else if (propertyName.length() == 0)
+ {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ if (_strictAMQP)
+ {
+ checkIdentiferFormat(propertyName);
+ }
+ }
+
+ protected static void checkIdentiferFormat(AMQShortString propertyName)
+ {
+ // AMQP Spec: 4.2.5.5 Field Tables
+ // Guidelines for implementers:
+ // * Field names MUST start with a letter, '$' or '#' and may continue with
+ // letters, '$' or '#', digits, or underlines, to a maximum length of 128
+ // characters.
+ // * The server SHOULD validate field names and upon receiving an invalid
+ // field name, it SHOULD signal a connection exception with reply code
+ // 503 (syntax error). Conformance test: amq_wlp_table_01.
+ // * A peer MUST handle duplicate fields by using only the first instance.
+
+ // AMQP length limit
+ if (propertyName.length() > 128)
+ {
+ throw new IllegalArgumentException("AMQP limits property names to 128 characters");
+ }
+
+ // AMQ start character
+ if (!(Character.isLetter(propertyName.charAt(0)) || (propertyName.charAt(0) == '$')
+ || (propertyName.charAt(0) == '#') || (propertyName.charAt(0) == '_'))) // Not official AMQP added for JMS.
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName
+ + "' does not start with a valid AMQP start character");
+ }
+ }
+
+ // ************************* Byte Buffer Processing
+
+ public void writeToBuffer(ByteBuffer buffer)
+ {
+ final boolean trace = _logger.isDebugEnabled();
+
+ if (trace)
+ {
+ _logger.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
+ if (_properties != null)
+ {
+ _logger.debug(_properties.toString());
+ }
+ }
+
+ EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());
+
+ putDataInBuffer(buffer);
+ }
+
+ public byte[] getDataAsBytes()
+ {
+ final int encodedSize = (int) getEncodedSize();
+ final ByteBuffer buffer = ByteBuffer.allocate(encodedSize); // FIXME XXX: Is cast a problem?
+
+ putDataInBuffer(buffer);
+
+ final byte[] result = new byte[encodedSize];
+ buffer.flip();
+ buffer.get(result);
+ buffer.release();
+
+ return result;
+ }
+
+ public long getEncodedSize()
+ {
+ return _encodedSize;
+ }
+
+ private void recalculateEncodedSize()
+ {
+
+ int encodedSize = 0;
+ if (_properties != null)
+ {
+ for (Map.Entry<AMQShortString, AMQTypedValue> e : _properties.entrySet())
+ {
+ encodedSize += EncodingUtils.encodedShortStringLength(e.getKey());
+ encodedSize++; // the byte for the encoding Type
+ encodedSize += e.getValue().getEncodingSize();
+
+ }
+ }
+
+ _encodedSize = encodedSize;
+ }
+
+ public void addAll(FieldTable fieldTable)
+ {
+ initMapIfNecessary();
+ _encodedForm = null;
+ _properties.putAll(fieldTable._properties);
+ recalculateEncodedSize();
+ }
+
+ public static Map<String, Object> convertToMap(final FieldTable fieldTable)
+ {
+ final Map<String, Object> map = new HashMap<String, Object>();
+
+ if(fieldTable != null)
+ {
+ fieldTable.processOverElements(
+ new FieldTableElementProcessor()
+ {
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ Object val = value.getValue();
+ if(val instanceof AMQShortString)
+ {
+ val = val.toString();
+ }
+ map.put(propertyName, val);
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return map;
+ }
+ });
+ }
+ return map;
+ }
+
+
+ public static interface FieldTableElementProcessor
+ {
+ public boolean processElement(String propertyName, AMQTypedValue value);
+
+ public Object getResult();
+ }
+
+ public Object processOverElements(FieldTableElementProcessor processor)
+ {
+ initMapIfNecessary();
+ if (_properties != null)
+ {
+ for (Map.Entry<AMQShortString, AMQTypedValue> e : _properties.entrySet())
+ {
+ boolean result = processor.processElement(e.getKey().toString(), e.getValue());
+ if (!result)
+ {
+ break;
+ }
+ }
+ }
+
+ return processor.getResult();
+
+ }
+
+ public int size()
+ {
+ initMapIfNecessary();
+
+ return _properties.size();
+
+ }
+
+ public boolean isEmpty()
+ {
+ return size() == 0;
+ }
+
+ public boolean containsKey(AMQShortString key)
+ {
+ initMapIfNecessary();
+
+ return _properties.containsKey(key);
+ }
+
+ public boolean containsKey(String key)
+ {
+ return containsKey(new AMQShortString(key));
+ }
+
+ public Set<String> keys()
+ {
+ initMapIfNecessary();
+ Set<String> keys = new LinkedHashSet<String>();
+ for (AMQShortString key : _properties.keySet())
+ {
+ keys.add(key.toString());
+ }
+
+ return keys;
+ }
+
+ public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
+ {
+ if(_encodedForm != null)
+ {
+ return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
+ }
+ else
+ {
+ initMapIfNecessary();
+ return _properties.entrySet().iterator();
+ }
+ }
+
+ public Object get(String key)
+ {
+ return get(new AMQShortString(key));
+ }
+
+ public Object get(AMQShortString key)
+ {
+ return getObject(key);
+ }
+
+ public Object put(AMQShortString key, Object value)
+ {
+ return setObject(key, value);
+ }
+
+ public Object remove(String key)
+ {
+
+ return remove(new AMQShortString(key));
+
+ }
+
+ public Object remove(AMQShortString key)
+ {
+ AMQTypedValue val = removeKey(key);
+
+ return (val == null) ? null : val.getValue();
+
+ }
+
+ public AMQTypedValue removeKey(AMQShortString key)
+ {
+ initMapIfNecessary();
+ _encodedForm = null;
+ AMQTypedValue value = _properties.remove(key);
+ if (value == null)
+ {
+ return null;
+ }
+ else
+ {
+ _encodedSize -= EncodingUtils.encodedShortStringLength(key);
+ _encodedSize--;
+ _encodedSize -= value.getEncodingSize();
+
+ return value;
+ }
+
+ }
+
+ public void clear()
+ {
+ initMapIfNecessary();
+ _encodedForm = null;
+ _properties.clear();
+ _encodedSize = 0;
+ }
+
+ public Set<AMQShortString> keySet()
+ {
+ initMapIfNecessary();
+
+ return _properties.keySet();
+ }
+
+ private void putDataInBuffer(ByteBuffer buffer)
+ {
+
+ if (_encodedForm != null)
+ {
+ if(buffer.isDirect() || buffer.isReadOnly())
+ {
+ ByteBuffer encodedForm = _encodedForm.duplicate();
+
+ if (encodedForm.position() != 0)
+ {
+ encodedForm.flip();
+ }
+
+ buffer.put(encodedForm);
+ }
+ else
+ {
+ buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize);
+ }
+ }
+ else if (_properties != null)
+ {
+ final Iterator<Map.Entry<AMQShortString, AMQTypedValue>> it = _properties.entrySet().iterator();
+
+ // If there are values then write out the encoded Size... could check _encodedSize != 0
+ // write out the total length, which we have kept up to date as data is added
+
+ while (it.hasNext())
+ {
+ final Map.Entry<AMQShortString, AMQTypedValue> me = it.next();
+ try
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+ + me.getValue().getValue());
+ _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
+ }
+
+ // Write the actual parameter name
+ EncodingUtils.writeShortStringBytes(buffer, me.getKey());
+ me.getValue().writeToBuffer(buffer);
+ }
+ catch (Exception e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Exception thrown:" + e);
+ _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+ + me.getValue().getValue());
+ _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
+ }
+
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
+ {
+
+ final boolean trace = _logger.isDebugEnabled();
+ if (length > 0)
+ {
+
+ final int expectedRemaining = buffer.remaining() - (int) length;
+
+ _properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
+
+ do
+ {
+
+ final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
+
+ _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key);
+
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
+
+ if (trace)
+ {
+ _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType()
+ + "', key '" + key + "', value '" + value.getValue() + "'");
+ }
+
+ _properties.put(key, value);
+
+ }
+ while (buffer.remaining() > expectedRemaining);
+
+ }
+
+ _encodedSize = length;
+
+ if (trace)
+ {
+ _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done.");
+ }
+ }
+
+ private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
+ {
+ private final AMQTypedValue _value;
+ private final AMQShortString _key;
+
+ public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
+ {
+ _key = key;
+ _value = value;
+ }
+
+ public AMQShortString getKey()
+ {
+ return _key;
+ }
+
+ public AMQTypedValue getValue()
+ {
+ return _value;
+ }
+
+ public AMQTypedValue setValue(final AMQTypedValue value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof FieldTableEntry)
+ {
+ FieldTableEntry other = (FieldTableEntry) o;
+ return (_key == null ? other._key == null : _key.equals(other._key))
+ && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return (getKey()==null ? 0 : getKey().hashCode())
+ ^ (getValue()==null ? 0 : getValue().hashCode());
+ }
+
+ }
+
+
+ private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
+ {
+
+ private final ByteBuffer _buffer;
+ private int _expectedRemaining;
+
+ public FieldTableIterator(ByteBuffer buffer, int length)
+ {
+ _buffer = buffer;
+ _expectedRemaining = buffer.remaining() - length;
+ }
+
+ public boolean hasNext()
+ {
+ return (_buffer.remaining() > _expectedRemaining);
+ }
+
+ public Map.Entry<AMQShortString, AMQTypedValue> next()
+ {
+ if(hasNext())
+ {
+ final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
+ AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
+ return new FieldTableEntry(key, value);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+
+
+ public int hashCode()
+ {
+ initMapIfNecessary();
+
+ return _properties.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (o == this)
+ {
+ return true;
+ }
+
+ if (o == null)
+ {
+ return false;
+ }
+
+ if (!(o instanceof FieldTable))
+ {
+ return false;
+ }
+
+ initMapIfNecessary();
+
+ FieldTable f = (FieldTable) o;
+ f.initMapIfNecessary();
+
+ return _properties.equals(f._properties);
+ }
+
+ public static FieldTable convertToFieldTable(Map<String, Object> map)
+ {
+ if (map != null)
+ {
+ FieldTable table = new FieldTable();
+ for(Map.Entry<String,Object> entry : map.entrySet())
+ {
+ table.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+
+ return table;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
new file mode 100644
index 0000000000..e9d75137ef
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class FieldTableFactory
+{
+ public static FieldTable newFieldTable()
+ {
+ return new FieldTable();
+ }
+
+ public static FieldTable newFieldTable(ByteBuffer byteBuffer, long length) throws AMQFrameDecodingException
+ {
+ return new FieldTable(byteBuffer, length);
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
new file mode 100644
index 0000000000..18ab05ffa1
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
+
+public class HeartbeatBody implements AMQBody
+{
+ public static final byte TYPE = 8;
+ public static final AMQFrame FRAME = new HeartbeatBody().toFrame();
+
+ public HeartbeatBody()
+ {
+
+ }
+
+ public HeartbeatBody(ByteBuffer buffer, long size)
+ {
+ if(size > 0)
+ {
+ //allow other implementations to have a payload, but ignore it:
+ buffer.skip((int) size);
+ }
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return 0;//heartbeats we generate have no payload
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.heartbeatBodyReceived(channelId, this);
+ }
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ if(size > 0)
+ {
+ //allow other implementations to have a payload, but ignore it:
+ buffer.skip((int) size);
+ }
+ }
+
+ public AMQFrame toFrame()
+ {
+ return new AMQFrame(0, this);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
new file mode 100644
index 0000000000..c7ada708dc
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class HeartbeatBodyFactory implements BodyFactory
+{
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ {
+ return new HeartbeatBody();
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
new file mode 100644
index 0000000000..fb3dd89717
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -0,0 +1,223 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.qpid.AMQException;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
+{
+
+ // TODO: generate these constants automatically from the xml protocol spec file
+ private static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'};
+
+ private static final byte CURRENT_PROTOCOL_CLASS = 1;
+ private static final byte TCP_PROTOCOL_INSTANCE = 1;
+
+ public final byte[] _protocolHeader;
+ public final byte _protocolClass;
+ public final byte _protocolInstance;
+ public final byte _protocolMajor;
+ public final byte _protocolMinor;
+
+
+// public ProtocolInitiation() {}
+
+ public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor)
+ {
+ _protocolHeader = protocolHeader;
+ _protocolClass = protocolClass;
+ _protocolInstance = protocolInstance;
+ _protocolMajor = protocolMajor;
+ _protocolMinor = protocolMinor;
+ }
+
+ public ProtocolInitiation(ProtocolVersion pv)
+ {
+ this(AMQP_HEADER,
+ pv.equals(ProtocolVersion.v0_91) ? 0 : CURRENT_PROTOCOL_CLASS,
+ pv.equals(ProtocolVersion.v0_91) ? 0 : TCP_PROTOCOL_INSTANCE,
+ pv.equals(ProtocolVersion.v0_91) ? 9 : pv.getMajorVersion(),
+ pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
+ }
+
+ public ProtocolInitiation(ByteBuffer in)
+ {
+ _protocolHeader = new byte[4];
+ in.get(_protocolHeader);
+
+ _protocolClass = in.get();
+ _protocolInstance = in.get();
+ _protocolMajor = in.get();
+ _protocolMinor = in.get();
+ }
+
+ public void writePayload(org.apache.mina.common.ByteBuffer buffer)
+ {
+ writePayload(buffer.buf());
+ }
+
+ public long getSize()
+ {
+ return 4 + 1 + 1 + 1 + 1;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+
+ buffer.put(_protocolHeader);
+ buffer.put(_protocolClass);
+ buffer.put(_protocolInstance);
+ buffer.put(_protocolMajor);
+ buffer.put(_protocolMinor);
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ProtocolInitiation))
+ {
+ return false;
+ }
+
+ ProtocolInitiation pi = (ProtocolInitiation) o;
+ if (pi._protocolHeader == null)
+ {
+ return false;
+ }
+
+ if (_protocolHeader.length != pi._protocolHeader.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < _protocolHeader.length; i++)
+ {
+ if (_protocolHeader[i] != pi._protocolHeader[i])
+ {
+ return false;
+ }
+ }
+
+ return (_protocolClass == pi._protocolClass &&
+ _protocolInstance == pi._protocolInstance &&
+ _protocolMajor == pi._protocolMajor &&
+ _protocolMinor == pi._protocolMinor);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _protocolHeader != null ? Arrays.hashCode(_protocolHeader) : 0;
+ result = 31 * result + (int) _protocolClass;
+ result = 31 * result + (int) _protocolInstance;
+ result = 31 * result + (int) _protocolMajor;
+ result = 31 * result + (int) _protocolMinor;
+ return result;
+ }
+
+ public static class Decoder //implements MessageDecoder
+ {
+ /**
+ *
+ * @param in input buffer
+ * @return true if we have enough data to decode the PI frame fully, false if more
+ * data is required
+ */
+ public boolean decodable(ByteBuffer in)
+ {
+ return (in.remaining() >= 8);
+ }
+
+ }
+
+ public ProtocolVersion checkVersion() throws AMQException
+ {
+
+ if(_protocolHeader.length != 4)
+ {
+ throw new AMQProtocolHeaderException("Protocol header should have exactly four octets", null);
+ }
+ for(int i = 0; i < 4; i++)
+ {
+ if(_protocolHeader[i] != AMQP_HEADER[i])
+ {
+ try
+ {
+ throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"), null);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+
+ }
+ }
+ }
+
+ ProtocolVersion pv;
+
+ // Hack for 0-9-1 which changed how the header was defined
+ if(_protocolInstance == 0 && _protocolMajor == 9 && _protocolMinor == 1)
+ {
+ pv = ProtocolVersion.v0_91;
+ if (_protocolClass != 0)
+ {
+ throw new AMQProtocolClassException("Protocol class " + 0 + " was expected; received " +
+ _protocolClass, null);
+ }
+ }
+ else if (_protocolClass != CURRENT_PROTOCOL_CLASS)
+ {
+ throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
+ _protocolClass, null);
+ }
+ else if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+ {
+ throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
+ _protocolInstance, null);
+ }
+ else
+ {
+ pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+ }
+
+
+ if (!pv.isSupported())
+ {
+ // TODO: add list of available versions in list to msg...
+ throw new AMQProtocolVersionException("Protocol version " +
+ _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.", null);
+ }
+ return pv;
+ }
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
+ buffer.append(Integer.toHexString(_protocolClass));
+ buffer.append(Integer.toHexString(_protocolInstance));
+ buffer.append(Integer.toHexString(_protocolMajor));
+ buffer.append(Integer.toHexString(_protocolMinor));
+ return buffer.toString();
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
new file mode 100644
index 0000000000..bd763599b0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private AMQDataBlock _firstFrame;
+
+ private AMQDataBlock _block;
+
+ public SmallCompositeAMQDataBlock(AMQDataBlock block)
+ {
+ _block = block;
+ }
+
+ /**
+ * The encoded block will be logically first before the AMQDataBlocks which are encoded
+ * into the buffer afterwards.
+ * @param encodedBlock already-encoded data
+ * @param block a block to be encoded.
+ */
+ public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block)
+ {
+ this(block);
+ _firstFrame = encodedBlock;
+ }
+
+ public AMQDataBlock getBlock()
+ {
+ return _block;
+ }
+
+ public AMQDataBlock getFirstFrame()
+ {
+ return _firstFrame;
+ }
+
+ public long getSize()
+ {
+ long frameSize = _block.getSize();
+
+ if (_firstFrame != null)
+ {
+
+ frameSize += _firstFrame.getSize();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (_firstFrame != null)
+ {
+ _firstFrame.writePayload(buffer);
+ }
+ _block.writePayload(buffer);
+
+ }
+
+ public String toString()
+ {
+ if (_block == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{encodedBlock=").append(_firstFrame);
+
+ buf.append(" _block=[").append(_block.toString()).append("]");
+
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
new file mode 100644
index 0000000000..76c154581d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionSpecificRegistry
+{
+ private static final Logger _log = LoggerFactory.getLogger(VersionSpecificRegistry.class);
+
+ private final byte _protocolMajorVersion;
+ private final byte _protocolMinorVersion;
+
+ private static final int DEFAULT_MAX_CLASS_ID = 200;
+ private static final int DEFAULT_MAX_METHOD_ID = 50;
+
+ private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+
+ private ProtocolVersionMethodConverter _protocolVersionConverter;
+
+ public VersionSpecificRegistry(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+
+ _protocolVersionConverter = loadProtocolVersionConverters(major, minor);
+ }
+
+ private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion,
+ byte protocolMinorVersion)
+ {
+ try
+ {
+ Class<ProtocolVersionMethodConverter> versionMethodConverterClass =
+ (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_"
+ + protocolMajorVersion + "_" + protocolMinorVersion);
+
+ return versionMethodConverterClass.newInstance();
+
+ }
+ catch (ClassNotFoundException e)
+ {
+ _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion);
+ if (protocolMinorVersion != 0)
+ {
+ protocolMinorVersion--;
+
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else if (protocolMajorVersion != 0)
+ {
+ protocolMajorVersion--;
+
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else
+ {
+ return null;
+ }
+
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID)
+ {
+ try
+ {
+ return _registry[classID][methodID];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+ }
+
+ public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory)
+ {
+ if (_registry.length <= classID)
+ {
+ AMQMethodBodyInstanceFactory[][] oldRegistry = _registry;
+ _registry = new AMQMethodBodyInstanceFactory[classID + 1][];
+ System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length);
+ }
+
+ if (_registry[classID] == null)
+ {
+ _registry[classID] =
+ new AMQMethodBodyInstanceFactory[(methodID > DEFAULT_MAX_METHOD_ID) ? (methodID + 1)
+ : (DEFAULT_MAX_METHOD_ID + 1)];
+ }
+ else if (_registry[classID].length <= methodID)
+ {
+ AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID];
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID + 1];
+ System.arraycopy(oldMethods, 0, _registry[classID], 0, oldMethods.length);
+ }
+
+ _registry[classID][methodID] = instanceFactory;
+
+ }
+
+ public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+ {
+ AMQMethodBodyInstanceFactory bodyFactory;
+ try
+ {
+ bodyFactory = _registry[classID][methodID];
+ }
+ catch (NullPointerException e)
+ {
+ throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version "
+ + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID
+ + " method " + methodID + ".", e);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ if (classID >= _registry.length)
+ {
+ throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version "
+ + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID
+ + " method " + methodID + ".", e);
+
+ }
+ else
+ {
+ throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version "
+ + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID
+ + " method " + methodID + ".", e);
+
+ }
+ }
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version "
+ + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID
+ + " method " + methodID + ".", null);
+ }
+
+ return bodyFactory.newInstance( in, size);
+
+ }
+
+ public ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolVersionConverter;
+ }
+
+ public void configure()
+ {
+ _protocolVersionConverter.configure();
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
new file mode 100644
index 0000000000..1d7c05e9cc
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private final byte _protocolMajorVersion;
+
+
+ private final byte _protocolMinorVersion;
+
+ public AbstractMethodConverter(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+
+ public final byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public final byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
new file mode 100644
index 0000000000..0695349f76
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.mina.common.ByteBuffer;
+
+public interface ContentChunk
+{
+ int getSize();
+ ByteBuffer getData();
+
+ void reduceToFit();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
new file mode 100644
index 0000000000..a96bdcc171
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public interface MessagePublishInfo
+{
+
+ public AMQShortString getExchange();
+
+ public void setExchange(AMQShortString exchange);
+
+ public boolean isImmediate();
+
+ public boolean isMandatory();
+
+ public AMQShortString getRoutingKey();
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
new file mode 100644
index 0000000000..01d1a8a17b
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+
+public interface MessagePublishInfoConverter
+{
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java
new file mode 100644
index 0000000000..e3d5da73da
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.AMQShortString;
+
+public class MessagePublishInfoImpl implements MessagePublishInfo
+{
+ private AMQShortString _exchange;
+ private boolean _immediate;
+ private boolean _mandatory;
+ private AMQShortString _routingKey;
+
+ public MessagePublishInfoImpl()
+ {
+ }
+
+ public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory,
+ AMQShortString routingKey)
+ {
+ _exchange = exchange;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _routingKey = routingKey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+ }
+
+ public void setImmediate(boolean immedate)
+ {
+ _immediate = immedate;
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public void setMandatory(boolean mandatory)
+ {
+ _mandatory = mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public void setRoutingKey(AMQShortString routingKey)
+ {
+ _routingKey = routingKey;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
new file mode 100644
index 0000000000..7544d9b7e7
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQBody;
+
+import java.nio.ByteBuffer;
+
+public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+{
+ AMQBody convertToBody(ContentChunk contentBody);
+ ContentChunk convertToContentChunk(AMQBody body);
+
+ void configure();
+
+ AMQBody convertToBody(ByteBuffer buf);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
new file mode 100644
index 0000000000..8d51343507
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_9;
+
+public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl
+{
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 9;
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
new file mode 100644
index 0000000000..1c4a29b106
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_9;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.*;
+import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl;
+
+public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_0_9()
+ {
+ super((byte)0,(byte)9);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ if(contentChunk instanceof ContentChunk_0_9)
+ {
+ return ((ContentChunk_0_9)contentChunk).toBody();
+ }
+ else
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk_0_9(contentBodyChunk);
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
+
+ }
+
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
+
+ final AMQShortString exchange = publishBody.getExchange();
+ final AMQShortString routingKey = publishBody.getRoutingKey();
+
+ return new MessagePublishInfoImpl(exchange,
+ publishBody.getImmediate(),
+ publishBody.getMandatory(),
+ routingKey);
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBodyImpl(0,
+ info.getExchange(),
+ info.getRoutingKey(),
+ info.isMandatory(),
+ info.isImmediate()) ;
+
+ }
+
+ private static class ContentChunk_0_9 implements ContentChunk
+ {
+ private final ContentBody _contentBodyChunk;
+
+ public ContentChunk_0_9(final ContentBody contentBodyChunk)
+ {
+ _contentBodyChunk = contentBodyChunk;
+ }
+
+ public int getSize()
+ {
+ return _contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return _contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ _contentBodyChunk.reduceBufferToFit();
+ }
+
+ public AMQBody toBody()
+ {
+ return _contentBodyChunk;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java
new file mode 100644
index 0000000000..60b8a7e1a6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_91;
+
+public abstract class AMQMethodBody_0_91 extends org.apache.qpid.framing.AMQMethodBodyImpl
+{
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 91;
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
new file mode 100644
index 0000000000..6e330574bc
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_91;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.*;
+
+public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_0_91()
+ {
+ super((byte)0,(byte)9);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ if(contentChunk instanceof ContentChunk_0_9)
+ {
+ return ((ContentChunk_0_9)contentChunk).toBody();
+ }
+ else
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk_0_9(contentBodyChunk);
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
+
+ }
+
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
+
+ final AMQShortString exchange = publishBody.getExchange();
+ final AMQShortString routingKey = publishBody.getRoutingKey();
+
+ return new MessagePublishInfoImpl(exchange,
+ publishBody.getImmediate(),
+ publishBody.getMandatory(),
+ routingKey);
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBodyImpl(0,
+ info.getExchange(),
+ info.getRoutingKey(),
+ info.isMandatory(),
+ info.isImmediate()) ;
+
+ }
+
+ private static class ContentChunk_0_9 implements ContentChunk
+ {
+ private final ContentBody _contentBodyChunk;
+
+ public ContentChunk_0_9(final ContentBody contentBodyChunk)
+ {
+ _contentBodyChunk = contentBodyChunk;
+ }
+
+ public int getSize()
+ {
+ return _contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return _contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ _contentBodyChunk.reduceBufferToFit();
+ }
+
+ public AMQBody toBody()
+ {
+ return _contentBodyChunk;
+ }
+ }
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
new file mode 100644
index 0000000000..35645854c0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_8_0;
+
+public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl
+{
+
+ public byte getMajor()
+ {
+ return 8;
+ }
+
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
new file mode 100644
index 0000000000..c87820b9b2
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_8_0;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
+import org.apache.qpid.framing.*;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_8_0()
+ {
+ super((byte)8,(byte)0);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
+
+ }
+
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
+
+ final AMQShortString exchange = publishBody.getExchange();
+ final AMQShortString routingKey = publishBody.getRoutingKey();
+
+ return new MessagePublishInfoImpl(exchange == null ? null : exchange.intern(),
+ publishBody.getImmediate(),
+ publishBody.getMandatory(),
+ routingKey == null ? null : routingKey.intern());
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBodyImpl(0,
+ info.getExchange(),
+ info.getRoutingKey(),
+ info.isMandatory(),
+ info.isImmediate()) ;
+
+ }
+}