summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java263
1 files changed, 205 insertions, 58 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 2cebec373e..3970e5a2d4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -26,6 +26,7 @@
*/
package org.apache.qpid.server.output.amqp0_8;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
@@ -34,22 +35,18 @@ import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.DeliveryProperties;
-import java.nio.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
- METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
public static Factory getInstanceFactory()
{
return new Factory()
@@ -62,6 +59,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
};
}
+
private final AMQProtocolSession _protocolSession;
private ProtocolOutputConverterImpl(AMQProtocolSession session)
@@ -78,10 +76,11 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
}
+
private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
throws AMQException
{
@@ -93,65 +92,120 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
+ ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
return chb;
}
}
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
{
- AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
}
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
throws AMQException
{
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
-
+ int bodySize = (int) message.getSize();
- final int bodySize = (int) message.getSize();
if(bodySize == 0)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
writeFrame(compositeBlock);
}
else
{
int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
- final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
- ByteBuffer buf = ByteBuffer.allocate(capacity);
- int writtenSize = 0;
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
while(writtenSize < bodySize)
{
- buf = java.nio.ByteBuffer.allocate(capacity);
- writtenSize += message.getContent(buf, writtenSize);
- buf.flip();
- writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
}
+ }
+ }
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
}
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
}
- private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
throws AMQException
{
+
final AMQShortString exchangeName;
final AMQShortString routingKey;
@@ -172,21 +226,58 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final boolean isRedelivered = entry.isRedelivered();
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
- BasicDeliverBody deliverBody =
- METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame;
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
}
- private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
final AMQShortString exchangeName;
@@ -215,9 +306,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
exchangeName,
routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -230,31 +320,28 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
+
BasicReturnBody basicReturnBody =
METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- MessageContentSource content,
- int channelId,
- int replyCode,
- AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
- writeMessageDelivery(content, header, channelId, returnFrame);
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+ writeMessageDelivery(message, header, channelId, returnFrame);
}
@@ -266,8 +353,68 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
+
BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutputStream buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
}