diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-02-20 16:20:41 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-02-20 16:20:41 +0000 |
commit | c46d62e6834205408502d89d99f73e47f3ca2eb8 (patch) | |
tree | a0cbe2baaaae2d6eab79e0a4491c7299a9bfedc8 /java/common | |
parent | bc6a142a055071e5b7025cd1022485f26a0011f2 (diff) | |
download | qpid-python-c46d62e6834205408502d89d99f73e47f3ca2eb8.tar.gz |
QPID-325 : Persist durable exchange information in the store
QPID-318 : Remove hardcoding of version numbers (as applies to store)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
10 files changed, 317 insertions, 5 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index c35fc0a6c4..be38695384 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -96,6 +96,8 @@ public class ContentBody extends AMQBody } } + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) { final AMQFrame frame = new AMQFrame(channelId, body); diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 4e3768e4d4..f94cd4934c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -641,8 +641,7 @@ public class EncodingUtils public static void writeTimestamp(ByteBuffer buffer, long timestamp) { - writeUnsignedInteger(buffer, 0/*timestamp msb*/); - writeUnsignedInteger(buffer, timestamp); + writeLong(buffer, timestamp); } public static boolean[] readBooleans(ByteBuffer buffer) @@ -765,8 +764,8 @@ public class EncodingUtils public static long readTimestamp(ByteBuffer buffer) { // Discard msb from AMQ timestamp - buffer.getUnsignedInt(); - return buffer.getUnsignedInt(); + //buffer.getUnsignedInt(); + return buffer.getLong(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java new file mode 100644 index 0000000000..dd93cc97fa --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java @@ -0,0 +1,104 @@ +package org.apache.qpid.framing;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_8_0()
+ {
+ super((byte)8,(byte)0);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion());
+ _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion());
+
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody body = (BasicPublishBody) methodBody;
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return body.getExchange();
+ }
+
+ public boolean isImmediate()
+ {
+ return body.getImmediate();
+ }
+
+ public boolean isMandatory()
+ {
+ return body.getMandatory();
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return body.getRoutingKey();
+ }
+ };
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBody(getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ _basicPublishClassId,
+ _basicPublishMethodId,
+ info.getExchange(),
+ info.isImmediate(),
+ info.isMandatory(),
+ info.getRoutingKey(),
+ 0) ; // ticket
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 1df62c7b1b..ec371453aa 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -20,6 +20,8 @@ */
package org.apache.qpid.framing;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -36,10 +38,53 @@ public class VersionSpecificRegistry private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+ private ProtocolVersionMethodConverter _protocolVersionConverter;
+
public VersionSpecificRegistry(byte major, byte minor)
{
_protocolMajorVersion = major;
_protocolMinorVersion = minor;
+
+ _protocolVersionConverter = loadProtocolVersionConverters(major, minor);
+ }
+
+ private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion)
+ {
+ try
+ {
+ Class<ProtocolVersionMethodConverter> versionMethodConverterClass =
+ (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_"+protocolMajorVersion + "_" + protocolMinorVersion);
+ return versionMethodConverterClass.newInstance();
+
+ }
+ catch (ClassNotFoundException e)
+ {
+ _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion);
+ if(protocolMinorVersion != 0)
+ {
+ protocolMinorVersion--;
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else if (protocolMajorVersion != 0)
+ {
+ protocolMajorVersion--;
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else
+ {
+ return null;
+ }
+
+
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
}
public byte getProtocolMajorVersion()
@@ -138,4 +183,14 @@ public class VersionSpecificRegistry }
+
+ public ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolVersionConverter;
+ }
+
+ public void configure()
+ {
+ _protocolVersionConverter.configure();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java new file mode 100644 index 0000000000..5490d482a1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java @@ -0,0 +1,26 @@ +package org.apache.qpid.framing.abstraction;
+
+public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private final byte _protocolMajorVersion;
+
+
+ private final byte _protocolMinorVersion;
+
+ public AbstractMethodConverter(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+
+ public final byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public final byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java new file mode 100644 index 0000000000..6312e478a8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java @@ -0,0 +1,32 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.mina.common.ByteBuffer;
+
+public interface ContentChunk
+{
+ int getSize();
+ ByteBuffer getData();
+
+ void reduceToFit();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java new file mode 100644 index 0000000000..706499c1b0 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java @@ -0,0 +1,36 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public interface MessagePublishInfo
+{
+
+ public AMQShortString getExchange();
+
+ public boolean isImmediate();
+
+ public boolean isMandatory();
+
+ public AMQShortString getRoutingKey();
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java new file mode 100644 index 0000000000..c9e15f18e3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java @@ -0,0 +1,29 @@ +/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+
+public interface MessagePublishInfoConverter
+{
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java new file mode 100644 index 0000000000..52e82cdf07 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -0,0 +1,29 @@ +/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQBody;
+
+public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+{
+ AMQBody convertToBody(ContentChunk contentBody);
+ ContentChunk convertToContentChunk(AMQBody body);
+
+ void configure();
+}
diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index ffbdf730a9..0f706ac553 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -154,7 +154,7 @@ public class BasicContentHeaderPropertiesTest extends TestCase public void testSetGetTimestamp() { - long timestamp = 999999999; + long timestamp = System.currentTimeMillis(); _testProperties.setTimestamp(timestamp); assertEquals(timestamp, _testProperties.getTimestamp()); } |