summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java64
1 files changed, 31 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 19a7a15ad1..c5a610c7b6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -20,25 +20,26 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoredMessage;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.nio.ByteBuffer;
public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
{
@@ -69,8 +70,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private Exchange _exchange;
-
- private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
// we keep both the original meta data object and the store reference to it just in case the
@@ -79,13 +78,20 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private MessageMetaData _messageMetaData;
private StoredMessage<MessageMetaData> _storedMessageHandle;
+ private Object _connectionReference;
public IncomingMessage(
final MessagePublishInfo info
)
{
+ this(info, null);
+ }
+
+ public IncomingMessage(MessagePublishInfo info, Object reference)
+ {
_messagePublishInfo = info;
+ _connectionReference = reference;
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -124,12 +130,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
- public MessageMetaData headersReceived()
- {
-
- return headersReceived(System.currentTimeMillis());
- }
-
public MessageMetaData headersReceived(long currentTime)
{
_messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
@@ -142,21 +142,15 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _destinationQueues;
}
- public int addContentBodyFrame(final ContentChunk contentChunk)
- throws AMQException
+ public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
-
-
-
- return _receivedChunkCount++;
}
public boolean allContentReceived()
{
- return (_bodyLengthReceived == getContentHeader().bodySize);
+ return (_bodyLengthReceived == getContentHeader().getBodySize());
}
public AMQShortString getExchange()
@@ -217,7 +211,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public long getSize()
{
- return getContentHeader().bodySize;
+ return getContentHeader().getBodySize();
}
public long getMessageNumber()
@@ -251,18 +245,12 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _expiration;
}
- public int getReceivedChunkCount()
- {
- return _receivedChunkCount;
- }
-
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+ public ContentChunk getContentChunk(int index)
{
return _contentChunks.get(index);
}
@@ -317,4 +305,14 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
{
return _storedMessageHandle;
}
+
+ public Object getConnectionReference()
+ {
+ return _connectionReference;
+ }
+
+ public MessageMetaData getMessageMetaData()
+ {
+ return _messageMetaData;
+ }
}