diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
commit | 4eaa4e42093e5524d9552d8fa312c214524b6bb4 (patch) | |
tree | a251d57ee92d9c779fe4455c583be0ed90e69a43 /qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java | |
parent | 92be7e8f3163c048a8642d2deeaa921bbb65dc9c (diff) | |
download | qpid-python-4eaa4e42093e5524d9552d8fa312c214524b6bb4.tar.gz |
NO-JIRA : AMQP-1-0 sandbox updates - merge from trunkrg-amqp-1-0-sandbox
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1299257 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java | 64 |
1 files changed, 31 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 19a7a15ad1..c5a610c7b6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -20,25 +20,26 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.StoredMessage; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.nio.ByteBuffer; public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource { @@ -69,8 +70,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private Exchange _exchange; - - private int _receivedChunkCount = 0; private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>(); // we keep both the original meta data object and the store reference to it just in case the @@ -79,13 +78,20 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private MessageMetaData _messageMetaData; private StoredMessage<MessageMetaData> _storedMessageHandle; + private Object _connectionReference; public IncomingMessage( final MessagePublishInfo info ) { + this(info, null); + } + + public IncomingMessage(MessagePublishInfo info, Object reference) + { _messagePublishInfo = info; + _connectionReference = reference; } public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException @@ -124,12 +130,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes } - public MessageMetaData headersReceived() - { - - return headersReceived(System.currentTimeMillis()); - } - public MessageMetaData headersReceived(long currentTime) { _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime); @@ -142,21 +142,15 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _destinationQueues; } - public int addContentBodyFrame(final ContentChunk contentChunk) - throws AMQException + public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException { - _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); - - - - return _receivedChunkCount++; } public boolean allContentReceived() { - return (_bodyLengthReceived == getContentHeader().bodySize); + return (_bodyLengthReceived == getContentHeader().getBodySize()); } public AMQShortString getExchange() @@ -217,7 +211,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public long getSize() { - return getContentHeader().bodySize; + return getContentHeader().getBodySize(); } public long getMessageNumber() @@ -251,18 +245,12 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _expiration; } - public int getReceivedChunkCount() - { - return _receivedChunkCount; - } - - public int getBodyCount() throws AMQException { return _contentChunks.size(); } - public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException + public ContentChunk getContentChunk(int index) { return _contentChunks.get(index); } @@ -317,4 +305,14 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes { return _storedMessageHandle; } + + public Object getConnectionReference() + { + return _connectionReference; + } + + public MessageMetaData getMessageMetaData() + { + return _messageMetaData; + } } |