summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java95
1 files changed, 67 insertions, 28 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index ac82e1f2b3..da4173d5d3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -25,21 +25,22 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.ContentHeaderBodyAdapter;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.nio.ByteBuffer;
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, BodyContentHolder
+public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
{
/** Used for debugging purposes. */
@@ -51,9 +52,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private AMQMessageHandle _messageHandle;
- private final Long _messageId;
-
/**
* Keeps a track of how many bytes we have received in body frames
@@ -70,25 +68,29 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private long _expiration;
private Exchange _exchange;
- private AMQMessageHeader _messageHeader;
private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
+ // we keep both the original meta data object and the store reference to it just in case the
+ // store would otherwise flow it to disk
+
+ private MessageMetaData _messageMetaData;
+
+ private StoredMessage<MessageMetaData> _storedMessageHandle;
+
- public IncomingMessage(final Long messageId,
- final MessagePublishInfo info,
- final AMQProtocolSession publisher)
+ public IncomingMessage(
+ final MessagePublishInfo info
+ )
{
- _messageId = messageId;
_messagePublishInfo = info;
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
- _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody);
}
public void setExpiration()
@@ -122,11 +124,10 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
- public MessageMetaData routingComplete(final MessageStore store,
- final MessageHandleFactory factory) throws AMQException
+ public MessageMetaData headersReceived()
{
- _messageHandle = factory.createMessageHandle(_messageId, store, isPersistent());
- return _messageHandle.setPublishAndContentHeaderBody(_messagePublishInfo, _contentHeaderBody);
+ _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+ return _messageMetaData;
}
@@ -135,20 +136,15 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _destinationQueues;
}
-
- public AMQMessageHandle getMessageHandle()
- {
- return _messageHandle;
- }
-
-
public int addContentBodyFrame(final ContentChunk contentChunk)
throws AMQException
{
-
+ _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
- _messageHandle.addContentBodyFrame(contentChunk, allContentReceived());
+
+
+
return _receivedChunkCount++;
}
@@ -192,7 +188,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public AMQMessageHeader getMessageHeader()
{
- return _messageHeader;
+ return _messageMetaData.getMessageHeader();
}
public boolean isPersistent()
@@ -207,6 +203,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return false;
}
+
public long getSize()
{
return getContentHeader().bodySize;
@@ -214,7 +211,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public Long getMessageNumber()
{
- return _messageId;
+ return _storedMessageHandle.getMessageNumber();
}
public void setExchange(final Exchange e)
@@ -258,4 +255,46 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
{
return _contentChunks.get(index);
}
+
+
+ public int getContent(ByteBuffer buf, int offset)
+ {
+ int pos = 0;
+ int written = 0;
+ for(ContentChunk cb : _contentChunks)
+ {
+ ByteBuffer data = cb.getData().buf();
+ if(offset+written >= pos && offset < pos + data.limit())
+ {
+ ByteBuffer src = data.duplicate();
+ src.position(offset+written - pos);
+ src = src.slice();
+
+ if(buf.remaining() < src.limit())
+ {
+ src.limit(buf.remaining());
+ }
+ int count = src.limit();
+ buf.put(src);
+ written += count;
+ if(buf.remaining() == 0)
+ {
+ break;
+ }
+ }
+ pos+=data.limit();
+ }
+ return written;
+
+ }
+
+ public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
+ {
+ _storedMessageHandle = storedMessageHandle;
+ }
+
+ public StoredMessage<MessageMetaData> getStoredMessage()
+ {
+ return _storedMessageHandle;
+ }
}