diff options
Diffstat (limited to 'java')
30 files changed, 583 insertions, 132 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index d8d2220a8e..f9ec0eb878 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -108,10 +108,11 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); - ping.broker = new AMQShortString(_group.getLocal().getDetails()); - ping.responseRequired = true; - ping.load = _loadTable.getLocalLoad(); + ClusterPingBody ping = new ClusterPingBody((byte)8, + (byte)0, + _group.getLocal().getDetails(), + _loadTable.getLocalLoad(), + true); BlockingHandler handler = new BlockingHandler(); send(getLeader(), new SimpleBodySendable(ping), handler); handler.waitForCompletion(); @@ -156,8 +157,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, _logger.info(new LogMessage("Connected to {0}. joining", leader)); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); - join.broker = new AMQShortString(_group.getLocal().getDetails()); + ClusterJoinBody join = new ClusterJoinBody((byte)8, + (byte)0, + _group.getLocal().getDetails()); + send(leader, new SimpleBodySendable(join)); } @@ -177,8 +180,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); - leave.broker = new AMQShortString(_group.getLocal().getDetails()); + ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, + (byte)0, + _group.getLocal().getDetails()); + send(getLeader(), new SimpleBodySendable(leave)); } @@ -200,8 +205,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); - suspect.broker = new AMQShortString(broker.getDetails()); + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, + (byte)0, + broker.getDetails()); + send(getLeader(), new SimpleBodySendable(suspect)); } } @@ -224,8 +231,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, //pass request on to leader: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); - request.broker = new AMQShortString(member.getDetails()); + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails()); + Broker leader = getLeader(); send(leader, new SimpleBodySendable(request)); _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); @@ -271,9 +278,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0); - //TODO: revise this way of converting String to bytes... - announce.members = membership.getBytes(); + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes()); + + return announce; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 722ec1b256..aa16595095 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -54,7 +54,16 @@ class ConsumerCounts { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0); + BasicConsumeBody m = new BasicConsumeBody((byte)8, + (byte)0, + null, + queue, + false, + false, + false, + false, + queue, + 0); m.queue = queue; m.consumerTag = queue; replay(m, messages); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index ce3e71f0a5..243a28e5e8 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -50,13 +50,13 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory private final byte minor = (byte)0; private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] { - new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)), - new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)), - new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)), - new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)), - new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor)) + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false)) }); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 6898ffcec2..5cf6d5c3ff 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -79,8 +79,14 @@ public class ClusteredQueue extends AMQQueue //send deletion request to all other members: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); - request.queue = getName(); + QueueDeleteBody request = new QueueDeleteBody((byte)8, + (byte)0, + false, + false, + false, + getName(), + 0); + _groupMgr.broadcast(new SimpleBodySendable(request)); } } @@ -93,8 +99,11 @@ public class ClusteredQueue extends AMQQueue //signal other members: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0); - request.consumerTag = getName(); + BasicCancelBody request = new BasicCancelBody((byte)8, + (byte)0, + getName(), + false); + _groupMgr.broadcast(new SimpleBodySendable(request)); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 89ce0bc8b1..568de62d1b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -60,7 +60,7 @@ public class PrivateQueue extends AMQQueue //send delete request to peers: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0); request.queue = getName(); _groupMgr.broadcast(new SimpleBodySendable(request)); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java new file mode 100644 index 0000000000..9513cfc468 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -0,0 +1,9 @@ +package org.apache.qpid;
+
+public class AMQConnectionFailureException extends AMQException
+{
+ public AMQConnectionFailureException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java new file mode 100644 index 0000000000..ed1d2e8beb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java @@ -0,0 +1,9 @@ +package org.apache.qpid;
+
+public class AMQUnknownExchangeType extends AMQException
+{
+ public AMQUnknownExchangeType(String message)
+ {
+ super(message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 2a999fe130..552c8e599e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -34,18 +34,24 @@ public class AMQDataBlockDecoder private final Map _supportedBodies = new HashMap(); + private final static BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + static + { + _bodiesSupported[AMQMethodBody.TYPE] = AMQMethodBodyFactory.getInstance(); + _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); + _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + public AMQDataBlockDecoder() { - _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance()); - _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance()); - _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance()); - _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); } public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException { - // type, channel, body size and end byte - if (in.remaining() < (1 + 2 + 4 + 1)) + final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) { return false; } @@ -61,16 +67,13 @@ public class AMQDataBlockDecoder " bodySize = " + bodySize); } - if (in.remaining() < (bodySize + 1)) - { - return false; - } - return true; + return (remainingAfterAttributes >= bodySize); + } private boolean isSupportedFrameType(byte frameType) { - final boolean result = _supportedBodies.containsKey(new Byte(frameType)); + final boolean result = _bodiesSupported[frameType] != null; if (!result) { @@ -84,6 +87,7 @@ public class AMQDataBlockDecoder throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); + BodyFactory bodyFactory = _bodiesSupported[type]; if (!isSupportedFrameType(type)) { throw new AMQFrameDecodingException("Unsupported frame type: " + type); @@ -91,19 +95,19 @@ public class AMQDataBlockDecoder final int channel = in.getUnsignedShort(); final long bodySize = in.getUnsignedInt(); - BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type)); + /* if (bodyFactory == null) { throw new AMQFrameDecodingException("Unsupported body type: " + type); } - AMQFrame frame = new AMQFrame(); - - frame.populateFromBuffer(in, channel, bodySize, bodyFactory); + */ + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + byte marker = in.get(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type); + throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type); } return frame; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 6af691fbe8..9e98d9792b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -38,6 +38,12 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock this.bodyFrame = bodyFrame; } + public AMQFrame(ByteBuffer in, int channel, long bodySize, BodyFactory bodyFactory) throws AMQFrameDecodingException + { + this.channel = channel; + this.bodyFrame = bodyFactory.createBody(in,bodySize); + } + public long getSize() { return 1 + 2 + 4 + bodyFrame.getSize() + 1; @@ -65,8 +71,8 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock throws AMQFrameDecodingException, AMQProtocolVersionException { this.channel = channel; - bodyFrame = bodyFactory.createBody(buffer); - bodyFrame.populateFromBuffer(buffer, bodySize); + bodyFrame = bodyFactory.createBody(buffer, bodySize); + } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index da0909d32f..95b461b6dc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -39,13 +39,13 @@ public class AMQMethodBodyFactory implements BodyFactory _log.debug("Creating method body factory"); } - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML // segments generated together are now handled by MainRegistry. The Cluster class, // if generated together with amqp.xml is a part of MainRegistry. // TODO: Connect with version acquired from ProtocolInitiation class. return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), - (byte)8, (byte)0); + (byte)8, (byte)0, in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java new file mode 100644 index 0000000000..c0a12a9aad --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -0,0 +1,9 @@ +package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public abstract interface AMQMethodBodyInstanceFactory
+{
+ public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index ad07634554..23c1929205 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -65,27 +65,46 @@ public enum AMQType public int getEncodingSize(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.unsignedIntegerLength();
}
-
- public Object toNativeValue(Object value)
+ public Long toNativeValue(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ if (value instanceof Long)
+ {
+ return (Long) value;
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String)value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" +
+ value.getClass().getName() + ") to int.");
+ }
}
public void writeValueImpl(Object value, ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
public Object readValueFromBuffer(ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.readUnsignedInteger(buffer);
}
},
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 14d1d0c7b0..7c881c5a78 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -264,7 +264,7 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties } public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { _propertyFlags = propertyFlags; diff --git a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java index cf5708d993..59646577e1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -27,5 +27,5 @@ import org.apache.mina.common.ByteBuffer; */ public interface BodyFactory { - AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException; + AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index d5fccf9409..baeecaa17a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -32,6 +32,18 @@ public class ContentBody extends AMQBody { } + public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); + } + + } + + public ContentBody(ByteBuffer payload) { this.payload = payload; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index 22af331ab7..5636229d53 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -39,9 +39,9 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return new ContentBody(); + return new ContentBody(in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 4ee36ee831..45280bdae3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -40,6 +40,18 @@ public class ContentHeaderBody extends AMQBody { } + public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + classId = buffer.getUnsignedShort(); + weight = buffer.getUnsignedShort(); + bodySize = buffer.getLong(); + int propertyFlags = buffer.getUnsignedShort(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); + properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + + } + + public ContentHeaderBody(ContentHeaderProperties props, int classId) { properties = props; @@ -79,8 +91,8 @@ public class ContentHeaderBody extends AMQBody public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException, AMQProtocolVersionException { - ContentHeaderBody body = new ContentHeaderBody(); - body.populateFromBuffer(buffer, size); + ContentHeaderBody body = new ContentHeaderBody(buffer, size); + return body; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index ddf63f8aa3..818fc9cf0c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -39,11 +39,11 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties - return new ContentHeaderBody(); + return new ContentHeaderBody(in,bodySize); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index 88bdefca88..561d7852fd 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -41,7 +41,7 @@ public interface ContentHeaderProperties * @throws AMQFrameDecodingException when the buffer does not contain valid data */ void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException; + throws AMQFrameDecodingException; /** * @return the size of the encoded property list in bytes. diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index cfcc5db857..7dac018872 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -37,7 +37,7 @@ public class ContentHeaderPropertiesFactory public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, ByteBuffer buffer, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { ContentHeaderProperties properties; // AMQP version change: "Hardwired" version to major=8, minor=0 diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 67b2d16ec0..c4d568ba88 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -331,7 +331,7 @@ public class EncodingUtils } - public static long unsignedIntegerLength() + public static int unsignedIntegerLength() { return 4; } @@ -356,6 +356,7 @@ public class EncodingUtils } } + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) { if (table != null) @@ -387,6 +388,238 @@ public class EncodingUtils buffer.put(packedValue); } + public static void writeBooleans(ByteBuffer buffer, boolean value) + { + + buffer.put(value ? (byte) 1 : (byte) 0); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + + buffer.put(packedValue); + } + + + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6, + boolean value7) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + if (value7) + { + packedValue = (byte) (packedValue | (byte)(1 << 7)); + } + + buffer.put(packedValue); + } + + + + /** * This is used for writing longstrs. * @@ -619,7 +852,7 @@ public class EncodingUtils buffer.put((byte) (aBoolean ? 1 : 0)); } - public static Boolean readBoolean(ByteBuffer buffer) + public static boolean readBoolean(ByteBuffer buffer) { byte packedValue = buffer.get(); return (packedValue == 1); @@ -636,7 +869,7 @@ public class EncodingUtils buffer.put(aByte); } - public static Byte readByte(ByteBuffer buffer) + public static byte readByte(ByteBuffer buffer) { return buffer.get(); } @@ -653,7 +886,7 @@ public class EncodingUtils buffer.putShort(aShort); } - public static Short readShort(ByteBuffer buffer) + public static short readShort(ByteBuffer buffer) { return buffer.getShort(); } @@ -669,7 +902,7 @@ public class EncodingUtils buffer.putInt(aInteger); } - public static Integer readInteger(ByteBuffer buffer) + public static int readInteger(ByteBuffer buffer) { return buffer.getInt(); } @@ -685,7 +918,7 @@ public class EncodingUtils buffer.putLong(aLong); } - public static Long readLong(ByteBuffer buffer) + public static long readLong(ByteBuffer buffer) { return buffer.getLong(); } @@ -701,7 +934,7 @@ public class EncodingUtils buffer.putFloat(aFloat); } - public static Float readFloat(ByteBuffer buffer) + public static float readFloat(ByteBuffer buffer) { return buffer.getFloat(); } @@ -718,7 +951,7 @@ public class EncodingUtils buffer.putDouble(aDouble); } - public static Double readDouble(ByteBuffer buffer) + public static double readDouble(ByteBuffer buffer) { return buffer.getDouble(); } @@ -780,48 +1013,6 @@ public class EncodingUtils - public static void main(String[] args) - { - long[] nums = { 1000000000000000000L, - 100000000000000000L, - 10000000000000000L, - 1000000000000000L, - 100000000000000L, - 10000000000000L, - 1000000000000L, - 100000000000L, - 10000000000L, - 1000000000L, - 100000000L, - 10000000L, - 1000000L, - 100000L, - 10000L, - 1000L, - 100L, - 10L, - 1L, - 0L, - 787987932453564535L, - 543289830889480230L, - 3748104703875785L, - 463402485702857L, - 87402780489392L, - 1190489015032L, - 134303883744L - }; - - - - - for(int i = 0; i < nums.length; i++) - { - ByteBuffer buffer = ByteBuffer.allocate(25); - writeShortStringBytes(buffer, String.valueOf(nums[i])); - buffer.flip(); - System.out.println(nums[i] + " : " + readLongAsShortString(buffer)); - } - } public static long readLongAsShortString(ByteBuffer buffer) { @@ -857,4 +1048,37 @@ public class EncodingUtils return result; } + + public static long readUnsignedInteger(ByteBuffer buffer) + { + long l = 0xFF & buffer.get(); + l <<=8; + l = l | (0xFF & buffer.get()); + l <<=8; + l = l | (0xFF & buffer.get()); + l <<=8; + l = l | (0xFF & buffer.get()); + + return l; + } + + + public static void main(String[] args) + { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.setAutoExpand(true); + + long l = (long) Integer.MAX_VALUE; + l += 1024L; + + writeUnsignedInteger(buf, l); + + buf.flip(); + + long l2 = readUnsignedInteger(buf); + + System.out.println("before: " + l); + System.out.println("after: " + l2); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7a160ef471..ca03f29047 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -27,6 +27,20 @@ public class HeartbeatBody extends AMQBody public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); + public HeartbeatBody() + { + + } + + public HeartbeatBody(ByteBuffer buffer, long size) + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + protected byte getFrameType() { return TYPE; diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index 97bd3d9253..c7ada708dc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java new file mode 100644 index 0000000000..174cb142e0 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -0,0 +1,77 @@ +package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private ByteBuffer _encodedBlock;
+
+ private AMQDataBlock _block;
+
+ public SmallCompositeAMQDataBlock(AMQDataBlock block)
+ {
+ _block = block;
+ }
+
+ /**
+ * The encoded block will be logically first before the AMQDataBlocks which are encoded
+ * into the buffer afterwards.
+ * @param encodedBlock already-encoded data
+ * @param block a block to be encoded.
+ */
+ public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+ {
+ this(block);
+ _encodedBlock = encodedBlock;
+ }
+
+ public AMQDataBlock getBlock()
+ {
+ return _block;
+ }
+
+ public ByteBuffer getEncodedBlock()
+ {
+ return _encodedBlock;
+ }
+
+ public long getSize()
+ {
+ long frameSize = _block.getSize();
+
+ if (_encodedBlock != null)
+ {
+ _encodedBlock.rewind();
+ frameSize += _encodedBlock.remaining();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (_encodedBlock != null)
+ {
+ buffer.put(_encodedBlock);
+ }
+ _block.writePayload(buffer);
+
+ }
+
+ public String toString()
+ {
+ if (_block == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{encodedBlock=").append(_encodedBlock);
+
+ buf.append(" _block=[").append(_block.toString()).append("]");
+
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 0eb43bdf5f..9d3c588fc8 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -105,7 +105,12 @@ public class TxAckTest extends TestCase long deliveryTag = i + 1; // TODO: fix hardcoded protocol version data TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8, - (byte)0), txnContext); + (byte)0, + null, + false, + false, + null, + 0), txnContext); _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); } _acked = acked; diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 52afecdb6a..ea576a5661 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -153,8 +153,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0); - request.routingKey = new AMQShortString(id); + BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0); + return request; } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index d4a8f6c7f9..91a26632a1 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -163,8 +163,14 @@ public class AMQQueueMBeanTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); - publish.immediate = immediate; + BasicPublishBody publish = new BasicPublishBody((byte)8, + (byte)0, + null, + immediate, + false, + null, + 0); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = 1000; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java index 222b2c696a..d10d5acdd0 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -96,9 +96,13 @@ public class AckTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0); - publishBody.routingKey = new AMQShortString("rk"); - publishBody.exchange = new AMQShortString("someExchange"); + BasicPublishBody publishBody = new BasicPublishBody((byte)8, + (byte)0, + new AMQShortString("someExchange"), + false, + false, + new AMQShortString("rk"), + 0); AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); if (persistent) { diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java index da4627411d..6c48bb2bf4 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -61,8 +61,14 @@ class MessageTestHelper extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); - publish.immediate = immediate; + BasicPublishBody publish = new BasicPublishBody((byte)8, + (byte)0, + null, + immediate, + false, + null, + 0); + return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, new ContentHeaderBody()); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index b874ca9594..e2500d9865 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -52,7 +52,12 @@ public class TestReferenceCounting extends TestCase createPersistentContentHeader(); // TODO: fix hardcoded protocol version data AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0), + (byte)0, + null, + false, + false, + null, + 0), new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); @@ -76,7 +81,12 @@ public class TestReferenceCounting extends TestCase { // TODO: fix hardcoded protocol version data AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0), + (byte)0, + null, + false, + false, + null, + 0), new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); |