diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java | 569 |
1 files changed, 284 insertions, 285 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index d4cb7c878f..2b55d294b5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -1,285 +1,284 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. - * Supported AMQP versions: - * 8-0 - */ -package org.apache.qpid.server.output.amqp0_8; - -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQMessageHandle; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.AMQException; - -import org.apache.mina.common.ByteBuffer; - -import java.util.Iterator; - -public class ProtocolOutputConverterImpl implements ProtocolOutputConverter -{ - - - public static Factory getInstanceFactory() - { - return new Factory() - { - - public ProtocolOutputConverter newInstance(AMQProtocolSession session) - { - return new ProtocolOutputConverterImpl(session); - } - }; - } - - private final AMQProtocolSession _protocolSession; - - private ProtocolOutputConverterImpl(AMQProtocolSession session) - { - _protocolSession = session; - } - - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag); - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - message.getContentHeaderBody()); - - final AMQMessageHandle messageHandle = message.getMessageHandle(); - final StoreContext storeContext = message.getStoreContext(); - final Long messageId = message.getMessageId(); - - final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); - - if(bodyCount == 0) - { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - - writeFrame(compositeBlock); - } - else - { - - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for(int i = 1; i < bodyCount; i++) - { - cb = messageHandle.getContentChunk(storeContext,messageId, i); - writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); - } - - - } - - - } - - - public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException - { - - final AMQMessageHandle messageHandle = message.getMessageHandle(); - final StoreContext storeContext = message.getStoreContext(); - final long messageId = message.getMessageId(); - - AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize); - - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - message.getContentHeaderBody()); - - final int bodyCount = messageHandle.getBodyCount(storeContext,messageId); - if(bodyCount == 0) - { - SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, - contentHeader); - writeFrame(compositeBlock); - } - else - { - - - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0); - - AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); - AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - - // - // Now start writing out the other content bodies - // - for(int i = 1; i < bodyCount; i++) - { - cb = messageHandle.getContentChunk(storeContext, messageId, i); - writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); - } - - - } - - - } - - - private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - final MessagePublishInfo pb = message.getMessagePublishInfo(); - final AMQMessageHandle messageHandle = message.getMessageHandle(); - - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicDeliverBody deliverBody = - methodRegistry.createBasicDeliverBody(consumerTag, - deliveryTag, - messageHandle.isRedelivered(), - pb.getExchange(), - pb.getRoutingKey()); - AMQFrame deliverFrame = deliverBody.generateFrame(channelId); - - - return deliverFrame; - } - - private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize) - throws AMQException - { - final MessagePublishInfo pb = message.getMessagePublishInfo(); - final AMQMessageHandle messageHandle = message.getMessageHandle(); - - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicGetOkBody getOkBody = - methodRegistry.createBasicGetOkBody(deliveryTag, - messageHandle.isRedelivered(), - pb.getExchange(), - pb.getRoutingKey(), - queueSize); - AMQFrame getOkFrame = getOkBody.generateFrame(channelId); - - return getOkFrame; - } - - public byte getProtocolMinorVersion() - { - return getProtocolSession().getProtocolMinorVersion(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolSession().getProtocolMajorVersion(); - } - - private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicReturnBody basicReturnBody = - methodRegistry.createBasicReturnBody(replyCode, - replyText, - message.getMessagePublishInfo().getExchange(), - message.getMessagePublishInfo().getRoutingKey()); - AMQFrame returnFrame = basicReturnBody.generateFrame(channelId); - - return returnFrame; - } - - public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText); - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - message.getContentHeaderBody()); - - Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); - // - // Optimise the case where we have a single content body. In that case we create a composite block - // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. - // - if (bodyFrameIterator.hasNext()) - { - AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); - writeFrame(compositeBlock); - } - else - { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); - - writeFrame(compositeBlock); - } - - // - // Now start writing out the other content bodies - // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded - // - while (bodyFrameIterator.hasNext()) - { - writeFrame(bodyFrameIterator.next()); - } - } - - - public void writeFrame(AMQDataBlock block) - { - getProtocolSession().writeFrame(block); - } - - - public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) - { - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); - writeFrame(basicCancelOkBody.generateFrame(channelId)); - - } -} +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext);
+
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+
+ AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext);
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+ BasicDeliverBody deliverBody =
+ methodRegistry.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ messageHandle.isRedelivered(),
+ pb.getExchange(),
+ pb.getRoutingKey());
+ AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+
+ return deliverFrame;
+ }
+
+ private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+ BasicGetOkBody getOkBody =
+ methodRegistry.createBasicGetOkBody(deliveryTag,
+ messageHandle.isRedelivered(),
+ pb.getExchange(),
+ pb.getRoutingKey(),
+ queueSize);
+ AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
+
+ return getOkFrame;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+ BasicReturnBody basicReturnBody =
+ methodRegistry.createBasicReturnBody(replyCode,
+ replyText,
+ message.getMessagePublishInfo().getExchange(),
+ message.getMessagePublishInfo().getRoutingKey());
+ AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+
+ return returnFrame;
+ }
+
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
+
+ writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ writeFrame(bodyFrameIterator.next());
+ }
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+ BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+}
|