summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-26 20:37:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-26 20:37:36 +0000
commit73edf329801752fba83376c02fd473b184611a8b (patch)
tree858271ca2918883bb194445c2477314318f5bcec
parent2a9e3ffb12f3b621a2d8d3b4c84627cc0613b9cc (diff)
downloadqpid-python-73edf329801752fba83376c02fd473b184611a8b.tar.gz
Added AMQP 0-9-1 support
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@829944 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java60
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java168
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java383
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rwxr-xr-xqpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java158
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java8
-rw-r--r--qpid/java/common/build.xml4
-rw-r--r--qpid/java/common/protocol-version.xml4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java162
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java23
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java174
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java37
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java132
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java169
-rw-r--r--qpid/specs/amqp0-9-1.stripped.xml477
23 files changed, 1702 insertions, 398 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
index e64eaeae76..d587ef0c16 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
@@ -1,31 +1,34 @@
package org.apache.qpid.server.handler;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
/**
* @author Apache Software Foundation
@@ -54,7 +57,20 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ
// We don't implement access control class, but to keep clients happy that expect it
// always use the "0" ticket.
- AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+ AccessRequestOkBody response;
+ if(methodRegistry instanceof MethodRegistry_0_9)
+ {
+ response = ((MethodRegistry_0_9)methodRegistry).createAccessRequestOkBody(0);
+ }
+ else if(methodRegistry instanceof MethodRegistry_8_0)
+ {
+ response = ((MethodRegistry_8_0)methodRegistry).createAccessRequestOkBody(0);
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
+ }
+
session.writeFrame(response.generateFrame(channelId));
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
index 2c264c3d45..f9feada6fe 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
@@ -1,25 +1,25 @@
package org.apache.qpid.server.handler;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.log4j.Logger;
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicRecoverBody;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -70,6 +71,13 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B
session.writeFrame(recoverOk.generateFrame(channelId));
}
+ else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+ {
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ session.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 9918013888..f5c2b93f46 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQConstant;
@@ -102,6 +103,30 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
response = methodRegistry.createChannelOpenOkBody(channelName);
}
+ else if(pv.equals(ProtocolVersion.v0_91))
+ {
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+ UUID uuid = UUID.randomUUID();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(output);
+ try
+ {
+ dataOut.writeLong(uuid.getMostSignificantBits());
+ dataOut.writeLong(uuid.getLeastSignificantBits());
+ dataOut.flush();
+ dataOut.close();
+ }
+ catch (IOException e)
+ {
+ // This *really* shouldn't happen as we're not doing any I/O
+ throw new RuntimeException("I/O exception when writing to byte array", e);
+ }
+
+ // should really associate this channelId to the session
+ byte[] channelName = output.toByteArray();
+
+ response = methodRegistry.createChannelOpenOkBody(channelName);
+ }
else
{
throw new AMQException(AMQConstant.INTERNAL_ERROR, "Got channel open for protocol version not catered for: " + pv, null);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 31bafac0a2..b5194084f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -22,6 +22,9 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQState;
@@ -86,10 +89,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
if (session.getContextKey() == null)
{
session.setContextKey(generateClientID());
- }
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
stateManager.changeState(AMQState.CONNECTION_OPEN);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
index 9475b83c8f..e290afcde3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
@@ -59,6 +59,14 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return new ServerMethodDispatcherImpl_0_9(stateManager);
}
});
+ _dispatcherFactories.put(ProtocolVersion.v0_91,
+ new DispatcherFactory()
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ServerMethodDispatcherImpl_0_91(stateManager);
+ }
+ });
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java
new file mode 100644
index 0000000000..32cd4c4e9f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+
+public class ServerMethodDispatcherImpl_0_91
+ extends ServerMethodDispatcherImpl
+ implements MethodDispatcher_0_91
+
+{
+
+ private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
+ BasicRecoverSyncMethodHandler.getInstance();
+ private static final QueueUnbindHandler _queueUnbindHandler =
+ QueueUnbindHandler.getInstance();
+
+
+ public ServerMethodDispatcherImpl_0_91(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ return true;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
index 36e7e88fd6..3a94160e22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
@@ -44,6 +44,7 @@ public class ProtocolOutputConverterRegistry
{
register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
+ register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
new file mode 100644
index 0000000000..cffbe445ee
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
@@ -0,0 +1,383 @@
+package org.apache.qpid.server.output.amqp0_9_1;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+ private static final ProtocolVersionMethodConverter
+ PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+
+ int writtenSize = 0;
+
+
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ buf = java.nio.ByteBuffer.allocate(capacity);
+
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ }
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b57c834598..461f4d01c4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -308,7 +308,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
- private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
+ private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this
protected AMQConnectionDelegate _delegate;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
new file mode 100755
index 0000000000..1bb93f66a3
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+
+public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0
+{
+
+ public AMQConnectionDelegate_9_1(AMQConnection conn)
+ {
+ super(conn);
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index bc1453beaf..862e23385a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -36,6 +36,7 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
@@ -186,6 +187,11 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
}
+ else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
+ {
+ BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
+ _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ }
else
{
throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 0e3333940a..909ac8aae8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -79,6 +79,16 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
}
});
+
+ _dispatcherFactories.put(ProtocolVersion.v0_91,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
+ {
+ return new ClientMethodDispatcherImpl_0_91(session);
+ }
+ });
+
}
public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java
new file mode 100644
index 0000000000..f15340ae00
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+
+public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_91
+{
+ public ClientMethodDispatcherImpl_0_91(AMQProtocolSession session)
+ {
+ super(session);
+ }
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 6500a82818..65d3fa9a76 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.qpid.AMQConnectionClosedException;
@@ -124,6 +125,8 @@ public class AMQProtocolHandler implements ProtocolEngine
private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
+ private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
+
/**
* The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
* instances and protocol handler instances.
@@ -736,7 +739,10 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if (_failoverLatch != null)
{
- _failoverLatch.await();
+ if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
+ {
+
+ }
}
}
}
diff --git a/qpid/java/common/build.xml b/qpid/java/common/build.xml
index 52e4506c91..bb31061322 100644
--- a/qpid/java/common/build.xml
+++ b/qpid/java/common/build.xml
@@ -28,8 +28,8 @@
<property name="generated.package" value="org/apache/qpid/framing" />
<property name="generated.dir" location="${module.precompiled}/${generated.package}" />
<property name="xml.spec.dir" location="${project.root}/../specs" />
- <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml" />
- <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml" />
+ <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml amqp0-9-1.stripped.xml" />
+ <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml ${xml.spec.dir}/amqp0-9-1.stripped.xml" />
<property name="gentools.timestamp" location="${generated.dir}/gentools.timestamp" />
<property name="jython.timestamp" location="${generated.dir}/jython.timestamp" />
diff --git a/qpid/java/common/protocol-version.xml b/qpid/java/common/protocol-version.xml
index ee9db6049b..5435a0a582 100644
--- a/qpid/java/common/protocol-version.xml
+++ b/qpid/java/common/protocol-version.xml
@@ -27,8 +27,8 @@
<property name="generated.dir" location="${generated.path}/${generated.package}" />
<property name="generated.timestamp" location="${generated.dir}/timestamp" />
<property name="xml.spec.dir" location="${topDirectoryLocation}/../specs" />
- <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml" />
- <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml" />
+ <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml amqp0-9-1.stripped.xml" />
+ <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml ${xml.spec.dir}/amqp0-9-1.stripped.xml" />
<property name="template.dir" value="${topDirectoryLocation}/common/templates" />
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index ad7f36f790..cd3d721065 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -93,4 +93,166 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
session.methodFrameReceived(channelId, this);
}
+ public int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+
+ protected byte readByte(ByteBuffer buffer)
+ {
+ return buffer.get();
+ }
+
+ protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return EncodingUtils.readAMQShortString(buffer);
+ }
+
+ protected int getSizeOf(AMQShortString string)
+ {
+ return EncodingUtils.encodedShortStringLength(string);
+ }
+
+ protected void writeByte(ByteBuffer buffer, byte b)
+ {
+ buffer.put(b);
+ }
+
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, string);
+ }
+
+ protected int readInt(ByteBuffer buffer)
+ {
+ return buffer.getInt();
+ }
+
+ protected void writeInt(ByteBuffer buffer, int i)
+ {
+ buffer.putInt(i);
+ }
+
+ protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ return EncodingUtils.readFieldTable(buffer);
+ }
+
+ protected int getSizeOf(FieldTable table)
+ {
+ return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, table);
+ }
+
+ protected long readLong(ByteBuffer buffer)
+ {
+ return buffer.getLong();
+ }
+
+ protected void writeLong(ByteBuffer buffer, long l)
+ {
+ buffer.putLong(l);
+ }
+
+ protected int getSizeOf(byte[] response)
+ {
+ return (response == null) ? 4 : response.length + 4;
+ }
+
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
+ {
+ EncodingUtils.writeBytes(buffer,data);
+ }
+
+ protected byte[] readBytes(ByteBuffer buffer)
+ {
+ return EncodingUtils.readBytes(buffer);
+ }
+
+ protected short readShort(ByteBuffer buffer)
+ {
+ return EncodingUtils.readShort(buffer);
+ }
+
+ protected void writeShort(ByteBuffer buffer, short s)
+ {
+ EncodingUtils.writeShort(buffer, s);
+ }
+
+ protected Content readContent(ByteBuffer buffer)
+ {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int getSizeOf(Content body)
+ {
+ return 0; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeContent(ByteBuffer buffer, Content body)
+ {
+ //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected byte readBitfield(ByteBuffer buffer)
+ {
+ return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int readUnsignedShort(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ {
+ buffer.put(bitfield0);
+ }
+
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, s);
+ }
+
+ protected long readUnsignedInteger(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedInt();
+ }
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, i);
+ }
+
+
+ protected short readUnsignedByte(ByteBuffer buffer)
+ {
+ return buffer.getUnsigned();
+ }
+
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ {
+ EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
+ }
+
+ protected long readTimestamp(ByteBuffer buffer)
+ {
+ return EncodingUtils.readTimestamp(buffer);
+ }
+
+ protected void writeTimestamp(ByteBuffer buffer, long t)
+ {
+ EncodingUtils.writeTimestamp(buffer, t);
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index cf8a866e47..d503ed7ea3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -54,7 +54,10 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
public ProtocolInitiation(ProtocolVersion pv)
{
- this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+ this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS,
+ pv.equals(ProtocolVersion.v0_91) ? 0 : TCP_PROTOCOL_INSTANCE,
+ pv.equals(ProtocolVersion.v0_91) ? 9 : pv.getMajorVersion(),
+ pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
public ProtocolInitiation(ByteBuffer in)
@@ -124,7 +127,6 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
{
/**
*
- * @param session the session
* @param in input buffer
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
@@ -162,13 +164,24 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
_protocolClass, null);
}
- if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+
+ ProtocolVersion pv;
+
+ // Hack for 0-9-1 which changed how the header was defined
+ if(_protocolInstance == 0 && _protocolMajor == 9 && _protocolMinor == 1)
+ {
+ pv = ProtocolVersion.v0_91;
+
+ }
+ else if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
{
throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
_protocolInstance, null);
}
-
- ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+ else
+ {
+ pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+ }
if (!pv.isSupported())
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
index 948f5baaf6..8d51343507 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
@@ -21,14 +21,6 @@
package org.apache.qpid.framing.amqp_0_9;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.Content;
-
-import org.apache.mina.common.ByteBuffer;
-
public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl
{
@@ -40,170 +32,6 @@ public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMetho
public byte getMinor()
{
return 9;
- }
-
- public int getSize()
- {
- return 2 + 2 + getBodySize();
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- EncodingUtils.writeUnsignedShort(buffer, getClazz());
- EncodingUtils.writeUnsignedShort(buffer, getMethod());
- writeMethodPayload(buffer);
- }
-
-
- protected byte readByte(ByteBuffer buffer)
- {
- return buffer.get();
- }
-
- protected AMQShortString readAMQShortString(ByteBuffer buffer)
- {
- return EncodingUtils.readAMQShortString(buffer);
- }
-
- protected int getSizeOf(AMQShortString string)
- {
- return EncodingUtils.encodedShortStringLength(string);
- }
-
- protected void writeByte(ByteBuffer buffer, byte b)
- {
- buffer.put(b);
- }
-
- protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
- {
- EncodingUtils.writeShortStringBytes(buffer, string);
- }
-
- protected int readInt(ByteBuffer buffer)
- {
- return buffer.getInt();
- }
-
- protected void writeInt(ByteBuffer buffer, int i)
- {
- buffer.putInt(i);
- }
-
- protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
- {
- return EncodingUtils.readFieldTable(buffer);
- }
-
- protected int getSizeOf(FieldTable table)
- {
- return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
- {
- EncodingUtils.writeFieldTableBytes(buffer, table);
- }
-
- protected long readLong(ByteBuffer buffer)
- {
- return buffer.getLong();
- }
-
- protected void writeLong(ByteBuffer buffer, long l)
- {
- buffer.putLong(l);
- }
-
- protected int getSizeOf(byte[] response)
- {
- return (response == null) ? 4 :response.length + 4;
- }
-
- protected void writeBytes(ByteBuffer buffer, byte[] data)
- {
- EncodingUtils.writeBytes(buffer,data);
- }
-
- protected byte[] readBytes(ByteBuffer buffer)
- {
- return EncodingUtils.readBytes(buffer);
- }
-
- protected short readShort(ByteBuffer buffer)
- {
- return EncodingUtils.readShort(buffer);
- }
-
- protected void writeShort(ByteBuffer buffer, short s)
- {
- EncodingUtils.writeShort(buffer, s);
- }
-
- protected Content readContent(ByteBuffer buffer)
- {
- return null; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int getSizeOf(Content body)
- {
- return 0; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeContent(ByteBuffer buffer, Content body)
- {
- //To change body of created methods use File | Settings | File Templates.
- }
-
- protected byte readBitfield(ByteBuffer buffer)
- {
- return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int readUnsignedShort(ByteBuffer buffer)
- {
- return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
- {
- buffer.put(bitfield0);
- }
-
- protected void writeUnsignedShort(ByteBuffer buffer, int s)
- {
- EncodingUtils.writeUnsignedShort(buffer, s);
- }
-
- protected long readUnsignedInteger(ByteBuffer buffer)
- {
- return buffer.getUnsignedInt();
- }
- protected void writeUnsignedInteger(ByteBuffer buffer, long i)
- {
- EncodingUtils.writeUnsignedInteger(buffer, i);
- }
-
-
- protected short readUnsignedByte(ByteBuffer buffer)
- {
- return buffer.getUnsigned();
- }
-
- protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
- {
- EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
- }
-
- protected long readTimestamp(ByteBuffer buffer)
- {
- return EncodingUtils.readTimestamp(buffer);
- }
-
- protected void writeTimestamp(ByteBuffer buffer, long t)
- {
- EncodingUtils.writeTimestamp(buffer, t);
- }
-
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java
new file mode 100644
index 0000000000..60b8a7e1a6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_91;
+
+public abstract class AMQMethodBody_0_91 extends org.apache.qpid.framing.AMQMethodBodyImpl
+{
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 91;
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
new file mode 100644
index 0000000000..6e330574bc
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.amqp_0_91;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.*;
+
+public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_0_91()
+ {
+ super((byte)0,(byte)9);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ if(contentChunk instanceof ContentChunk_0_9)
+ {
+ return ((ContentChunk_0_9)contentChunk).toBody();
+ }
+ else
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk_0_9(contentBodyChunk);
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
+
+ }
+
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
+
+ final AMQShortString exchange = publishBody.getExchange();
+ final AMQShortString routingKey = publishBody.getRoutingKey();
+
+ return new MessagePublishInfoImpl(exchange,
+ publishBody.getImmediate(),
+ publishBody.getMandatory(),
+ routingKey);
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBodyImpl(0,
+ info.getExchange(),
+ info.getRoutingKey(),
+ info.isMandatory(),
+ info.isImmediate()) ;
+
+ }
+
+ private static class ContentChunk_0_9 implements ContentChunk
+ {
+ private final ContentBody _contentBodyChunk;
+
+ public ContentChunk_0_9(final ContentBody contentBodyChunk)
+ {
+ _contentBodyChunk = contentBodyChunk;
+ }
+
+ public int getSize()
+ {
+ return _contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return _contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ _contentBodyChunk.reduceBufferToFit();
+ }
+
+ public AMQBody toBody()
+ {
+ return _contentBodyChunk;
+ }
+ }
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
index e9b4447140..35645854c0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
@@ -21,14 +21,6 @@
package org.apache.qpid.framing.amqp_8_0;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.Content;
-
-import org.apache.mina.common.ByteBuffer;
-
public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl
{
@@ -42,168 +34,7 @@ public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMetho
return 0;
}
- public int getSize()
- {
- return 2 + 2 + getBodySize();
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- EncodingUtils.writeUnsignedShort(buffer, getClazz());
- EncodingUtils.writeUnsignedShort(buffer, getMethod());
- writeMethodPayload(buffer);
- }
-
- protected byte readByte(ByteBuffer buffer)
- {
- return buffer.get();
- }
-
- protected AMQShortString readAMQShortString(ByteBuffer buffer)
- {
- return EncodingUtils.readAMQShortString(buffer);
- }
-
- protected int getSizeOf(AMQShortString string)
- {
- return EncodingUtils.encodedShortStringLength(string);
- }
-
- protected void writeByte(ByteBuffer buffer, byte b)
- {
- buffer.put(b);
- }
-
- protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
- {
- EncodingUtils.writeShortStringBytes(buffer, string);
- }
-
- protected int readInt(ByteBuffer buffer)
- {
- return buffer.getInt();
- }
-
- protected void writeInt(ByteBuffer buffer, int i)
- {
- buffer.putInt(i);
- }
-
- protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
- {
- return EncodingUtils.readFieldTable(buffer);
- }
-
- protected int getSizeOf(FieldTable table)
- {
- return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
- {
- EncodingUtils.writeFieldTableBytes(buffer, table);
- }
-
- protected long readLong(ByteBuffer buffer)
- {
- return buffer.getLong();
- }
-
- protected void writeLong(ByteBuffer buffer, long l)
- {
- buffer.putLong(l);
- }
-
- protected int getSizeOf(byte[] response)
- {
- return (response == null) ? 4 : response.length + 4;
- }
-
- protected void writeBytes(ByteBuffer buffer, byte[] data)
- {
- EncodingUtils.writeBytes(buffer,data);
- }
-
- protected byte[] readBytes(ByteBuffer buffer)
- {
- return EncodingUtils.readBytes(buffer);
- }
-
- protected short readShort(ByteBuffer buffer)
- {
- return EncodingUtils.readShort(buffer);
- }
-
- protected void writeShort(ByteBuffer buffer, short s)
- {
- EncodingUtils.writeShort(buffer, s);
- }
-
- protected Content readContent(ByteBuffer buffer)
- {
- return null; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int getSizeOf(Content body)
- {
- return 0; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeContent(ByteBuffer buffer, Content body)
- {
- //To change body of created methods use File | Settings | File Templates.
- }
-
- protected byte readBitfield(ByteBuffer buffer)
- {
- return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int readUnsignedShort(ByteBuffer buffer)
- {
- return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
- {
- buffer.put(bitfield0);
- }
-
- protected void writeUnsignedShort(ByteBuffer buffer, int s)
- {
- EncodingUtils.writeUnsignedShort(buffer, s);
- }
-
- protected long readUnsignedInteger(ByteBuffer buffer)
- {
- return buffer.getUnsignedInt();
- }
- protected void writeUnsignedInteger(ByteBuffer buffer, long i)
- {
- EncodingUtils.writeUnsignedInteger(buffer, i);
- }
-
-
- protected short readUnsignedByte(ByteBuffer buffer)
- {
- return buffer.getUnsigned();
- }
-
- protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
- {
- EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
- }
-
- protected long readTimestamp(ByteBuffer buffer)
- {
- return EncodingUtils.readTimestamp(buffer);
- }
-
- protected void writeTimestamp(ByteBuffer buffer, long t)
- {
- EncodingUtils.writeTimestamp(buffer, t);
- }
}
diff --git a/qpid/specs/amqp0-9-1.stripped.xml b/qpid/specs/amqp0-9-1.stripped.xml
new file mode 100644
index 0000000000..ec55c8dd7a
--- /dev/null
+++ b/qpid/specs/amqp0-9-1.stripped.xml
@@ -0,0 +1,477 @@
+<?xml version="1.0"?>
+<!--
+Copyright (c) 2009 AMQP Working Group.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+3. The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+<amqp major="0" minor="91" port="5672">
+ <constant name="frame-method" value="1"/>
+ <constant name="frame-header" value="2"/>
+ <constant name="frame-body" value="3"/>
+ <constant name="frame-heartbeat" value="8"/>
+ <constant name="frame-min-size" value="4096"/>
+ <constant name="frame-end" value="206"/>
+ <constant name="reply-success" value="200"/>
+ <constant name="content-too-large" value="311" class="soft-error"/>
+ <constant name="no-consumers" value="313" class="soft-error"/>
+ <constant name="connection-forced" value="320" class="hard-error"/>
+ <constant name="invalid-path" value="402" class="hard-error"/>
+ <constant name="access-refused" value="403" class="soft-error"/>
+ <constant name="not-found" value="404" class="soft-error"/>
+ <constant name="resource-locked" value="405" class="soft-error"/>
+ <constant name="precondition-failed" value="406" class="soft-error"/>
+ <constant name="frame-error" value="501" class="hard-error"/>
+ <constant name="syntax-error" value="502" class="hard-error"/>
+ <constant name="command-invalid" value="503" class="hard-error"/>
+ <constant name="channel-error" value="504" class="hard-error"/>
+ <constant name="unexpected-frame" value="505" class="hard-error"/>
+ <constant name="resource-error" value="506" class="hard-error"/>
+ <constant name="not-allowed" value="530" class="hard-error"/>
+ <constant name="not-implemented" value="540" class="hard-error"/>
+ <constant name="internal-error" value="541" class="hard-error"/>
+ <domain name="class-id" type="short"/>
+ <domain name="consumer-tag" type="shortstr"/>
+ <domain name="delivery-tag" type="longlong"/>
+ <domain name="exchange-name" type="shortstr">
+ <assert check="length" value="127"/>
+ <assert check="regexp" value="^[a-zA-Z0-9-_.:]*$"/>
+ </domain>
+ <domain name="method-id" type="short"/>
+ <domain name="no-ack" type="bit"/>
+ <domain name="no-local" type="bit"/>
+ <domain name="nowait" type="bit"/>
+ <!-- Qpid: restore these so that the generation sees the methods aas the same -->
+ <domain name="known-hosts" type="shortstr"/>
+ <domain name="access-ticket" type="short"/>
+ <!-- end Qpid specific -->
+ <domain name="path" type="shortstr">
+ <assert check="notnull"/>
+ <assert check="length" value="127"/>
+ </domain>
+ <domain name="peer-properties" type="table"/>
+ <domain name="queue-name" type="shortstr">
+ <assert check="length" value="127"/>
+ <assert check="regexp" value="^[a-zA-Z0-9-_.:]*$"/>
+ </domain>
+ <domain name="redelivered" type="bit"/>
+ <domain name="message-count" type="long"/>
+ <domain name="reply-code" type="short">
+ <assert check="notnull"/>
+ </domain>
+ <domain name="reply-text" type="shortstr">
+ <assert check="notnull"/>
+ </domain>
+ <domain name="bit" type="bit"/>
+ <domain name="octet" type="octet"/>
+ <domain name="short" type="short"/>
+ <domain name="long" type="long"/>
+ <domain name="longlong" type="longlong"/>
+ <domain name="shortstr" type="shortstr"/>
+ <domain name="longstr" type="longstr"/>
+ <domain name="timestamp" type="timestamp"/>
+ <domain name="table" type="table"/>
+ <class name="connection" handler="connection" index="10">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ <method name="start" synchronous="1" index="10">
+ <chassis name="client" implement="MUST"/>
+ <response name="start-ok"/>
+ <field name="version-major" domain="octet"/>
+ <field name="version-minor" domain="octet"/>
+ <field name="server-properties" domain="peer-properties"/>
+ <field name="mechanisms" domain="longstr">
+ <assert check="notnull"/>
+ </field>
+ <field name="locales" domain="longstr">
+ <assert check="notnull"/>
+ </field>
+ </method>
+ <method name="start-ok" synchronous="1" index="11">
+ <chassis name="server" implement="MUST"/>
+ <field name="client-properties" domain="peer-properties"/>
+ <field name="mechanism" domain="shortstr">
+ <assert check="notnull"/>
+ </field>
+ <field name="response" domain="longstr">
+ <assert check="notnull"/>
+ </field>
+ <field name="locale" domain="shortstr">
+ <assert check="notnull"/>
+ </field>
+ </method>
+ <method name="secure" synchronous="1" index="20">
+ <chassis name="client" implement="MUST"/>
+ <response name="secure-ok"/>
+ <field name="challenge" domain="longstr"/>
+ </method>
+ <method name="secure-ok" synchronous="1" index="21">
+ <chassis name="server" implement="MUST"/>
+ <field name="response" domain="longstr">
+ <assert check="notnull"/>
+ </field>
+ </method>
+ <method name="tune" synchronous="1" index="30">
+ <chassis name="client" implement="MUST"/>
+ <response name="tune-ok"/>
+ <field name="channel-max" domain="short"/>
+ <field name="frame-max" domain="long"/>
+ <field name="heartbeat" domain="short"/>
+ </method>
+ <method name="tune-ok" synchronous="1" index="31">
+ <chassis name="server" implement="MUST"/>
+ <field name="channel-max" domain="short">
+ <assert check="notnull"/>
+ <assert check="le" method="tune" field="channel-max"/>
+ </field>
+ <field name="frame-max" domain="long"/>
+ <field name="heartbeat" domain="short"/>
+ </method>
+ <method name="open" synchronous="1" index="40">
+ <chassis name="server" implement="MUST"/>
+ <response name="open-ok"/>
+ <field name="virtual-host" domain="path"/>
+ <field name="capabilities" type="shortstr" reserved="1"/>
+ <field name="insist" type="bit" reserved="1"/>
+ </method>
+ <method name="open-ok" synchronous="1" index="41">
+ <chassis name="client" implement="MUST"/>
+ <field name="known-hosts" domain="known-hosts" reserved="1"/>
+ </method>
+ <method name="close" synchronous="1" index="50">
+ <chassis name="client" implement="MUST"/>
+ <chassis name="server" implement="MUST"/>
+ <response name="close-ok"/>
+ <field name="reply-code" domain="reply-code"/>
+ <field name="reply-text" domain="reply-text"/>
+ <field name="class-id" domain="class-id"/>
+ <field name="method-id" domain="method-id"/>
+ </method>
+ <method name="close-ok" synchronous="1" index="51">
+ <chassis name="client" implement="MUST"/>
+ <chassis name="server" implement="MUST"/>
+ </method>
+ </class>
+ <class name="channel" handler="channel" index="20">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ <method name="open" synchronous="1" index="10">
+ <chassis name="server" implement="MUST"/>
+ <response name="open-ok"/>
+ <field name="out-of-band" type="shortstr" reserved="1"/>
+ </method>
+ <method name="open-ok" synchronous="1" index="11">
+ <chassis name="client" implement="MUST"/>
+ <field name="channel-id" type="longstr" reserved="1"/>
+ </method>
+ <method name="flow" synchronous="1" index="20">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ <response name="flow-ok"/>
+ <field name="active" domain="bit"/>
+ </method>
+ <method name="flow-ok" index="21">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ <field name="active" domain="bit"/>
+ </method>
+ <method name="close" synchronous="1" index="40">
+ <chassis name="client" implement="MUST"/>
+ <chassis name="server" implement="MUST"/>
+ <response name="close-ok"/>
+ <field name="reply-code" domain="reply-code"/>
+ <field name="reply-text" domain="reply-text"/>
+ <field name="class-id" domain="class-id"/>
+ <field name="method-id" domain="method-id"/>
+ </method>
+ <method name="close-ok" synchronous="1" index="41">
+ <chassis name="client" implement="MUST"/>
+ <chassis name="server" implement="MUST"/>
+ </method>
+ </class>
+ <class name="exchange" handler="channel" index="40">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ <method name="declare" synchronous="1" index="10">
+ <chassis name="server" implement="MUST"/>
+ <response name="declare-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="exchange" domain="exchange-name">
+ <assert check="notnull"/>
+ </field>
+ <field name="type" domain="shortstr"/>
+ <field name="passive" domain="bit"/>
+ <field name="durable" domain="bit"/>
+ <field name="auto-delete" type="bit" reserved="1"/>
+ <field name="internal" type="bit" reserved="1"/>
+ <field name="nowait" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name="declare-ok" synchronous="1" index="11">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ <method name="delete" synchronous="1" index="20">
+ <chassis name="server" implement="MUST"/>
+ <response name="delete-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="exchange" domain="exchange-name">
+ <assert check="notnull"/>
+ </field>
+ <field name="if-unused" domain="bit"/>
+ <field name="nowait" domain="bit"/>
+ </method>
+ <method name="delete-ok" synchronous="1" index="21">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ <!-- Qpid : Added Exchange.bound and Exchange.bound-ok -->
+ <method name="bound" synchronous="1" index="22">
+ <chassis name="server" implement="SHOULD"/>
+ <response name="bound-ok"/>
+ <field name="exchange" domain="exchange-name"/>
+ <field name = "routing-key" type = "shortstr"/>
+ <field name = "queue" domain = "queue name"/>
+ </method>
+ <method name="bound-ok" synchronous="1" index="23">
+ <field name="reply-code" domain="reply-code"/>
+ <field name="reply-text" domain="reply-text"/>
+ <chassis name="client" implement="SHOULD"/>
+ </method>
+ <!-- End Qpid addition -->
+ </class>
+ <class name="queue" handler="channel" index="50">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MUST"/>
+ <method name="declare" synchronous="1" index="10">
+ <chassis name="server" implement="MUST"/>
+ <response name="declare-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="queue" domain="queue-name"/>
+ <field name="passive" domain="bit"/>
+ <field name="durable" domain="bit"/>
+ <field name="exclusive" domain="bit"/>
+ <field name="auto-delete" domain="bit"/>
+ <field name="nowait" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name="declare-ok" synchronous="1" index="11">
+ <chassis name="client" implement="MUST"/>
+ <field name="queue" domain="queue-name">
+ <assert check="notnull"/>
+ </field>
+ <field name="message-count" domain="long"/>
+ <field name="consumer-count" domain="long"/>
+ </method>
+ <method name="bind" synchronous="1" index="20">
+ <chassis name="server" implement="MUST"/>
+ <response name="bind-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="queue" domain="queue-name"/>
+ <field name="exchange" domain="exchange-name"/>
+ <field name="routing-key" domain="shortstr"/>
+ <field name="nowait" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name="bind-ok" synchronous="1" index="21">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ <method name="unbind" synchronous="1" index="50">
+ <chassis name="server" implement="MUST"/>
+ <response name="unbind-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="queue" domain="queue-name"/>
+ <field name="exchange" domain="exchange-name"/>
+ <field name="routing-key" domain="shortstr"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name="unbind-ok" synchronous="1" index="51">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ <method name="purge" synchronous="1" index="30">
+ <chassis name="server" implement="MUST"/>
+ <response name="purge-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="queue" domain="queue-name"/>
+ <field name="nowait" domain="bit"/>
+ </method>
+ <method name="purge-ok" synchronous="1" index="31">
+ <chassis name="client" implement="MUST"/>
+ <field name="message-count" domain="long"/>
+ </method>
+ <method name="delete" synchronous="1" index="40">
+ <chassis name="server" implement="MUST"/>
+ <response name="delete-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="queue" domain="queue-name"/>
+ <field name="if-unused" domain="bit"/>
+ <field name="if-empty" domain="bit"/>
+ <field name="nowait" domain="bit"/>
+ </method>
+ <method name="delete-ok" synchronous="1" index="41">
+ <chassis name="client" implement="MUST"/>
+ <field name="message-count" domain="long"/>
+ </method>
+ </class>
+ <class name="basic" handler="channel" index="60">
+ <chassis name="server" implement="MUST"/>
+ <chassis name="client" implement="MAY"/>
+ <field name="content-type" domain="shortstr"/>
+ <field name="content-encoding" domain="shortstr"/>
+ <field name="headers" domain="table"/>
+ <field name="delivery-mode" domain="octet"/>
+ <field name="priority" domain="octet"/>
+ <field name="correlation-id" domain="shortstr"/>
+ <field name="reply-to" domain="shortstr"/>
+ <field name="expiration" domain="shortstr"/>
+ <field name="message-id" domain="shortstr"/>
+ <field name="timestamp" domain="timestamp"/>
+ <field name="type" domain="shortstr"/>
+ <field name="user-id" domain="shortstr"/>
+ <field name="app-id" domain="shortstr"/>
+ <field name="reserved" domain="shortstr"/>
+ <method name="qos" synchronous="1" index="10">
+ <chassis name="server" implement="MUST"/>
+ <response name="qos-ok"/>
+ <field name="prefetch-size" domain="long"/>
+ <field name="prefetch-count" domain="short"/>
+ <field name="global" domain="bit"/>
+ </method>
+ <method name="qos-ok" synchronous="1" index="11">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ <method name="consume" synchronous="1" index="20">
+ <chassis name="server" implement="MUST"/>
+ <response name="consume-ok"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="queue" domain="queue-name"/>
+ <field name="consumer-tag" domain="consumer-tag"/>
+ <field name="no-local" domain="no-local"/>
+ <field name="no-ack" domain="no-ack"/>
+ <field name="exclusive" domain="bit"/>
+ <field name="nowait" domain="bit"/>
+ <field name="arguments" domain="table"/>
+ </method>
+ <method name="consume-ok" synchronous="1" index="21">
+ <chassis name="client" implement="MUST"/>
+ <field name="consumer-tag" domain="consumer-tag"/>
+ </method>
+ <method name="cancel" synchronous="1" index="30">
+ <chassis name="server" implement="MUST"/>
+ <response name="cancel-ok"/>
+ <field name="consumer-tag" domain="consumer-tag"/>
+ <field name="nowait" domain="bit"/>
+ </method>
+ <method name="cancel-ok" synchronous="1" index="31">
+ <chassis name="client" implement="MUST"/>
+ <field name="consumer-tag" domain="consumer-tag"/>
+ </method>
+ <method name="publish" content="1" index="40">
+ <chassis name="server" implement="MUST"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="exchange" domain="exchange-name"/>
+ <field name="routing-key" domain="shortstr"/>
+ <field name="mandatory" domain="bit"/>
+ <field name="immediate" domain="bit"/>
+ </method>
+ <method name="return" content="1" index="50">
+ <chassis name="client" implement="MUST"/>
+ <field name="reply-code" domain="reply-code"/>
+ <field name="reply-text" domain="reply-text"/>
+ <field name="exchange" domain="exchange-name"/>
+ <field name="routing-key" domain="shortstr"/>
+ </method>
+ <method name="deliver" content="1" index="60">
+ <chassis name="client" implement="MUST"/>
+ <field name="consumer-tag" domain="consumer-tag"/>
+ <field name="delivery-tag" domain="delivery-tag"/>
+ <field name="redelivered" domain="redelivered"/>
+ <field name="exchange" domain="exchange-name"/>
+ <field name="routing-key" domain="shortstr"/>
+ </method>
+ <method name="get" synchronous="1" index="70">
+ <response name="get-ok"/>
+ <response name="get-empty"/>
+ <chassis name="server" implement="MUST"/>
+ <field name="ticket" domain="access-ticket" reserved="1"/>
+ <field name="queue" domain="queue-name"/>
+ <field name="no-ack" domain="no-ack"/>
+ </method>
+ <method name="get-ok" synchronous="1" content="1" index="71">
+ <chassis name="client" implement="MAY"/>
+ <field name="delivery-tag" domain="delivery-tag"/>
+ <field name="redelivered" domain="redelivered"/>
+ <field name="exchange" domain="exchange-name"/>
+ <field name="routing-key" domain="shortstr"/>
+ <field name="message-count" domain="long"/>
+ </method>
+ <method name="get-empty" synchronous="1" index="72">
+ <chassis name="client" implement="MAY"/>
+ <field name="cluster-id" type="shortstr" reserved="1"/>
+ </method>
+ <method name="ack" index="80">
+ <chassis name="server" implement="MUST"/>
+ <field name="delivery-tag" domain="delivery-tag"/>
+ <field name="multiple" domain="bit"/>
+ </method>
+ <method name="reject" index="90">
+ <chassis name="server" implement="MUST"/>
+ <field name="delivery-tag" domain="delivery-tag"/>
+ <field name="requeue" domain="bit"/>
+ </method>
+ <method name="recover" index="100" deprecated="1">
+ <chassis name="server" implement="MAY"/>
+ <field name="requeue" domain="bit"/>
+ </method>
+ <method name="recover-sync" index="110">
+ <chassis name="server" implement="MUST"/>
+ <field name="requeue" domain="bit"/>
+ </method>
+ <method name="recover-sync-ok" synchronous="1" index="111">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ </class>
+ <class name="tx" handler="channel" index="90">
+ <chassis name="server" implement="SHOULD"/>
+ <chassis name="client" implement="MAY"/>
+ <method name="select" synchronous="1" index="10">
+ <chassis name="server" implement="MUST"/>
+ <response name="select-ok"/>
+ </method>
+ <method name="select-ok" synchronous="1" index="11">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ <method name="commit" synchronous="1" index="20">
+ <chassis name="server" implement="MUST"/>
+ <response name="commit-ok"/>
+ </method>
+ <method name="commit-ok" synchronous="1" index="21">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ <method name="rollback" synchronous="1" index="30">
+ <chassis name="server" implement="MUST"/>
+ <response name="rollback-ok"/>
+ </method>
+ <method name="rollback-ok" synchronous="1" index="31">
+ <chassis name="client" implement="MUST"/>
+ </method>
+ </class>
+</amqp>