diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java | 420 |
1 files changed, 0 insertions, 420 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java deleted file mode 100644 index 1e62e5e9ca..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * This file is auto-generated by Qpid Gentools v.0.1 - do not modify. - * Supported AMQP versions: - * 8-0 - */ -package org.apache.qpid.server.output.amqp0_8; - -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.HeaderPropertiesConverter; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.AMQException; -import org.apache.qpid.transport.DeliveryProperties; - -import java.io.DataOutput; -import java.io.IOException; - -public class ProtocolOutputConverterImpl implements ProtocolOutputConverter -{ - - private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - - public static Factory getInstanceFactory() - { - return new Factory() - { - - public ProtocolOutputConverter newInstance(AMQProtocolSession session) - { - return new ProtocolOutputConverterImpl(session); - } - }; - } - - - private final AMQProtocolSession _protocolSession; - - private ProtocolOutputConverterImpl(AMQProtocolSession session) - { - _protocolSession = session; - } - - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) - throws AMQException - { - AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); - writeMessageDelivery(entry, channelId, deliverBody); - } - - - private ContentHeaderBody getContentHeaderBody(QueueEntry entry) - throws AMQException - { - if(entry.getMessage() instanceof AMQMessage) - { - return ((AMQMessage)entry.getMessage()).getContentHeaderBody(); - } - else - { - final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); - ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID); - chb.bodySize = message.getSize(); - return chb; - } - } - - - private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) - throws AMQException - { - writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); - } - - private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) - throws AMQException - { - - - int bodySize = (int) message.getSize(); - - if(bodySize == 0) - { - SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, - contentHeaderBody); - - writeFrame(compositeBlock); - } - else - { - int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead(); - - - int capacity = bodySize > maxBodySize ? maxBodySize : bodySize; - - int writtenSize = capacity; - - AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); - - CompositeAMQBodyBlock - compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); - writeFrame(compositeBlock); - - while(writtenSize < bodySize) - { - capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; - MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); - writtenSize += capacity; - - writeFrame(new AMQFrame(channelId, body)); - } - } - } - - private class MessageContentSourceBody implements AMQBody - { - public static final byte TYPE = 3; - private int _length; - private MessageContentSource _message; - private int _offset; - - public MessageContentSourceBody(MessageContentSource message, int offset, int length) - { - _message = message; - _offset = offset; - _length = length; - } - - public byte getFrameType() - { - return TYPE; - } - - public int getSize() - { - return _length; - } - - public void writePayload(DataOutput buffer) throws IOException - { - byte[] data = new byte[_length]; - - _message.getContent(java.nio.ByteBuffer.wrap(data), _offset); - - buffer.write(data); - } - - public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException - { - throw new UnsupportedOperationException(); - } - } - - private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody) - { - - AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, - contentHeaderBody); - return contentHeader; - } - - - public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException - { - AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); - writeMessageDelivery(entry, channelId, deliver); - } - - - private AMQBody createEncodedDeliverBody(QueueEntry entry, - final long deliveryTag, - final AMQShortString consumerTag) - throws AMQException - { - - final AMQShortString exchangeName; - final AMQShortString routingKey; - - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } - - final boolean isRedelivered = entry.isRedelivered(); - - final AMQBody returnBlock = new AMQBody() - { - - public AMQBody _underlyingBody; - - public AMQBody createAMQBody() - { - return METHOD_REGISTRY.createBasicDeliverBody(consumerTag, - deliveryTag, - isRedelivered, - exchangeName, - routingKey); - - - - - - } - - public byte getFrameType() - { - return AMQMethodBody.TYPE; - } - - public int getSize() - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - return _underlyingBody.getSize(); - } - - public void writePayload(DataOutput buffer) throws IOException - { - if(_underlyingBody == null) - { - _underlyingBody = createAMQBody(); - } - _underlyingBody.writePayload(buffer); - } - - public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) - throws AMQException - { - throw new AMQException("This block should never be dispatched!"); - } - }; - return returnBlock; - } - - private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize) - throws AMQException - { - final AMQShortString exchangeName; - final AMQShortString routingKey; - - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } - - final boolean isRedelivered = entry.isRedelivered(); - - BasicGetOkBody getOkBody = - METHOD_REGISTRY.createBasicGetOkBody(deliveryTag, - isRedelivered, - exchangeName, - routingKey, - queueSize); - - return getOkBody; - } - - public byte getProtocolMinorVersion() - { - return getProtocolSession().getProtocolMinorVersion(); - } - - public byte getProtocolMajorVersion() - { - return getProtocolSession().getProtocolMajorVersion(); - } - - private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, - int replyCode, - AMQShortString replyText) throws AMQException - { - - BasicReturnBody basicReturnBody = - METHOD_REGISTRY.createBasicReturnBody(replyCode, - replyText, - messagePublishInfo.getExchange(), - messagePublishInfo.getRoutingKey()); - - - return basicReturnBody; - } - - public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText) - throws AMQException - { - - AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText); - - writeMessageDelivery(message, header, channelId, returnFrame); - } - - - public void writeFrame(AMQDataBlock block) - { - getProtocolSession().writeFrame(block); - } - - - public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) - { - - BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag); - writeFrame(basicCancelOkBody.generateFrame(channelId)); - - } - - - public static final class CompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final AMQBody _contentBody; - private final int _channel; - - - public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - _contentBody = contentBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); - } - - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); - } - } - - public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock - { - public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); - - private final AMQBody _methodBody; - private final AMQBody _headerBody; - private final int _channel; - - - public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) - { - _channel = channel; - _methodBody = methodBody; - _headerBody = headerBody; - - } - - public long getSize() - { - return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; - } - - public void writePayload(DataOutput buffer) throws IOException - { - AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); - } - } -} |