summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java41
1 files changed, 19 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 0162f1b738..c5a610c7b6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -70,8 +70,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private Exchange _exchange;
-
- private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
// we keep both the original meta data object and the store reference to it just in case the
@@ -80,13 +78,20 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private MessageMetaData _messageMetaData;
private StoredMessage<MessageMetaData> _storedMessageHandle;
+ private Object _connectionReference;
public IncomingMessage(
final MessagePublishInfo info
)
{
+ this(info, null);
+ }
+
+ public IncomingMessage(MessagePublishInfo info, Object reference)
+ {
_messagePublishInfo = info;
+ _connectionReference = reference;
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -125,12 +130,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
}
- public MessageMetaData headersReceived()
- {
-
- return headersReceived(System.currentTimeMillis());
- }
-
public MessageMetaData headersReceived(long currentTime)
{
_messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
@@ -143,16 +142,10 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _destinationQueues;
}
- public int addContentBodyFrame(final ContentChunk contentChunk)
- throws AMQException
+ public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
-
-
-
- return _receivedChunkCount++;
}
public boolean allContentReceived()
@@ -252,18 +245,12 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _expiration;
}
- public int getReceivedChunkCount()
- {
- return _receivedChunkCount;
- }
-
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+ public ContentChunk getContentChunk(int index)
{
return _contentChunks.get(index);
}
@@ -318,4 +305,14 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
{
return _storedMessageHandle;
}
+
+ public Object getConnectionReference()
+ {
+ return _connectionReference;
+ }
+
+ public MessageMetaData getMessageMetaData()
+ {
+ return _messageMetaData;
+ }
}