diff options
33 files changed, 776 insertions, 201 deletions
diff --git a/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java b/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java index a18338f3fe..13560f5c17 100644 --- a/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java +++ b/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java @@ -61,7 +61,9 @@ public class JavaGenerator extends Generator static private Method mbGetGenerateMethod; static private Method mbMangledGetGenerateMethod; static private Method mbParamListGenerateMethod; + static private Method mbPassedParamListGenerateMethod; static private Method mbMangledParamListGenerateMethod; + static private Method mbMangledPassedParamListGenerateMethod; static private Method mbBodyInitGenerateMethod; static private Method mbMangledBodyInitGenerateMethod; static private Method mbSizeGenerateMethod; @@ -131,11 +133,22 @@ public class JavaGenerator extends Generator AmqpVersionSet.class, int.class, int.class, boolean.class); } catch (NoSuchMethodException e) { e.printStackTrace(); } + + try { mbPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod( + "generateMbPassedParamList", String.class, AmqpField.class, + AmqpVersionSet.class, int.class, int.class, boolean.class); } + catch (NoSuchMethodException e) { e.printStackTrace(); } + try { mbMangledParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod( "generateMbMangledParamList", AmqpField.class, int.class, int.class, boolean.class); } catch (NoSuchMethodException e) { e.printStackTrace(); } + try { mbMangledPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod( + "generateMbMangledPassedParamList", AmqpField.class, + int.class, int.class, boolean.class); } + catch (NoSuchMethodException e) { e.printStackTrace(); } + try { mbBodyInitGenerateMethod = JavaGenerator.class.getDeclaredMethod( "generateMbBodyInit", String.class, AmqpField.class, AmqpVersionSet.class, int.class, int.class, boolean.class); } @@ -457,11 +470,11 @@ public class JavaGenerator extends Generator if (token.compareTo("${CLASS}") == 0 && thisClass != null) return thisClass.name; if (token.compareTo("${CLASS_ID_INIT}") == 0 && thisClass != null) - return generateIndexInitializer("classIdMap", thisClass.indexMap, 8); + return generateIndexInitializer("registerClassId", thisClass.indexMap, 8); if (token.compareTo("${METHOD}") == 0 && method != null) return method.name; if (token.compareTo("${METHOD_ID_INIT}") == 0 && method != null) - return generateIndexInitializer("methodIdMap", method.indexMap, 8); + return generateIndexInitializer("registerMethodId", method.indexMap, 8); if (token.compareTo("${FIELD}") == 0 && field != null) return field.name; @@ -573,6 +586,16 @@ public class JavaGenerator extends Generator codeSnippet += fieldMap.parseFieldMap(mbParamListGenerateMethod, mbMangledParamListGenerateMethod, 42, 4, this); } + + else if (token.compareTo("${mb_field_passed_parameter_list}") == 0) + { + // <cringe> The code generated by this is ugly... It puts a comma on a line by itself! + // TODO: Find a more elegant solution here sometime... + codeSnippet = fieldMap.size() > 0 ? Utils.createSpaces(42) + "," + cr : ""; + // </cringe> + codeSnippet += fieldMap.parseFieldMap(mbPassedParamListGenerateMethod, + mbMangledPassedParamListGenerateMethod, 42, 4, this); + } else if (token.compareTo("${mb_field_body_initialize}") == 0) { codeSnippet = fieldMap.parseFieldMap(mbBodyInitGenerateMethod, @@ -721,7 +744,7 @@ public class JavaGenerator extends Generator while (vItr.hasNext()) { AmqpVersion version = vItr.next(); - sb.append(indent + mapName + ".put(\"" + version.toString() + "\", " + index + ");" + cr); + sb.append(indent + mapName + "( (byte) " + version.getMajor() +", (byte) " + version.getMinor() + ", " + index + ");" + cr); } } return sb.toString(); @@ -746,12 +769,12 @@ public class JavaGenerator extends Generator { int classIndex = findIndex(thisClass.indexMap, version); int methodIndex = findIndex(method.indexMap, version); - sb.append(indent + "classIDMethodIDVersionBodyMap.put(" + cr); - sb.append(indent + tab + "createMapKey((short)" + classIndex + - ", (short)" + methodIndex + ", (byte)" + version.getMajor() + - ", (byte)" + version.getMinor() + "), " + cr); + sb.append(indent + "registerMethod(" + cr); + sb.append(indent + tab + "(short)" + classIndex + + ", (short)" + methodIndex + ", (byte)" + version.getMajor() + + ", (byte)" + version.getMinor() + ", " + cr); sb.append(indent + tab + Utils.firstUpper(thisClass.name) + - Utils.firstUpper(method.name) + "Body.class);" + cr); + Utils.firstUpper(method.name) + "Body.getFactory());" + cr); } catch (Exception e) {} // Ignore } @@ -924,6 +947,15 @@ public class JavaGenerator extends Generator (nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr; } + + protected String generateMbPassedParamList(String codeType, AmqpField field, + AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag) + { + return Utils.createSpaces(indentSize) + field.name + + (nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr; + } + + protected String generateMbMangledParamList(AmqpField field, int indentSize, int tabSize, boolean nextFlag) throws AmqpTypeMappingException @@ -943,10 +975,29 @@ public class JavaGenerator extends Generator return sb.toString(); } + protected String generateMbMangledPassedParamList(AmqpField field, int indentSize, + int tabSize, boolean nextFlag) + throws AmqpTypeMappingException + { + StringBuffer sb = new StringBuffer(); + Iterator<String> dItr = field.domainMap.keySet().iterator(); + int domainCntr = 0; + while (dItr.hasNext()) + { + String domainName = dItr.next(); + AmqpVersionSet versionSet = field.domainMap.get(domainName); + sb.append(Utils.createSpaces(indentSize) + field.name + "_" + + (domainCntr++) + (nextFlag ? "," : "") + " // AMQP version(s): " + + versionSet + cr); + } + return sb.toString(); + } + + protected String generateMbBodyInit(String codeType, AmqpField field, AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag) { - return Utils.createSpaces(indentSize) + "body." + field.name + " = " + field.name + + return Utils.createSpaces(indentSize) + "this." + field.name + " = " + field.name + ";" + cr; } @@ -960,7 +1011,7 @@ public class JavaGenerator extends Generator while (dItr.hasNext()) { dItr.next(); - sb.append(Utils.createSpaces(indentSize) + "body." + field.name + "_" + domainCntr + + sb.append(Utils.createSpaces(indentSize) + "this." + field.name + "_" + domainCntr + " = " + field.name + "_" + (domainCntr++) + ";" + cr); } return sb.toString(); @@ -1004,23 +1055,26 @@ public class JavaGenerator extends Generator int ordinal, int indentSize, int tabSize) { String indent = Utils.createSpaces(indentSize); - String bitArrayName = "bitArray_" + ordinal; - StringBuffer sb = new StringBuffer(indent + "boolean[] " + bitArrayName + - " = new boolean[] { "); - for (int i=0; i<bitFieldList.size(); i++) + + StringBuilder sb = new StringBuilder(); + int i = 0; + while(i <bitFieldList.size()) { - if (i != 0) + + StringBuilder line = new StringBuilder(); + + for (int j=0; i<bitFieldList.size() && j<8; i++, j++) { - if ((i + 3) % 6 == 0) - sb.append("," + cr + indent + Utils.createSpaces(tabSize)); - else - sb.append(", "); + if (j != 0) + { + line.append(", "); + } + line.append(bitFieldList.get(i)); } - sb.append(bitFieldList.get(i)); + + sb.append(Utils.createSpaces(indentSize) + + typeMap.get("bit").encodeExpression.replaceAll("#", line.toString()) + ";" + cr); } - sb.append(" };" + cr); - sb.append(Utils.createSpaces(indentSize) + - typeMap.get("bit").encodeExpression.replaceAll("#", bitArrayName) + ";" + cr); return sb.toString(); } @@ -1038,14 +1092,22 @@ public class JavaGenerator extends Generator int ordinal, int indentSize, int tabSize) { String indent = Utils.createSpaces(indentSize); - String bitArrayName = "bitArray_" + ordinal; - StringBuffer sb = new StringBuffer(); - sb.append(indent + - typeMap.get("bit").decodeExpression.replaceAll("#", "boolean[] " + bitArrayName) + - ";" + cr); - for (int i=0; i<bitFieldList.size(); i++) + + StringBuilder sb = new StringBuilder(indent); + sb.append("byte packedValue;"); + sb.append(cr); + + // RG HERE! + + int i = 0; + while(i < bitFieldList.size()) { - sb.append(indent + bitFieldList.get(i) + " = " + bitArrayName + "[" + i + "];" + cr); + sb.append(indent + "packedValue = EncodingUtils.readByte(buffer);" + cr); + + for(int j = 0; i < bitFieldList.size() && j < 8; i++, j++) + { + sb.append(indent + bitFieldList.get(i) + " = ( packedValue & (byte) (1 << " + j + ") ) != 0;" + cr); + } } return sb.toString(); } diff --git a/qpid/gentools/templ.java/MethodBodyClass.tmpl b/qpid/gentools/templ.java/MethodBodyClass.tmpl index 04ce639f23..249738e109 100644 --- a/qpid/gentools/templ.java/MethodBodyClass.tmpl +++ b/qpid/gentools/templ.java/MethodBodyClass.tmpl @@ -28,36 +28,97 @@ package org.apache.qpid.framing; -import java.util.TreeMap; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.Arrays; import org.apache.mina.common.ByteBuffer; public class ${CLASS}${METHOD}Body extends AMQMethodBody implements EncodableAMQDataBlock { - public static final TreeMap<String, Integer> classIdMap = new TreeMap<String, Integer>(); - public static final TreeMap<String, Integer> methodIdMap = new TreeMap<String, Integer>(); + private static final AMQMethodBodyInstanceFactory factory = new AMQMethodBodyInstanceFactory() + { + public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer in, long size) throws AMQFrameDecodingException + { + return new ${CLASS}${METHOD}Body(major, minor, in); + } + }; + + public static AMQMethodBodyInstanceFactory getFactory() + { + return factory; + } + + public static HashMap<Integer, Integer> classIdMap = new HashMap<Integer, Integer>(); + public static HashMap<Integer, Integer> methodIdMap = new HashMap<Integer, Integer>(); + + private static void registerMethodId(byte major, byte minor, int methodId) + { + methodIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), methodId); + } + + private static void registerClassId(byte major, byte minor, int classId) + { + classIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), classId); + } + static { -${CLASS_ID_INIT} -${METHOD_ID_INIT} + + ${CLASS_ID_INIT} + ${METHOD_ID_INIT} + } // Fields declared in specification %{FLIST} ${field_declaration} + private final int _clazz; + private final int _method; + + // Constructor - public ${CLASS}${METHOD}Body(byte major, byte minor) + + public ${CLASS}${METHOD}Body(byte major, byte minor, ByteBuffer buffer) throws AMQFrameDecodingException + { + super(major, minor); + _clazz = getClazz(major,minor); + _method = getMethod(major,minor); + %{FLIST} ${mb_field_decode} + } + public ${CLASS}${METHOD}Body(byte major, byte minor + %{FLIST} ${mb_field_parameter_list} + ) { super(major, minor); + _clazz = getClazz(major,minor); + _method = getMethod(major,minor); + %{FLIST} ${mb_field_body_initialize} + } + + public int getClazz() + { + return _clazz; + } + + public int getMethod() + { + return _method; } - public int getClazz() { return classIdMap.get(major + "-" + minor); } - public int getMethod() { return methodIdMap.get(major + "-" + minor); } - public static int getClazz(byte major, byte minor) { return classIdMap.get(major + "-" + minor); } - public static int getMethod(byte major, byte minor) { return methodIdMap.get(major + "-" + minor); } + public static int getClazz(byte major, byte minor) + { + return classIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8)); + } - // Field methods + public static int getMethod(byte major, byte minor) + { + return methodIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8)); + } + + + // Field methods %{FLIST} ${mb_field_get_method} protected int getBodySize() @@ -84,16 +145,16 @@ ${METHOD_ID_INIT} return buf.toString(); } - public static AMQFrame createAMQFrame(int _channelId, byte major, byte minor + public static AMQFrame createAMQFrame(int channelId, byte major, byte minor %{FLIST} ${mb_field_parameter_list} ) { - ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor); -%{FLIST} ${mb_field_body_initialize} - - AMQFrame frame = new AMQFrame(); - frame.channel = _channelId; - frame.bodyFrame = body; + ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor +%{FLIST} ${mb_field_passed_parameter_list} + ); + + + AMQFrame frame = new AMQFrame(channelId, body); return frame; } } diff --git a/qpid/gentools/templ.java/MethodRegistryClass.tmpl b/qpid/gentools/templ.java/MethodRegistryClass.tmpl index 0f15918f90..a243ace00e 100644 --- a/qpid/gentools/templ.java/MethodRegistryClass.tmpl +++ b/qpid/gentools/templ.java/MethodRegistryClass.tmpl @@ -31,41 +31,41 @@ package org.apache.qpid.framing; import java.util.HashMap; import java.lang.reflect.Constructor; import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; class MainRegistry { + private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>(); + + private static final Logger _log = Logger.getLogger(MainRegistry.class); - private static HashMap<Long, Class> classIDMethodIDVersionBodyMap = new HashMap<Long, Class>(); static { %{CLIST} ${reg_map_put_method} } - public static AMQMethodBody get(short classID, short methodID, byte major, byte minor) + public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size) throws AMQFrameDecodingException { - Class bodyClass = classIDMethodIDVersionBodyMap.get( - createMapKey(classID, methodID, major, minor)); - if (bodyClass == null) - { - throw new AMQFrameDecodingException(_log, - "Unable to find a suitable decoder for class " + classID + " and method " + - methodID + " in AMQP version " + major + "-" + minor + "."); - } - try - { - Constructor initFn = bodyClass.getConstructor(byte.class, byte.class); - return (AMQMethodBody) initFn.newInstance(major, minor); - } - catch (Exception e) - { - throw new AMQFrameDecodingException(_log, - "Unable to instantiate body class for class " + classID + " and method " + - methodID + " in AMQP version " + major + "-" + minor + " : " + e, e); - } + AMQMethodBodyInstanceFactory bodyFactory = classIDMethodIDVersionBodyMap.get(createMapKey(classID,methodID,major,minor)); + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(_log, + "Unable to find a suitable decoder for class " + classID + " and method " + + methodID + " in AMQP version " + major + "-" + minor + "."); + } + return bodyFactory.newInstance(major, minor, in, size); + + } + private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory ) + { + classIDMethodIDVersionBodyMap.put(createMapKey(classID,methodID,major,minor), instanceFactory); + } + + private static Long createMapKey(short classID, short methodID, byte major, byte minor) { /** @@ -77,4 +77,5 @@ class MainRegistry */ return new Long(((long)classID << 32) + ((long)methodID << 16) + ((long)major << 8) + minor); } + } diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index d8d2220a8e..f9ec0eb878 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -108,10 +108,11 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); - ping.broker = new AMQShortString(_group.getLocal().getDetails()); - ping.responseRequired = true; - ping.load = _loadTable.getLocalLoad(); + ClusterPingBody ping = new ClusterPingBody((byte)8, + (byte)0, + _group.getLocal().getDetails(), + _loadTable.getLocalLoad(), + true); BlockingHandler handler = new BlockingHandler(); send(getLeader(), new SimpleBodySendable(ping), handler); handler.waitForCompletion(); @@ -156,8 +157,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, _logger.info(new LogMessage("Connected to {0}. joining", leader)); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); - join.broker = new AMQShortString(_group.getLocal().getDetails()); + ClusterJoinBody join = new ClusterJoinBody((byte)8, + (byte)0, + _group.getLocal().getDetails()); + send(leader, new SimpleBodySendable(join)); } @@ -177,8 +180,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); - leave.broker = new AMQShortString(_group.getLocal().getDetails()); + ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, + (byte)0, + _group.getLocal().getDetails()); + send(getLeader(), new SimpleBodySendable(leave)); } @@ -200,8 +205,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); - suspect.broker = new AMQShortString(broker.getDetails()); + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, + (byte)0, + broker.getDetails()); + send(getLeader(), new SimpleBodySendable(suspect)); } } @@ -224,8 +231,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, //pass request on to leader: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); - request.broker = new AMQShortString(member.getDetails()); + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails()); + Broker leader = getLeader(); send(leader, new SimpleBodySendable(request)); _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); @@ -271,9 +278,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0); - //TODO: revise this way of converting String to bytes... - announce.members = membership.getBytes(); + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes()); + + return announce; } diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 722ec1b256..aa16595095 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -54,7 +54,16 @@ class ConsumerCounts { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0); + BasicConsumeBody m = new BasicConsumeBody((byte)8, + (byte)0, + null, + queue, + false, + false, + false, + false, + queue, + 0); m.queue = queue; m.consumerTag = queue; replay(m, messages); diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java index ce3e71f0a5..243a28e5e8 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -50,13 +50,13 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory private final byte minor = (byte)0; private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] { - new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)), - new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)), - new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)), - new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)), - new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)), - new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)), - new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor)) + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false)) }); diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 6898ffcec2..5cf6d5c3ff 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -79,8 +79,14 @@ public class ClusteredQueue extends AMQQueue //send deletion request to all other members: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); - request.queue = getName(); + QueueDeleteBody request = new QueueDeleteBody((byte)8, + (byte)0, + false, + false, + false, + getName(), + 0); + _groupMgr.broadcast(new SimpleBodySendable(request)); } } @@ -93,8 +99,11 @@ public class ClusteredQueue extends AMQQueue //signal other members: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0); - request.consumerTag = getName(); + BasicCancelBody request = new BasicCancelBody((byte)8, + (byte)0, + getName(), + false); + _groupMgr.broadcast(new SimpleBodySendable(request)); } diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 89ce0bc8b1..568de62d1b 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -60,7 +60,7 @@ public class PrivateQueue extends AMQQueue //send delete request to peers: // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0); request.queue = getName(); _groupMgr.broadcast(new SimpleBodySendable(request)); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java new file mode 100644 index 0000000000..9513cfc468 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -0,0 +1,9 @@ +package org.apache.qpid;
+
+public class AMQConnectionFailureException extends AMQException
+{
+ public AMQConnectionFailureException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java new file mode 100644 index 0000000000..ed1d2e8beb --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java @@ -0,0 +1,9 @@ +package org.apache.qpid;
+
+public class AMQUnknownExchangeType extends AMQException
+{
+ public AMQUnknownExchangeType(String message)
+ {
+ super(message);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 2a999fe130..552c8e599e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -34,18 +34,24 @@ public class AMQDataBlockDecoder private final Map _supportedBodies = new HashMap(); + private final static BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + static + { + _bodiesSupported[AMQMethodBody.TYPE] = AMQMethodBodyFactory.getInstance(); + _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); + _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + public AMQDataBlockDecoder() { - _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance()); - _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance()); - _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance()); - _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory()); } public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException { - // type, channel, body size and end byte - if (in.remaining() < (1 + 2 + 4 + 1)) + final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) { return false; } @@ -61,16 +67,13 @@ public class AMQDataBlockDecoder " bodySize = " + bodySize); } - if (in.remaining() < (bodySize + 1)) - { - return false; - } - return true; + return (remainingAfterAttributes >= bodySize); + } private boolean isSupportedFrameType(byte frameType) { - final boolean result = _supportedBodies.containsKey(new Byte(frameType)); + final boolean result = _bodiesSupported[frameType] != null; if (!result) { @@ -84,6 +87,7 @@ public class AMQDataBlockDecoder throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); + BodyFactory bodyFactory = _bodiesSupported[type]; if (!isSupportedFrameType(type)) { throw new AMQFrameDecodingException("Unsupported frame type: " + type); @@ -91,19 +95,19 @@ public class AMQDataBlockDecoder final int channel = in.getUnsignedShort(); final long bodySize = in.getUnsignedInt(); - BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type)); + /* if (bodyFactory == null) { throw new AMQFrameDecodingException("Unsupported body type: " + type); } - AMQFrame frame = new AMQFrame(); - - frame.populateFromBuffer(in, channel, bodySize, bodyFactory); + */ + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + byte marker = in.get(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type); + throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type); } return frame; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 6af691fbe8..9e98d9792b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -38,6 +38,12 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock this.bodyFrame = bodyFrame; } + public AMQFrame(ByteBuffer in, int channel, long bodySize, BodyFactory bodyFactory) throws AMQFrameDecodingException + { + this.channel = channel; + this.bodyFrame = bodyFactory.createBody(in,bodySize); + } + public long getSize() { return 1 + 2 + 4 + bodyFrame.getSize() + 1; @@ -65,8 +71,8 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock throws AMQFrameDecodingException, AMQProtocolVersionException { this.channel = channel; - bodyFrame = bodyFactory.createBody(buffer); - bodyFrame.populateFromBuffer(buffer, bodySize); + bodyFrame = bodyFactory.createBody(buffer, bodySize); + } public String toString() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index da0909d32f..95b461b6dc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -39,13 +39,13 @@ public class AMQMethodBodyFactory implements BodyFactory _log.debug("Creating method body factory"); } - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML // segments generated together are now handled by MainRegistry. The Cluster class, // if generated together with amqp.xml is a part of MainRegistry. // TODO: Connect with version acquired from ProtocolInitiation class. return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), - (byte)8, (byte)0); + (byte)8, (byte)0, in, bodySize); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java new file mode 100644 index 0000000000..c0a12a9aad --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -0,0 +1,9 @@ +package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public abstract interface AMQMethodBodyInstanceFactory
+{
+ public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index ad07634554..23c1929205 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -65,27 +65,46 @@ public enum AMQType public int getEncodingSize(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.unsignedIntegerLength();
}
-
- public Object toNativeValue(Object value)
+ public Long toNativeValue(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ if (value instanceof Long)
+ {
+ return (Long) value;
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String)value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" +
+ value.getClass().getName() + ") to int.");
+ }
}
public void writeValueImpl(Object value, ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
public Object readValueFromBuffer(ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.readUnsignedInteger(buffer);
}
},
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 14d1d0c7b0..7c881c5a78 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -264,7 +264,7 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties } public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { _propertyFlags = propertyFlags; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java index cf5708d993..59646577e1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java @@ -27,5 +27,5 @@ import org.apache.mina.common.ByteBuffer; */ public interface BodyFactory { - AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException; + AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index d5fccf9409..baeecaa17a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -32,6 +32,18 @@ public class ContentBody extends AMQBody { } + public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); + } + + } + + public ContentBody(ByteBuffer payload) { this.payload = payload; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index 22af331ab7..5636229d53 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -39,9 +39,9 @@ public class ContentBodyFactory implements BodyFactory _log.debug("Creating content body factory"); } - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - return new ContentBody(); + return new ContentBody(in, bodySize); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 4ee36ee831..45280bdae3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -40,6 +40,18 @@ public class ContentHeaderBody extends AMQBody { } + public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + classId = buffer.getUnsignedShort(); + weight = buffer.getUnsignedShort(); + bodySize = buffer.getLong(); + int propertyFlags = buffer.getUnsignedShort(); + ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); + properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + + } + + public ContentHeaderBody(ContentHeaderProperties props, int classId) { properties = props; @@ -79,8 +91,8 @@ public class ContentHeaderBody extends AMQBody public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException, AMQProtocolVersionException { - ContentHeaderBody body = new ContentHeaderBody(); - body.populateFromBuffer(buffer, size); + ContentHeaderBody body = new ContentHeaderBody(buffer, size); + return body; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index ddf63f8aa3..818fc9cf0c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -39,11 +39,11 @@ public class ContentHeaderBodyFactory implements BodyFactory _log.debug("Creating content header body factory"); } - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { // all content headers are the same - it is only the properties that differ. // the content header body further delegates construction of properties - return new ContentHeaderBody(); + return new ContentHeaderBody(in,bodySize); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index 88bdefca88..561d7852fd 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -41,7 +41,7 @@ public interface ContentHeaderProperties * @throws AMQFrameDecodingException when the buffer does not contain valid data */ void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException; + throws AMQFrameDecodingException; /** * @return the size of the encoded property list in bytes. diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index cfcc5db857..7dac018872 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -37,7 +37,7 @@ public class ContentHeaderPropertiesFactory public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, ByteBuffer buffer, int size) - throws AMQFrameDecodingException, AMQProtocolVersionException + throws AMQFrameDecodingException { ContentHeaderProperties properties; // AMQP version change: "Hardwired" version to major=8, minor=0 diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 67b2d16ec0..c4d568ba88 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -331,7 +331,7 @@ public class EncodingUtils } - public static long unsignedIntegerLength() + public static int unsignedIntegerLength() { return 4; } @@ -356,6 +356,7 @@ public class EncodingUtils } } + public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table) { if (table != null) @@ -387,6 +388,238 @@ public class EncodingUtils buffer.put(packedValue); } + public static void writeBooleans(ByteBuffer buffer, boolean value) + { + + buffer.put(value ? (byte) 1 : (byte) 0); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + + buffer.put(packedValue); + } + + + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + buffer.put(packedValue); + } + + public static void writeBooleans(ByteBuffer buffer, + boolean value0, + boolean value1, + boolean value2, + boolean value3, + boolean value4, + boolean value5, + boolean value6, + boolean value7) + { + byte packedValue = value0 ? (byte) 1 : (byte) 0; + + if (value1) + { + packedValue = (byte) (packedValue | (byte)(1 << 1)); + } + + if (value2) + { + packedValue = (byte) (packedValue | (byte)(1 << 2)); + } + + if (value3) + { + packedValue = (byte) (packedValue | (byte)(1 << 3)); + } + + if (value4) + { + packedValue = (byte) (packedValue | (byte)(1 << 4)); + } + + if (value5) + { + packedValue = (byte) (packedValue | (byte)(1 << 5)); + } + + if (value6) + { + packedValue = (byte) (packedValue | (byte)(1 << 6)); + } + + if (value7) + { + packedValue = (byte) (packedValue | (byte)(1 << 7)); + } + + buffer.put(packedValue); + } + + + + /** * This is used for writing longstrs. * @@ -619,7 +852,7 @@ public class EncodingUtils buffer.put((byte) (aBoolean ? 1 : 0)); } - public static Boolean readBoolean(ByteBuffer buffer) + public static boolean readBoolean(ByteBuffer buffer) { byte packedValue = buffer.get(); return (packedValue == 1); @@ -636,7 +869,7 @@ public class EncodingUtils buffer.put(aByte); } - public static Byte readByte(ByteBuffer buffer) + public static byte readByte(ByteBuffer buffer) { return buffer.get(); } @@ -653,7 +886,7 @@ public class EncodingUtils buffer.putShort(aShort); } - public static Short readShort(ByteBuffer buffer) + public static short readShort(ByteBuffer buffer) { return buffer.getShort(); } @@ -669,7 +902,7 @@ public class EncodingUtils buffer.putInt(aInteger); } - public static Integer readInteger(ByteBuffer buffer) + public static int readInteger(ByteBuffer buffer) { return buffer.getInt(); } @@ -685,7 +918,7 @@ public class EncodingUtils buffer.putLong(aLong); } - public static Long readLong(ByteBuffer buffer) + public static long readLong(ByteBuffer buffer) { return buffer.getLong(); } @@ -701,7 +934,7 @@ public class EncodingUtils buffer.putFloat(aFloat); } - public static Float readFloat(ByteBuffer buffer) + public static float readFloat(ByteBuffer buffer) { return buffer.getFloat(); } @@ -718,7 +951,7 @@ public class EncodingUtils buffer.putDouble(aDouble); } - public static Double readDouble(ByteBuffer buffer) + public static double readDouble(ByteBuffer buffer) { return buffer.getDouble(); } @@ -780,48 +1013,6 @@ public class EncodingUtils - public static void main(String[] args) - { - long[] nums = { 1000000000000000000L, - 100000000000000000L, - 10000000000000000L, - 1000000000000000L, - 100000000000000L, - 10000000000000L, - 1000000000000L, - 100000000000L, - 10000000000L, - 1000000000L, - 100000000L, - 10000000L, - 1000000L, - 100000L, - 10000L, - 1000L, - 100L, - 10L, - 1L, - 0L, - 787987932453564535L, - 543289830889480230L, - 3748104703875785L, - 463402485702857L, - 87402780489392L, - 1190489015032L, - 134303883744L - }; - - - - - for(int i = 0; i < nums.length; i++) - { - ByteBuffer buffer = ByteBuffer.allocate(25); - writeShortStringBytes(buffer, String.valueOf(nums[i])); - buffer.flip(); - System.out.println(nums[i] + " : " + readLongAsShortString(buffer)); - } - } public static long readLongAsShortString(ByteBuffer buffer) { @@ -857,4 +1048,37 @@ public class EncodingUtils return result; } + + public static long readUnsignedInteger(ByteBuffer buffer) + { + long l = 0xFF & buffer.get(); + l <<=8; + l = l | (0xFF & buffer.get()); + l <<=8; + l = l | (0xFF & buffer.get()); + l <<=8; + l = l | (0xFF & buffer.get()); + + return l; + } + + + public static void main(String[] args) + { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.setAutoExpand(true); + + long l = (long) Integer.MAX_VALUE; + l += 1024L; + + writeUnsignedInteger(buf, l); + + buf.flip(); + + long l2 = readUnsignedInteger(buf); + + System.out.println("before: " + l); + System.out.println("after: " + l2); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 7a160ef471..ca03f29047 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -27,6 +27,20 @@ public class HeartbeatBody extends AMQBody public static final byte TYPE = 8; public static AMQFrame FRAME = new HeartbeatBody().toFrame(); + public HeartbeatBody() + { + + } + + public HeartbeatBody(ByteBuffer buffer, long size) + { + if(size > 0) + { + //allow other implementations to have a payload, but ignore it: + buffer.skip((int) size); + } + } + protected byte getFrameType() { return TYPE; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java index 97bd3d9253..c7ada708dc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class HeartbeatBodyFactory implements BodyFactory { - public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException + public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { return new HeartbeatBody(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java new file mode 100644 index 0000000000..174cb142e0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -0,0 +1,77 @@ +package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private ByteBuffer _encodedBlock;
+
+ private AMQDataBlock _block;
+
+ public SmallCompositeAMQDataBlock(AMQDataBlock block)
+ {
+ _block = block;
+ }
+
+ /**
+ * The encoded block will be logically first before the AMQDataBlocks which are encoded
+ * into the buffer afterwards.
+ * @param encodedBlock already-encoded data
+ * @param block a block to be encoded.
+ */
+ public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+ {
+ this(block);
+ _encodedBlock = encodedBlock;
+ }
+
+ public AMQDataBlock getBlock()
+ {
+ return _block;
+ }
+
+ public ByteBuffer getEncodedBlock()
+ {
+ return _encodedBlock;
+ }
+
+ public long getSize()
+ {
+ long frameSize = _block.getSize();
+
+ if (_encodedBlock != null)
+ {
+ _encodedBlock.rewind();
+ frameSize += _encodedBlock.remaining();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (_encodedBlock != null)
+ {
+ buffer.put(_encodedBlock);
+ }
+ _block.writePayload(buffer);
+
+ }
+
+ public String toString()
+ {
+ if (_block == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{encodedBlock=").append(_encodedBlock);
+
+ buf.append(" _block=[").append(_block.toString()).append("]");
+
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 0eb43bdf5f..9d3c588fc8 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -105,7 +105,12 @@ public class TxAckTest extends TestCase long deliveryTag = i + 1; // TODO: fix hardcoded protocol version data TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8, - (byte)0), txnContext); + (byte)0, + null, + false, + false, + null, + 0), txnContext); _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); } _acked = acked; diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 52afecdb6a..ea576a5661 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -153,8 +153,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0); - request.routingKey = new AMQShortString(id); + BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0); + return request; } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index d4a8f6c7f9..91a26632a1 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -163,8 +163,14 @@ public class AMQQueueMBeanTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); - publish.immediate = immediate; + BasicPublishBody publish = new BasicPublishBody((byte)8, + (byte)0, + null, + immediate, + false, + null, + 0); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = 1000; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java index 222b2c696a..d10d5acdd0 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -96,9 +96,13 @@ public class AckTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0); - publishBody.routingKey = new AMQShortString("rk"); - publishBody.exchange = new AMQShortString("someExchange"); + BasicPublishBody publishBody = new BasicPublishBody((byte)8, + (byte)0, + new AMQShortString("someExchange"), + false, + false, + new AMQShortString("rk"), + 0); AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); if (persistent) { diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java index da4627411d..6c48bb2bf4 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -61,8 +61,14 @@ class MessageTestHelper extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); - publish.immediate = immediate; + BasicPublishBody publish = new BasicPublishBody((byte)8, + (byte)0, + null, + immediate, + false, + null, + 0); + return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, new ContentHeaderBody()); } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index b874ca9594..e2500d9865 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -52,7 +52,12 @@ public class TestReferenceCounting extends TestCase createPersistentContentHeader(); // TODO: fix hardcoded protocol version data AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0), + (byte)0, + null, + false, + false, + null, + 0), new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); @@ -76,7 +81,12 @@ public class TestReferenceCounting extends TestCase { // TODO: fix hardcoded protocol version data AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0), + (byte)0, + null, + false, + false, + null, + 0), new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); |