summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-09 23:22:52 +0000
committerRobert Greig <rgreig@apache.org>2007-01-09 23:22:52 +0000
commit9b8622e1f2aa8dbd03572504233cb7c44e1fe3fa (patch)
treef6eb5a01456f43ae1843463885c498c84e96f246
parent7d88029ead925cd86f2dce37548311efae4032b2 (diff)
downloadqpid-python-9b8622e1f2aa8dbd03572504233cb7c44e1fe3fa.tar.gz
QPID-268 : (Patch supplied by Rob Godfrey) Improvements to performance of generated code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@494650 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java122
-rw-r--r--qpid/gentools/templ.java/MethodBodyClass.tmpl97
-rw-r--r--qpid/gentools/templ.java/MethodRegistryClass.tmpl43
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java37
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java11
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java14
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java17
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java12
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java16
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java324
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java77
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java7
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java4
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java10
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java10
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java10
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java14
33 files changed, 776 insertions, 201 deletions
diff --git a/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java b/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
index a18338f3fe..13560f5c17 100644
--- a/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
+++ b/qpid/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
@@ -61,7 +61,9 @@ public class JavaGenerator extends Generator
static private Method mbGetGenerateMethod;
static private Method mbMangledGetGenerateMethod;
static private Method mbParamListGenerateMethod;
+ static private Method mbPassedParamListGenerateMethod;
static private Method mbMangledParamListGenerateMethod;
+ static private Method mbMangledPassedParamListGenerateMethod;
static private Method mbBodyInitGenerateMethod;
static private Method mbMangledBodyInitGenerateMethod;
static private Method mbSizeGenerateMethod;
@@ -131,11 +133,22 @@ public class JavaGenerator extends Generator
AmqpVersionSet.class, int.class, int.class, boolean.class); }
catch (NoSuchMethodException e) { e.printStackTrace(); }
+
+ try { mbPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
+ "generateMbPassedParamList", String.class, AmqpField.class,
+ AmqpVersionSet.class, int.class, int.class, boolean.class); }
+ catch (NoSuchMethodException e) { e.printStackTrace(); }
+
try { mbMangledParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
"generateMbMangledParamList", AmqpField.class,
int.class, int.class, boolean.class); }
catch (NoSuchMethodException e) { e.printStackTrace(); }
+ try { mbMangledPassedParamListGenerateMethod = JavaGenerator.class.getDeclaredMethod(
+ "generateMbMangledPassedParamList", AmqpField.class,
+ int.class, int.class, boolean.class); }
+ catch (NoSuchMethodException e) { e.printStackTrace(); }
+
try { mbBodyInitGenerateMethod = JavaGenerator.class.getDeclaredMethod(
"generateMbBodyInit", String.class, AmqpField.class,
AmqpVersionSet.class, int.class, int.class, boolean.class); }
@@ -457,11 +470,11 @@ public class JavaGenerator extends Generator
if (token.compareTo("${CLASS}") == 0 && thisClass != null)
return thisClass.name;
if (token.compareTo("${CLASS_ID_INIT}") == 0 && thisClass != null)
- return generateIndexInitializer("classIdMap", thisClass.indexMap, 8);
+ return generateIndexInitializer("registerClassId", thisClass.indexMap, 8);
if (token.compareTo("${METHOD}") == 0 && method != null)
return method.name;
if (token.compareTo("${METHOD_ID_INIT}") == 0 && method != null)
- return generateIndexInitializer("methodIdMap", method.indexMap, 8);
+ return generateIndexInitializer("registerMethodId", method.indexMap, 8);
if (token.compareTo("${FIELD}") == 0 && field != null)
return field.name;
@@ -573,6 +586,16 @@ public class JavaGenerator extends Generator
codeSnippet += fieldMap.parseFieldMap(mbParamListGenerateMethod,
mbMangledParamListGenerateMethod, 42, 4, this);
}
+
+ else if (token.compareTo("${mb_field_passed_parameter_list}") == 0)
+ {
+ // <cringe> The code generated by this is ugly... It puts a comma on a line by itself!
+ // TODO: Find a more elegant solution here sometime...
+ codeSnippet = fieldMap.size() > 0 ? Utils.createSpaces(42) + "," + cr : "";
+ // </cringe>
+ codeSnippet += fieldMap.parseFieldMap(mbPassedParamListGenerateMethod,
+ mbMangledPassedParamListGenerateMethod, 42, 4, this);
+ }
else if (token.compareTo("${mb_field_body_initialize}") == 0)
{
codeSnippet = fieldMap.parseFieldMap(mbBodyInitGenerateMethod,
@@ -721,7 +744,7 @@ public class JavaGenerator extends Generator
while (vItr.hasNext())
{
AmqpVersion version = vItr.next();
- sb.append(indent + mapName + ".put(\"" + version.toString() + "\", " + index + ");" + cr);
+ sb.append(indent + mapName + "( (byte) " + version.getMajor() +", (byte) " + version.getMinor() + ", " + index + ");" + cr);
}
}
return sb.toString();
@@ -746,12 +769,12 @@ public class JavaGenerator extends Generator
{
int classIndex = findIndex(thisClass.indexMap, version);
int methodIndex = findIndex(method.indexMap, version);
- sb.append(indent + "classIDMethodIDVersionBodyMap.put(" + cr);
- sb.append(indent + tab + "createMapKey((short)" + classIndex +
- ", (short)" + methodIndex + ", (byte)" + version.getMajor() +
- ", (byte)" + version.getMinor() + "), " + cr);
+ sb.append(indent + "registerMethod(" + cr);
+ sb.append(indent + tab + "(short)" + classIndex +
+ ", (short)" + methodIndex + ", (byte)" + version.getMajor() +
+ ", (byte)" + version.getMinor() + ", " + cr);
sb.append(indent + tab + Utils.firstUpper(thisClass.name) +
- Utils.firstUpper(method.name) + "Body.class);" + cr);
+ Utils.firstUpper(method.name) + "Body.getFactory());" + cr);
}
catch (Exception e) {} // Ignore
}
@@ -924,6 +947,15 @@ public class JavaGenerator extends Generator
(nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr;
}
+
+ protected String generateMbPassedParamList(String codeType, AmqpField field,
+ AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag)
+ {
+ return Utils.createSpaces(indentSize) + field.name +
+ (nextFlag ? "," : "") + " // AMQP version(s): " + versionSet + cr;
+ }
+
+
protected String generateMbMangledParamList(AmqpField field, int indentSize,
int tabSize, boolean nextFlag)
throws AmqpTypeMappingException
@@ -943,10 +975,29 @@ public class JavaGenerator extends Generator
return sb.toString();
}
+ protected String generateMbMangledPassedParamList(AmqpField field, int indentSize,
+ int tabSize, boolean nextFlag)
+ throws AmqpTypeMappingException
+ {
+ StringBuffer sb = new StringBuffer();
+ Iterator<String> dItr = field.domainMap.keySet().iterator();
+ int domainCntr = 0;
+ while (dItr.hasNext())
+ {
+ String domainName = dItr.next();
+ AmqpVersionSet versionSet = field.domainMap.get(domainName);
+ sb.append(Utils.createSpaces(indentSize) + field.name + "_" +
+ (domainCntr++) + (nextFlag ? "," : "") + " // AMQP version(s): " +
+ versionSet + cr);
+ }
+ return sb.toString();
+ }
+
+
protected String generateMbBodyInit(String codeType, AmqpField field,
AmqpVersionSet versionSet, int indentSize, int tabSize, boolean nextFlag)
{
- return Utils.createSpaces(indentSize) + "body." + field.name + " = " + field.name +
+ return Utils.createSpaces(indentSize) + "this." + field.name + " = " + field.name +
";" + cr;
}
@@ -960,7 +1011,7 @@ public class JavaGenerator extends Generator
while (dItr.hasNext())
{
dItr.next();
- sb.append(Utils.createSpaces(indentSize) + "body." + field.name + "_" + domainCntr +
+ sb.append(Utils.createSpaces(indentSize) + "this." + field.name + "_" + domainCntr +
" = " + field.name + "_" + (domainCntr++) + ";" + cr);
}
return sb.toString();
@@ -1004,23 +1055,26 @@ public class JavaGenerator extends Generator
int ordinal, int indentSize, int tabSize)
{
String indent = Utils.createSpaces(indentSize);
- String bitArrayName = "bitArray_" + ordinal;
- StringBuffer sb = new StringBuffer(indent + "boolean[] " + bitArrayName +
- " = new boolean[] { ");
- for (int i=0; i<bitFieldList.size(); i++)
+
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ while(i <bitFieldList.size())
{
- if (i != 0)
+
+ StringBuilder line = new StringBuilder();
+
+ for (int j=0; i<bitFieldList.size() && j<8; i++, j++)
{
- if ((i + 3) % 6 == 0)
- sb.append("," + cr + indent + Utils.createSpaces(tabSize));
- else
- sb.append(", ");
+ if (j != 0)
+ {
+ line.append(", ");
+ }
+ line.append(bitFieldList.get(i));
}
- sb.append(bitFieldList.get(i));
+
+ sb.append(Utils.createSpaces(indentSize) +
+ typeMap.get("bit").encodeExpression.replaceAll("#", line.toString()) + ";" + cr);
}
- sb.append(" };" + cr);
- sb.append(Utils.createSpaces(indentSize) +
- typeMap.get("bit").encodeExpression.replaceAll("#", bitArrayName) + ";" + cr);
return sb.toString();
}
@@ -1038,14 +1092,22 @@ public class JavaGenerator extends Generator
int ordinal, int indentSize, int tabSize)
{
String indent = Utils.createSpaces(indentSize);
- String bitArrayName = "bitArray_" + ordinal;
- StringBuffer sb = new StringBuffer();
- sb.append(indent +
- typeMap.get("bit").decodeExpression.replaceAll("#", "boolean[] " + bitArrayName) +
- ";" + cr);
- for (int i=0; i<bitFieldList.size(); i++)
+
+ StringBuilder sb = new StringBuilder(indent);
+ sb.append("byte packedValue;");
+ sb.append(cr);
+
+ // RG HERE!
+
+ int i = 0;
+ while(i < bitFieldList.size())
{
- sb.append(indent + bitFieldList.get(i) + " = " + bitArrayName + "[" + i + "];" + cr);
+ sb.append(indent + "packedValue = EncodingUtils.readByte(buffer);" + cr);
+
+ for(int j = 0; i < bitFieldList.size() && j < 8; i++, j++)
+ {
+ sb.append(indent + bitFieldList.get(i) + " = ( packedValue & (byte) (1 << " + j + ") ) != 0;" + cr);
+ }
}
return sb.toString();
}
diff --git a/qpid/gentools/templ.java/MethodBodyClass.tmpl b/qpid/gentools/templ.java/MethodBodyClass.tmpl
index 04ce639f23..249738e109 100644
--- a/qpid/gentools/templ.java/MethodBodyClass.tmpl
+++ b/qpid/gentools/templ.java/MethodBodyClass.tmpl
@@ -28,36 +28,97 @@
package org.apache.qpid.framing;
-import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Arrays;
import org.apache.mina.common.ByteBuffer;
public class ${CLASS}${METHOD}Body extends AMQMethodBody implements EncodableAMQDataBlock
{
- public static final TreeMap<String, Integer> classIdMap = new TreeMap<String, Integer>();
- public static final TreeMap<String, Integer> methodIdMap = new TreeMap<String, Integer>();
+ private static final AMQMethodBodyInstanceFactory factory = new AMQMethodBodyInstanceFactory()
+ {
+ public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new ${CLASS}${METHOD}Body(major, minor, in);
+ }
+ };
+
+ public static AMQMethodBodyInstanceFactory getFactory()
+ {
+ return factory;
+ }
+
+ public static HashMap<Integer, Integer> classIdMap = new HashMap<Integer, Integer>();
+ public static HashMap<Integer, Integer> methodIdMap = new HashMap<Integer, Integer>();
+
+ private static void registerMethodId(byte major, byte minor, int methodId)
+ {
+ methodIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), methodId);
+ }
+
+ private static void registerClassId(byte major, byte minor, int classId)
+ {
+ classIdMap.put((0xff & (int) major) | ((0xff & (int) minor)<<8), classId);
+ }
+
static
{
-${CLASS_ID_INIT}
-${METHOD_ID_INIT}
+
+ ${CLASS_ID_INIT}
+ ${METHOD_ID_INIT}
+
}
// Fields declared in specification
%{FLIST} ${field_declaration}
+ private final int _clazz;
+ private final int _method;
+
+
// Constructor
- public ${CLASS}${METHOD}Body(byte major, byte minor)
+
+ public ${CLASS}${METHOD}Body(byte major, byte minor, ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ super(major, minor);
+ _clazz = getClazz(major,minor);
+ _method = getMethod(major,minor);
+ %{FLIST} ${mb_field_decode}
+ }
+ public ${CLASS}${METHOD}Body(byte major, byte minor
+ %{FLIST} ${mb_field_parameter_list}
+ )
{
super(major, minor);
+ _clazz = getClazz(major,minor);
+ _method = getMethod(major,minor);
+ %{FLIST} ${mb_field_body_initialize}
+ }
+
+ public int getClazz()
+ {
+ return _clazz;
+ }
+
+ public int getMethod()
+ {
+ return _method;
}
- public int getClazz() { return classIdMap.get(major + "-" + minor); }
- public int getMethod() { return methodIdMap.get(major + "-" + minor); }
- public static int getClazz(byte major, byte minor) { return classIdMap.get(major + "-" + minor); }
- public static int getMethod(byte major, byte minor) { return methodIdMap.get(major + "-" + minor); }
+ public static int getClazz(byte major, byte minor)
+ {
+ return classIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8));
+ }
- // Field methods
+ public static int getMethod(byte major, byte minor)
+ {
+ return methodIdMap.get((0xff & (int) major) | ((0xff & (int) minor)<<8));
+ }
+
+
+ // Field methods
%{FLIST} ${mb_field_get_method}
protected int getBodySize()
@@ -84,16 +145,16 @@ ${METHOD_ID_INIT}
return buf.toString();
}
- public static AMQFrame createAMQFrame(int _channelId, byte major, byte minor
+ public static AMQFrame createAMQFrame(int channelId, byte major, byte minor
%{FLIST} ${mb_field_parameter_list}
)
{
- ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor);
-%{FLIST} ${mb_field_body_initialize}
-
- AMQFrame frame = new AMQFrame();
- frame.channel = _channelId;
- frame.bodyFrame = body;
+ ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor
+%{FLIST} ${mb_field_passed_parameter_list}
+ );
+
+
+ AMQFrame frame = new AMQFrame(channelId, body);
return frame;
}
}
diff --git a/qpid/gentools/templ.java/MethodRegistryClass.tmpl b/qpid/gentools/templ.java/MethodRegistryClass.tmpl
index 0f15918f90..a243ace00e 100644
--- a/qpid/gentools/templ.java/MethodRegistryClass.tmpl
+++ b/qpid/gentools/templ.java/MethodRegistryClass.tmpl
@@ -31,41 +31,41 @@ package org.apache.qpid.framing;
import java.util.HashMap;
import java.lang.reflect.Constructor;
import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
class MainRegistry
{
+ private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
+
+
private static final Logger _log = Logger.getLogger(MainRegistry.class);
- private static HashMap<Long, Class> classIDMethodIDVersionBodyMap = new HashMap<Long, Class>();
static
{
%{CLIST} ${reg_map_put_method}
}
- public static AMQMethodBody get(short classID, short methodID, byte major, byte minor)
+ public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
throws AMQFrameDecodingException
{
- Class bodyClass = classIDMethodIDVersionBodyMap.get(
- createMapKey(classID, methodID, major, minor));
- if (bodyClass == null)
- {
- throw new AMQFrameDecodingException(_log,
- "Unable to find a suitable decoder for class " + classID + " and method " +
- methodID + " in AMQP version " + major + "-" + minor + ".");
- }
- try
- {
- Constructor initFn = bodyClass.getConstructor(byte.class, byte.class);
- return (AMQMethodBody) initFn.newInstance(major, minor);
- }
- catch (Exception e)
- {
- throw new AMQFrameDecodingException(_log,
- "Unable to instantiate body class for class " + classID + " and method " +
- methodID + " in AMQP version " + major + "-" + minor + " : " + e, e);
- }
+ AMQMethodBodyInstanceFactory bodyFactory = classIDMethodIDVersionBodyMap.get(createMapKey(classID,methodID,major,minor));
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Unable to find a suitable decoder for class " + classID + " and method " +
+ methodID + " in AMQP version " + major + "-" + minor + ".");
+ }
+ return bodyFactory.newInstance(major, minor, in, size);
+
+
}
+ private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
+ {
+ classIDMethodIDVersionBodyMap.put(createMapKey(classID,methodID,major,minor), instanceFactory);
+ }
+
+
private static Long createMapKey(short classID, short methodID, byte major, byte minor)
{
/**
@@ -77,4 +77,5 @@ class MainRegistry
*/
return new Long(((long)classID << 32) + ((long)methodID << 16) + ((long)major << 8) + minor);
}
+
}
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index d8d2220a8e..f9ec0eb878 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -108,10 +108,11 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0);
- ping.broker = new AMQShortString(_group.getLocal().getDetails());
- ping.responseRequired = true;
- ping.load = _loadTable.getLocalLoad();
+ ClusterPingBody ping = new ClusterPingBody((byte)8,
+ (byte)0,
+ _group.getLocal().getDetails(),
+ _loadTable.getLocalLoad(),
+ true);
BlockingHandler handler = new BlockingHandler();
send(getLeader(), new SimpleBodySendable(ping), handler);
handler.waitForCompletion();
@@ -156,8 +157,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
_logger.info(new LogMessage("Connected to {0}. joining", leader));
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
- join.broker = new AMQShortString(_group.getLocal().getDetails());
+ ClusterJoinBody join = new ClusterJoinBody((byte)8,
+ (byte)0,
+ _group.getLocal().getDetails());
+
send(leader, new SimpleBodySendable(join));
}
@@ -177,8 +180,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
- leave.broker = new AMQShortString(_group.getLocal().getDetails());
+ ClusterLeaveBody leave = new ClusterLeaveBody((byte)8,
+ (byte)0,
+ _group.getLocal().getDetails());
+
send(getLeader(), new SimpleBodySendable(leave));
}
@@ -200,8 +205,10 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
- suspect.broker = new AMQShortString(broker.getDetails());
+ ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8,
+ (byte)0,
+ broker.getDetails());
+
send(getLeader(), new SimpleBodySendable(suspect));
}
}
@@ -224,8 +231,8 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
//pass request on to leader:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
- request.broker = new AMQShortString(member.getDetails());
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails());
+
Broker leader = getLeader();
send(leader, new SimpleBodySendable(request));
_logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
@@ -271,9 +278,9 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0);
- //TODO: revise this way of converting String to bytes...
- announce.members = membership.getBytes();
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes());
+
+
return announce;
}
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index 722ec1b256..aa16595095 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -54,7 +54,16 @@ class ConsumerCounts
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- BasicConsumeBody m = new BasicConsumeBody((byte)8, (byte)0);
+ BasicConsumeBody m = new BasicConsumeBody((byte)8,
+ (byte)0,
+ null,
+ queue,
+ false,
+ false,
+ false,
+ false,
+ queue,
+ 0);
m.queue = queue;
m.consumerTag = queue;
replay(m, messages);
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
index ce3e71f0a5..243a28e5e8 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
@@ -50,13 +50,13 @@ public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
private final byte minor = (byte)0;
private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
{
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor)),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor)),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor))
+ new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)),
+ new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)),
+ new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)),
+ new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)),
+ new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)),
+ new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)),
+ new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false))
});
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index 6898ffcec2..5cf6d5c3ff 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -79,8 +79,14 @@ public class ClusteredQueue extends AMQQueue
//send deletion request to all other members:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
- request.queue = getName();
+ QueueDeleteBody request = new QueueDeleteBody((byte)8,
+ (byte)0,
+ false,
+ false,
+ false,
+ getName(),
+ 0);
+
_groupMgr.broadcast(new SimpleBodySendable(request));
}
}
@@ -93,8 +99,11 @@ public class ClusteredQueue extends AMQQueue
//signal other members:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0);
- request.consumerTag = getName();
+ BasicCancelBody request = new BasicCancelBody((byte)8,
+ (byte)0,
+ getName(),
+ false);
+
_groupMgr.broadcast(new SimpleBodySendable(request));
}
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index 89ce0bc8b1..568de62d1b 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -60,7 +60,7 @@ public class PrivateQueue extends AMQQueue
//send delete request to peers:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0);
request.queue = getName();
_groupMgr.broadcast(new SimpleBodySendable(request));
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
new file mode 100644
index 0000000000..9513cfc468
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
@@ -0,0 +1,9 @@
+package org.apache.qpid;
+
+public class AMQConnectionFailureException extends AMQException
+{
+ public AMQConnectionFailureException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
new file mode 100644
index 0000000000..ed1d2e8beb
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
@@ -0,0 +1,9 @@
+package org.apache.qpid;
+
+public class AMQUnknownExchangeType extends AMQException
+{
+ public AMQUnknownExchangeType(String message)
+ {
+ super(message);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 2a999fe130..552c8e599e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -34,18 +34,24 @@ public class AMQDataBlockDecoder
private final Map _supportedBodies = new HashMap();
+ private final static BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+ static
+ {
+ _bodiesSupported[AMQMethodBody.TYPE] = AMQMethodBodyFactory.getInstance();
+ _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
public AMQDataBlockDecoder()
{
- _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance());
- _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance());
- _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance());
- _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory());
}
public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
{
- // type, channel, body size and end byte
- if (in.remaining() < (1 + 2 + 4 + 1))
+ final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
+ // type, channel, body length and end byte
+ if (remainingAfterAttributes < 0)
{
return false;
}
@@ -61,16 +67,13 @@ public class AMQDataBlockDecoder
" bodySize = " + bodySize);
}
- if (in.remaining() < (bodySize + 1))
- {
- return false;
- }
- return true;
+ return (remainingAfterAttributes >= bodySize);
+
}
private boolean isSupportedFrameType(byte frameType)
{
- final boolean result = _supportedBodies.containsKey(new Byte(frameType));
+ final boolean result = _bodiesSupported[frameType] != null;
if (!result)
{
@@ -84,6 +87,7 @@ public class AMQDataBlockDecoder
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
+ BodyFactory bodyFactory = _bodiesSupported[type];
if (!isSupportedFrameType(type))
{
throw new AMQFrameDecodingException("Unsupported frame type: " + type);
@@ -91,19 +95,19 @@ public class AMQDataBlockDecoder
final int channel = in.getUnsignedShort();
final long bodySize = in.getUnsignedInt();
- BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type));
+ /*
if (bodyFactory == null)
{
throw new AMQFrameDecodingException("Unsupported body type: " + type);
}
- AMQFrame frame = new AMQFrame();
-
- frame.populateFromBuffer(in, channel, bodySize, bodyFactory);
+ */
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
byte marker = in.get();
if ((marker & 0xFF) != 0xCE)
{
- throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type);
+ throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " length=" + bodySize + " type=" + type);
}
return frame;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
index 6af691fbe8..9e98d9792b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -38,6 +38,12 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
this.bodyFrame = bodyFrame;
}
+ public AMQFrame(ByteBuffer in, int channel, long bodySize, BodyFactory bodyFactory) throws AMQFrameDecodingException
+ {
+ this.channel = channel;
+ this.bodyFrame = bodyFactory.createBody(in,bodySize);
+ }
+
public long getSize()
{
return 1 + 2 + 4 + bodyFrame.getSize() + 1;
@@ -65,8 +71,8 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
throws AMQFrameDecodingException, AMQProtocolVersionException
{
this.channel = channel;
- bodyFrame = bodyFactory.createBody(buffer);
- bodyFrame.populateFromBuffer(buffer, bodySize);
+ bodyFrame = bodyFactory.createBody(buffer, bodySize);
+
}
public String toString()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
index da0909d32f..95b461b6dc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
@@ -39,13 +39,13 @@ public class AMQMethodBodyFactory implements BodyFactory
_log.debug("Creating method body factory");
}
- public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
// AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML
// segments generated together are now handled by MainRegistry. The Cluster class,
// if generated together with amqp.xml is a part of MainRegistry.
// TODO: Connect with version acquired from ProtocolInitiation class.
return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(),
- (byte)8, (byte)0);
+ (byte)8, (byte)0, in, bodySize);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
new file mode 100644
index 0000000000..c0a12a9aad
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
@@ -0,0 +1,9 @@
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+
+public abstract interface AMQMethodBodyInstanceFactory
+{
+ public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
index ad07634554..23c1929205 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
@@ -65,27 +65,46 @@ public enum AMQType
public int getEncodingSize(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.unsignedIntegerLength();
}
-
- public Object toNativeValue(Object value)
+ public Long toNativeValue(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ if (value instanceof Long)
+ {
+ return (Long) value;
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String)value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value + "(" +
+ value.getClass().getName() + ") to int.");
+ }
}
public void writeValueImpl(Object value, ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
public Object readValueFromBuffer(ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.readUnsignedInteger(buffer);
}
},
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 14d1d0c7b0..7c881c5a78 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -264,7 +264,7 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
}
public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ throws AMQFrameDecodingException
{
_propertyFlags = propertyFlags;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
index cf5708d993..59646577e1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
@@ -27,5 +27,5 @@ import org.apache.mina.common.ByteBuffer;
*/
public interface BodyFactory
{
- AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException;
+ AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index d5fccf9409..baeecaa17a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -32,6 +32,18 @@ public class ContentBody extends AMQBody
{
}
+ public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ if (size > 0)
+ {
+ payload = buffer.slice();
+ payload.limit((int) size);
+ buffer.skip((int) size);
+ }
+
+ }
+
+
public ContentBody(ByteBuffer payload)
{
this.payload = payload;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
index 22af331ab7..5636229d53 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java
@@ -39,9 +39,9 @@ public class ContentBodyFactory implements BodyFactory
_log.debug("Creating content body factory");
}
- public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- return new ContentBody();
+ return new ContentBody(in, bodySize);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 4ee36ee831..45280bdae3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -40,6 +40,18 @@ public class ContentHeaderBody extends AMQBody
{
}
+ public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ classId = buffer.getUnsignedShort();
+ weight = buffer.getUnsignedShort();
+ bodySize = buffer.getLong();
+ int propertyFlags = buffer.getUnsignedShort();
+ ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
+ properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14);
+
+ }
+
+
public ContentHeaderBody(ContentHeaderProperties props, int classId)
{
properties = props;
@@ -79,8 +91,8 @@ public class ContentHeaderBody extends AMQBody
public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
- ContentHeaderBody body = new ContentHeaderBody();
- body.populateFromBuffer(buffer, size);
+ ContentHeaderBody body = new ContentHeaderBody(buffer, size);
+
return body;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
index ddf63f8aa3..818fc9cf0c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java
@@ -39,11 +39,11 @@ public class ContentHeaderBodyFactory implements BodyFactory
_log.debug("Creating content header body factory");
}
- public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
// all content headers are the same - it is only the properties that differ.
// the content header body further delegates construction of properties
- return new ContentHeaderBody();
+ return new ContentHeaderBody(in,bodySize);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
index 88bdefca88..561d7852fd 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
@@ -41,7 +41,7 @@ public interface ContentHeaderProperties
* @throws AMQFrameDecodingException when the buffer does not contain valid data
*/
void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
- throws AMQFrameDecodingException, AMQProtocolVersionException;
+ throws AMQFrameDecodingException;
/**
* @return the size of the encoded property list in bytes.
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
index cfcc5db857..7dac018872 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -37,7 +37,7 @@ public class ContentHeaderPropertiesFactory
public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
ByteBuffer buffer, int size)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ throws AMQFrameDecodingException
{
ContentHeaderProperties properties;
// AMQP version change: "Hardwired" version to major=8, minor=0
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
index 67b2d16ec0..c4d568ba88 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -331,7 +331,7 @@ public class EncodingUtils
}
- public static long unsignedIntegerLength()
+ public static int unsignedIntegerLength()
{
return 4;
}
@@ -356,6 +356,7 @@ public class EncodingUtils
}
}
+
public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
{
if (table != null)
@@ -387,6 +388,238 @@ public class EncodingUtils
buffer.put(packedValue);
}
+ public static void writeBooleans(ByteBuffer buffer, boolean value)
+ {
+
+ buffer.put(value ? (byte) 1 : (byte) 0);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+
+ buffer.put(packedValue);
+ }
+
+
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5,
+ boolean value6)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 6));
+ }
+
+ buffer.put(packedValue);
+ }
+
+ public static void writeBooleans(ByteBuffer buffer,
+ boolean value0,
+ boolean value1,
+ boolean value2,
+ boolean value3,
+ boolean value4,
+ boolean value5,
+ boolean value6,
+ boolean value7)
+ {
+ byte packedValue = value0 ? (byte) 1 : (byte) 0;
+
+ if (value1)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 1));
+ }
+
+ if (value2)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 2));
+ }
+
+ if (value3)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 3));
+ }
+
+ if (value4)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 4));
+ }
+
+ if (value5)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 5));
+ }
+
+ if (value6)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 6));
+ }
+
+ if (value7)
+ {
+ packedValue = (byte) (packedValue | (byte)(1 << 7));
+ }
+
+ buffer.put(packedValue);
+ }
+
+
+
+
/**
* This is used for writing longstrs.
*
@@ -619,7 +852,7 @@ public class EncodingUtils
buffer.put((byte) (aBoolean ? 1 : 0));
}
- public static Boolean readBoolean(ByteBuffer buffer)
+ public static boolean readBoolean(ByteBuffer buffer)
{
byte packedValue = buffer.get();
return (packedValue == 1);
@@ -636,7 +869,7 @@ public class EncodingUtils
buffer.put(aByte);
}
- public static Byte readByte(ByteBuffer buffer)
+ public static byte readByte(ByteBuffer buffer)
{
return buffer.get();
}
@@ -653,7 +886,7 @@ public class EncodingUtils
buffer.putShort(aShort);
}
- public static Short readShort(ByteBuffer buffer)
+ public static short readShort(ByteBuffer buffer)
{
return buffer.getShort();
}
@@ -669,7 +902,7 @@ public class EncodingUtils
buffer.putInt(aInteger);
}
- public static Integer readInteger(ByteBuffer buffer)
+ public static int readInteger(ByteBuffer buffer)
{
return buffer.getInt();
}
@@ -685,7 +918,7 @@ public class EncodingUtils
buffer.putLong(aLong);
}
- public static Long readLong(ByteBuffer buffer)
+ public static long readLong(ByteBuffer buffer)
{
return buffer.getLong();
}
@@ -701,7 +934,7 @@ public class EncodingUtils
buffer.putFloat(aFloat);
}
- public static Float readFloat(ByteBuffer buffer)
+ public static float readFloat(ByteBuffer buffer)
{
return buffer.getFloat();
}
@@ -718,7 +951,7 @@ public class EncodingUtils
buffer.putDouble(aDouble);
}
- public static Double readDouble(ByteBuffer buffer)
+ public static double readDouble(ByteBuffer buffer)
{
return buffer.getDouble();
}
@@ -780,48 +1013,6 @@ public class EncodingUtils
- public static void main(String[] args)
- {
- long[] nums = { 1000000000000000000L,
- 100000000000000000L,
- 10000000000000000L,
- 1000000000000000L,
- 100000000000000L,
- 10000000000000L,
- 1000000000000L,
- 100000000000L,
- 10000000000L,
- 1000000000L,
- 100000000L,
- 10000000L,
- 1000000L,
- 100000L,
- 10000L,
- 1000L,
- 100L,
- 10L,
- 1L,
- 0L,
- 787987932453564535L,
- 543289830889480230L,
- 3748104703875785L,
- 463402485702857L,
- 87402780489392L,
- 1190489015032L,
- 134303883744L
- };
-
-
-
-
- for(int i = 0; i < nums.length; i++)
- {
- ByteBuffer buffer = ByteBuffer.allocate(25);
- writeShortStringBytes(buffer, String.valueOf(nums[i]));
- buffer.flip();
- System.out.println(nums[i] + " : " + readLongAsShortString(buffer));
- }
- }
public static long readLongAsShortString(ByteBuffer buffer)
{
@@ -857,4 +1048,37 @@ public class EncodingUtils
return result;
}
+
+ public static long readUnsignedInteger(ByteBuffer buffer)
+ {
+ long l = 0xFF & buffer.get();
+ l <<=8;
+ l = l | (0xFF & buffer.get());
+ l <<=8;
+ l = l | (0xFF & buffer.get());
+ l <<=8;
+ l = l | (0xFF & buffer.get());
+
+ return l;
+ }
+
+
+ public static void main(String[] args)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.setAutoExpand(true);
+
+ long l = (long) Integer.MAX_VALUE;
+ l += 1024L;
+
+ writeUnsignedInteger(buf, l);
+
+ buf.flip();
+
+ long l2 = readUnsignedInteger(buf);
+
+ System.out.println("before: " + l);
+ System.out.println("after: " + l2);
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index 7a160ef471..ca03f29047 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -27,6 +27,20 @@ public class HeartbeatBody extends AMQBody
public static final byte TYPE = 8;
public static AMQFrame FRAME = new HeartbeatBody().toFrame();
+ public HeartbeatBody()
+ {
+
+ }
+
+ public HeartbeatBody(ByteBuffer buffer, long size)
+ {
+ if(size > 0)
+ {
+ //allow other implementations to have a payload, but ignore it:
+ buffer.skip((int) size);
+ }
+ }
+
protected byte getFrameType()
{
return TYPE;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
index 97bd3d9253..c7ada708dc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
@@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer;
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
new file mode 100644
index 0000000000..174cb142e0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
@@ -0,0 +1,77 @@
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ private ByteBuffer _encodedBlock;
+
+ private AMQDataBlock _block;
+
+ public SmallCompositeAMQDataBlock(AMQDataBlock block)
+ {
+ _block = block;
+ }
+
+ /**
+ * The encoded block will be logically first before the AMQDataBlocks which are encoded
+ * into the buffer afterwards.
+ * @param encodedBlock already-encoded data
+ * @param block a block to be encoded.
+ */
+ public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+ {
+ this(block);
+ _encodedBlock = encodedBlock;
+ }
+
+ public AMQDataBlock getBlock()
+ {
+ return _block;
+ }
+
+ public ByteBuffer getEncodedBlock()
+ {
+ return _encodedBlock;
+ }
+
+ public long getSize()
+ {
+ long frameSize = _block.getSize();
+
+ if (_encodedBlock != null)
+ {
+ _encodedBlock.rewind();
+ frameSize += _encodedBlock.remaining();
+ }
+ return frameSize;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if (_encodedBlock != null)
+ {
+ buffer.put(_encodedBlock);
+ }
+ _block.writePayload(buffer);
+
+ }
+
+ public String toString()
+ {
+ if (_block == null)
+ {
+ return "No blocks contained in composite frame";
+ }
+ else
+ {
+ StringBuilder buf = new StringBuilder(this.getClass().getName());
+ buf.append("{encodedBlock=").append(_encodedBlock);
+
+ buf.append(" _block=[").append(_block.toString()).append("]");
+
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 0eb43bdf5f..9d3c588fc8 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -105,7 +105,12 @@ public class TxAckTest extends TestCase
long deliveryTag = i + 1;
// TODO: fix hardcoded protocol version data
TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
- (byte)0), txnContext);
+ (byte)0,
+ null,
+ false,
+ false,
+ null,
+ 0), txnContext);
_map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
}
_acked = acked;
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 52afecdb6a..ea576a5661 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -153,8 +153,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0);
- request.routingKey = new AMQShortString(id);
+ BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0);
+
return request;
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index d4a8f6c7f9..91a26632a1 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -163,8 +163,14 @@ public class AMQQueueMBeanTest extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
- publish.immediate = immediate;
+ BasicPublishBody publish = new BasicPublishBody((byte)8,
+ (byte)0,
+ null,
+ immediate,
+ false,
+ null,
+ 0);
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = 1000; // in bytes
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 222b2c696a..d10d5acdd0 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -96,9 +96,13 @@ public class AckTest extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0);
- publishBody.routingKey = new AMQShortString("rk");
- publishBody.exchange = new AMQShortString("someExchange");
+ BasicPublishBody publishBody = new BasicPublishBody((byte)8,
+ (byte)0,
+ new AMQShortString("someExchange"),
+ false,
+ false,
+ new AMQShortString("rk"),
+ 0);
AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
if (persistent)
{
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
index da4627411d..6c48bb2bf4 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -61,8 +61,14 @@ class MessageTestHelper extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
- publish.immediate = immediate;
+ BasicPublishBody publish = new BasicPublishBody((byte)8,
+ (byte)0,
+ null,
+ immediate,
+ false,
+ null,
+ 0);
+
return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
new ContentHeaderBody());
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index b874ca9594..e2500d9865 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -52,7 +52,12 @@ public class TestReferenceCounting extends TestCase
createPersistentContentHeader();
// TODO: fix hardcoded protocol version data
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0),
+ (byte)0,
+ null,
+ false,
+ false,
+ null,
+ 0),
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
@@ -76,7 +81,12 @@ public class TestReferenceCounting extends TestCase
{
// TODO: fix hardcoded protocol version data
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0),
+ (byte)0,
+ null,
+ false,
+ false,
+ null,
+ 0),
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();