diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 48 |
1 files changed, 29 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index dbad5438dc..4cb07c3006 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageReference; import org.apache.log4j.Logger; import java.util.Set; @@ -42,7 +44,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; + private MessageReference _message; private Set<Subscription> _rejectedBy = null; @@ -75,6 +77,8 @@ public class QueueEntryImpl implements QueueEntry private volatile long _entryId; volatile QueueEntryImpl _next; + private boolean _deliveredToConsumer; + private boolean _redelivered; QueueEntryImpl(SimpleQueueEntryList queueEntryList) @@ -84,18 +88,18 @@ public class QueueEntryImpl implements QueueEntry } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) { _queueEntryList = queueEntryList; - _message = message; + _message = message == null ? null : message.newReference(); _entryIdUpdater.set(this, entryId); } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; - _message = message; + _message = message == null ? null : message.newReference(); } protected void setEntryId(long entryId) @@ -113,9 +117,9 @@ public class QueueEntryImpl implements QueueEntry return _queueEntryList.getQueue(); } - public AMQMessage getMessage() + public ServerMessage getMessage() { - return _message; + return _message == null ? null : _message.getMessage(); } public long getSize() @@ -125,12 +129,21 @@ public class QueueEntryImpl implements QueueEntry public boolean getDeliveredToConsumer() { - return getMessage().getDeliveredToConsumer(); + return _deliveredToConsumer; } public boolean expired() throws AMQException { - return getMessage().expired(getQueue()); + long expiration = getMessage().getExpiration(); + if (expiration != 0L) + { + long now = System.currentTimeMillis(); + + return (now > expiration); + } + + return false; + } public boolean isAcquired() @@ -167,7 +180,7 @@ public class QueueEntryImpl implements QueueEntry public void setDeliveredToSubscription() { - getMessage().setDeliveredToConsumer(); + _deliveredToConsumer = true; } public void release() @@ -175,20 +188,15 @@ public class QueueEntryImpl implements QueueEntry _stateUpdater.set(this,AVAILABLE_STATE); } - public String debugIdentity() - { - return getMessage().debugIdentity(); - } - public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return getMessage().isImmediate() && !_deliveredToConsumer; } public void setRedelivered(boolean b) { - getMessage().setRedelivered(b); + _redelivered = b; } public Subscription getDeliveredSubscription() @@ -223,7 +231,7 @@ public class QueueEntryImpl implements QueueEntry } else { - _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); + _log.warn("Requesting rejection by null subscriber:" + this); } } @@ -284,7 +292,9 @@ public class QueueEntryImpl implements QueueEntry { if(delete()) { - getMessage().decrementReference(storeContext); + StoreContext sc = StoreContext.setCurrentContext(storeContext); + _message.release(); + StoreContext.setCurrentContext(sc); } } |