diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java | 44 |
1 files changed, 33 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index afe4ea95b9..8603113c11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.message.MessageDecorator; import org.apache.qpid.server.message.jms.JMSMessage; import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class AMQMessage { + private static final Logger _log = Logger.getLogger(AMQMessage.class); + public static final String JMS_MESSAGE = "jms.message"; private final Set<Object> _tokens = new HashSet<Object>(); @@ -61,6 +64,8 @@ public class AMQMessage private final AtomicInteger _referenceCount = new AtomicInteger(1); + private long _arrivalTime; + /** * Keeps a track of how many bytes we have received in body frames */ @@ -157,20 +162,20 @@ public class AMQMessage public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag) { - + AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()]; // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. allFrames[0] = BasicDeliverBody.createAMQFrame(channel, - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - getExchangeName(), // exchange - _redelivered, // redelivered - getRoutingKey() // routingKey - ); + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + getExchangeName(), // exchange + _redelivered, // redelivered + getRoutingKey() // routingKey + ); allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody); for (int i = 2; i < allFrames.length; i++) { @@ -201,6 +206,8 @@ public class AMQMessage public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException { _contentHeaderBody = contentHeaderBody; + _arrivalTime = System.currentTimeMillis(); + if (_storeWhenComplete && isAllContentReceived()) { storeMessage(); @@ -223,6 +230,7 @@ public class AMQMessage _bodyLengthReceived += contentBody.getSize(); if (_storeWhenComplete && isAllContentReceived()) { + _arrivalTime = System.currentTimeMillis(); storeMessage(); } } @@ -263,6 +271,12 @@ public class AMQMessage _redelivered = redelivered; } + + public long getArrivalTime() + { + return _arrivalTime; + } + public long getMessageId() { return _messageId; @@ -299,6 +313,7 @@ public class AMQMessage throw new MessageCleanupException(_messageId, e); } } + } public void setPublisher(AMQProtocolSession publisher) @@ -367,11 +382,17 @@ public class AMQMessage return _txnBuffer; } + public long getSize() + { + return getContentHeaderBody().bodySize; + } + /** * Called to enforce the 'immediate' flag. + * * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * immediate delivery but has not been marked as delivered to a + * consumer */ public void checkDeliveredToConsumer() throws NoConsumersException { @@ -393,7 +414,8 @@ public class AMQMessage /** * Called selectors to determin if the message has already been sent - * @return _deliveredToConsumer + * + * @return _deliveredToConsumer */ public boolean getDeliveredToConsumer() { |