summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java257
1 files changed, 135 insertions, 122 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index c76c262edd..319b5cc7bd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -1,25 +1,48 @@
package org.apache.qpid.server.output.amqp0_9;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
-import org.apache.mina.common.ByteBuffer;
-import java.util.Iterator;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+ private static final ProtocolVersionMethodConverter
+ PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -47,20 +70,46 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return _protocolSession;
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
- final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
- final int bodyCount = messageHandle.getBodyCount(storeContext);
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
- if(bodyCount == 0)
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
{
SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
contentHeaderBody);
@@ -69,101 +118,75 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
else
{
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+ int writtenSize = 0;
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
- AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
- CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
+ while(writtenSize < bodySize)
{
- cb = messageHandle.getContentChunk(storeContext, i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
+ buf = java.nio.ByteBuffer.allocate(capacity);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ }
}
-
}
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
-
+
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
contentHeaderBody);
return contentHeader;
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final StoreContext storeContext = message.getStoreContext();
-
- AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
- final int bodyCount = messageHandle.getBodyCount(storeContext);
- if(bodyCount == 0)
+ if(entry.getMessage() instanceof AMQMessage)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- writeFrame(compositeBlock);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
}
else
{
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(storeContext, i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
-
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
}
-
- }
-
-
- private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
- throws AMQException
- {
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
- final boolean isRedelivered = messageHandle.isRedelivered();
- final AMQShortString exchangeName = pb.getExchange();
- final AMQShortString routingKey = pb.getRoutingKey();
+ final boolean isRedelivered = entry.isRedelivered();
final AMQBody returnBlock = new AMQBody()
{
@@ -216,22 +239,37 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return returnBlock;
}
- private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+ final boolean isRedelivered = entry.isRedelivered();
BasicGetOkBody getOkBody =
METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey(),
+ isRedelivered,
+ exchangeName,
+ routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -244,53 +282,28 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
BasicReturnBody basicReturnBody =
METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
- message.getMessagePublishInfo().getExchange(),
- message.getMessagePublishInfo().getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
- writeFrame(compositeBlock);
- }
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- writeFrame(bodyFrameIterator.next());
- }
+ writeMessageDelivery(message, header, channelId, returnFrame);
}