summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java44
1 files changed, 33 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index afe4ea95b9..8603113c11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class AMQMessage
{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
public static final String JMS_MESSAGE = "jms.message";
private final Set<Object> _tokens = new HashSet<Object>();
@@ -61,6 +64,8 @@ public class AMQMessage
private final AtomicInteger _referenceCount = new AtomicInteger(1);
+ private long _arrivalTime;
+
/**
* Keeps a track of how many bytes we have received in body frames
*/
@@ -157,20 +162,20 @@ public class AMQMessage
public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
{
-
+
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- getExchangeName(), // exchange
- _redelivered, // redelivered
- getRoutingKey() // routingKey
- );
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ getExchangeName(), // exchange
+ _redelivered, // redelivered
+ getRoutingKey() // routingKey
+ );
allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
@@ -201,6 +206,8 @@ public class AMQMessage
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
+ _arrivalTime = System.currentTimeMillis();
+
if (_storeWhenComplete && isAllContentReceived())
{
storeMessage();
@@ -223,6 +230,7 @@ public class AMQMessage
_bodyLengthReceived += contentBody.getSize();
if (_storeWhenComplete && isAllContentReceived())
{
+ _arrivalTime = System.currentTimeMillis();
storeMessage();
}
}
@@ -263,6 +271,12 @@ public class AMQMessage
_redelivered = redelivered;
}
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
public long getMessageId()
{
return _messageId;
@@ -299,6 +313,7 @@ public class AMQMessage
throw new MessageCleanupException(_messageId, e);
}
}
+
}
public void setPublisher(AMQProtocolSession publisher)
@@ -367,11 +382,17 @@ public class AMQMessage
return _txnBuffer;
}
+ public long getSize()
+ {
+ return getContentHeaderBody().bodySize;
+ }
+
/**
* Called to enforce the 'immediate' flag.
+ *
* @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException
{
@@ -393,7 +414,8 @@ public class AMQMessage
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{