diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java | 95 |
1 files changed, 67 insertions, 28 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index ac82e1f2b3..da4173d5d3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -25,21 +25,22 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.ContentHeaderBodyAdapter; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.List; +import java.nio.ByteBuffer; -public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, BodyContentHolder +public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource { /** Used for debugging purposes. */ @@ -51,9 +52,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; - private AMQMessageHandle _messageHandle; - private final Long _messageId; - /** * Keeps a track of how many bytes we have received in body frames @@ -70,25 +68,29 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private long _expiration; private Exchange _exchange; - private AMQMessageHeader _messageHeader; private int _receivedChunkCount = 0; private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>(); + // we keep both the original meta data object and the store reference to it just in case the + // store would otherwise flow it to disk + + private MessageMetaData _messageMetaData; + + private StoredMessage<MessageMetaData> _storedMessageHandle; + - public IncomingMessage(final Long messageId, - final MessagePublishInfo info, - final AMQProtocolSession publisher) + public IncomingMessage( + final MessagePublishInfo info + ) { - _messageId = messageId; _messagePublishInfo = info; } public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException { _contentHeaderBody = contentHeaderBody; - _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody); } public void setExpiration() @@ -122,11 +124,10 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes } - public MessageMetaData routingComplete(final MessageStore store, - final MessageHandleFactory factory) throws AMQException + public MessageMetaData headersReceived() { - _messageHandle = factory.createMessageHandle(_messageId, store, isPersistent()); - return _messageHandle.setPublishAndContentHeaderBody(_messagePublishInfo, _contentHeaderBody); + _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0); + return _messageMetaData; } @@ -135,20 +136,15 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _destinationQueues; } - - public AMQMessageHandle getMessageHandle() - { - return _messageHandle; - } - - public int addContentBodyFrame(final ContentChunk contentChunk) throws AMQException { - + _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf()); _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); - _messageHandle.addContentBodyFrame(contentChunk, allContentReceived()); + + + return _receivedChunkCount++; } @@ -192,7 +188,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public AMQMessageHeader getMessageHeader() { - return _messageHeader; + return _messageMetaData.getMessageHeader(); } public boolean isPersistent() @@ -207,6 +203,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return false; } + public long getSize() { return getContentHeader().bodySize; @@ -214,7 +211,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public Long getMessageNumber() { - return _messageId; + return _storedMessageHandle.getMessageNumber(); } public void setExchange(final Exchange e) @@ -258,4 +255,46 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes { return _contentChunks.get(index); } + + + public int getContent(ByteBuffer buf, int offset) + { + int pos = 0; + int written = 0; + for(ContentChunk cb : _contentChunks) + { + ByteBuffer data = cb.getData().buf(); + if(offset+written >= pos && offset < pos + data.limit()) + { + ByteBuffer src = data.duplicate(); + src.position(offset+written - pos); + src = src.slice(); + + if(buf.remaining() < src.limit()) + { + src.limit(buf.remaining()); + } + int count = src.limit(); + buf.put(src); + written += count; + if(buf.remaining() == 0) + { + break; + } + } + pos+=data.limit(); + } + return written; + + } + + public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle) + { + _storedMessageHandle = storedMessageHandle; + } + + public StoredMessage<MessageMetaData> getStoredMessage() + { + return _storedMessageHandle; + } } |