diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java | 82 |
1 files changed, 12 insertions, 70 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java index 0334a54fab..4c9fe81439 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -54,23 +54,11 @@ public class TransientAMQMessage implements AMQMessage protected final Long _messageId; - /** Flag to indicate that this message requires 'immediate' delivery. */ - - private static final byte IMMEDIATE = 0x01; - - /** - * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality - * for messages published with the 'immediate' flag. - */ - - private static final byte DELIVERED_TO_CONSUMER = 0x02; private byte _flags = 0; - private long _expiration; - private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; - private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + private long _expiration; /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory @@ -165,11 +153,16 @@ public class TransientAMQMessage implements AMQMessage return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() +")"; } - public void setExpiration(final long expiration) + public void setExpiration(long expiration) { _expiration = expiration; } + public long getExpiration() + { + return _expiration; + } + public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { return new BodyFrameIterator(protocolSession, channel); @@ -190,57 +183,6 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /** - * Called selectors to determin if the message has already been sent - * - * @return _deliveredToConsumer - */ - public boolean getDeliveredToConsumer() - { - return (_flags & DELIVERED_TO_CONSUMER) != 0; - } - - /** - * Called to enforce the 'immediate' flag. - * - * @returns true if the message is marked for immediate delivery but has not been marked as delivered - * to a consumer - */ - public boolean immediateAndNotDelivered() - { - - return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; - - } - - /** - * Checks to see if the message has expired. If it has the message is dequeued. - * - * @return true if the message has expire - * - * @throws AMQException - */ - public boolean expired() throws AMQException - { - - if (_expiration != 0L) - { - long now = System.currentTimeMillis(); - - return (now > _expiration); - } - - return false; - } - - /** - * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). - * And for selector efficiency. - */ - public void setDeliveredToConsumer() - { - _flags |= DELIVERED_TO_CONSUMER; - } public long getSize() { @@ -315,6 +257,11 @@ public class TransientAMQMessage implements AMQMessage return false; } + public boolean isImmediate() + { + return _messagePublishInfo.isImmediate(); + } + /** * This is called when all the content has been received. * @@ -366,11 +313,6 @@ public class TransientAMQMessage implements AMQMessage { _contentBodies = Collections.EMPTY_LIST; } - - if (_messagePublishInfo.isImmediate()) - { - _flags |= IMMEDIATE; - } } public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException |