diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 322 |
1 files changed, 238 insertions, 84 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ba14be5580..b6e6365189 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -20,30 +20,29 @@ */ package org.apache.qpid.server.queue; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; -import org.apache.log4j.Logger; -import java.util.Set; import java.util.HashSet; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; - +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; public class QueueEntryImpl implements QueueEntry { - /** - * Used for debugging purposes. - */ + /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; + private AtomicReference<AMQMessage> _messageRef; private boolean _redelivered; @@ -52,44 +51,51 @@ public class QueueEntryImpl implements QueueEntry private volatile EntryState _state = AVAILABLE_STATE; private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState> + AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState> _stateUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, EntryState.class, "_state"); - + AtomicReferenceFieldUpdater.newUpdater + (QueueEntryImpl.class, EntryState.class, "_state"); private volatile Set<StateChangeListener> _stateChangeListeners; private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, Set> - _listenersUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, Set.class, "_stateChangeListeners"); - + AtomicReferenceFieldUpdater<QueueEntryImpl, Set> + _listenersUpdater = + AtomicReferenceFieldUpdater.newUpdater + (QueueEntryImpl.class, Set.class, "_stateChangeListeners"); private static final - AtomicLongFieldUpdater<QueueEntryImpl> + AtomicLongFieldUpdater<QueueEntryImpl> _entryIdUpdater = - AtomicLongFieldUpdater.newUpdater - (QueueEntryImpl.class, "_entryId"); - + AtomicLongFieldUpdater.newUpdater + (QueueEntryImpl.class, "_entryId"); private volatile long _entryId; volatile QueueEntryImpl _next; + private long _messageSize; + private QueueBackingStore _backingStore; + private AtomicBoolean _flowed; + private Long _messageId; + + private byte _flags = 0; + + private long _expiration; + + private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + private boolean _persistent; + private boolean _hasBeenUnloaded = false; QueueEntryImpl(SimpleQueueEntryList queueEntryList) { - this(queueEntryList,null,Long.MIN_VALUE); + this(queueEntryList, null, Long.MIN_VALUE); _state = DELETED_STATE; } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) { - _queueEntryList = queueEntryList; - _message = message; + this(queueEntryList, message); _entryIdUpdater.set(this, entryId); } @@ -97,7 +103,21 @@ public class QueueEntryImpl implements QueueEntry public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) { _queueEntryList = queueEntryList; - _message = message; + _messageRef = new AtomicReference<AMQMessage>(message); + if (message != null) + { + _messageId = message.getMessageId(); + _messageSize = message.getSize(); + + if (message.isImmediate()) + { + _flags |= IMMEDIATE; + } + _expiration = message.getExpiration(); + _persistent = message.isPersistent(); + } + _backingStore = queueEntryList.getBackingStore(); + _flowed = new AtomicBoolean(false); } protected void setEntryId(long entryId) @@ -117,22 +137,50 @@ public class QueueEntryImpl implements QueueEntry public AMQMessage getMessage() { - return _message; + return load(); + } + + public Long getMessageId() + { + return _messageId; } public long getSize() { - return getMessage().getSize(); + return _messageSize; } public boolean getDeliveredToConsumer() { - return getMessage().getDeliveredToConsumer(); + return (_flags & DELIVERED_TO_CONSUMER) != 0; + } + + public void setDeliveredToSubscription() + { + _flags |= DELIVERED_TO_CONSUMER; + + // We have delivered this message so we can unload it if we are flowed. + if (_queueEntryList.isFlowed()) + { + unload(); + } } public boolean expired() throws AMQException { - return getMessage().expired(); + if (_expiration != 0L) + { + long now = System.currentTimeMillis(); + + return (now > _expiration); + } + + return false; + } + + public void setExpiration(final long expiration) + { + _expiration = expiration; } public boolean isAcquired() @@ -140,6 +188,11 @@ public class QueueEntryImpl implements QueueEntry return _state.getState() == State.ACQUIRED; } + public boolean isAvailable() + { + return _state.getState() == State.AVAILABLE; + } + public boolean acquire() { return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE); @@ -147,8 +200,8 @@ public class QueueEntryImpl implements QueueEntry private boolean acquire(final EntryState state) { - boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state); - if(acquired && _stateChangeListeners != null) + boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state); + if (acquired && _stateChangeListeners != null) { notifyStateChange(State.AVAILABLE, State.ACQUIRED); } @@ -167,35 +220,41 @@ public class QueueEntryImpl implements QueueEntry return (_state instanceof SubscriptionAcquiredState); } - public void setDeliveredToSubscription() - { - getMessage().setDeliveredToConsumer(); - } - public void release() { - _stateUpdater.set(this,AVAILABLE_STATE); + _stateUpdater.set(this, AVAILABLE_STATE); } public String debugIdentity() { - return getMessage().debugIdentity(); - } + String entry = "[State:" + _state.getState().name() + "]"; + AMQMessage message = _messageRef.get(); - public boolean immediateAndNotDelivered() + if (message == null) + { + return entry + "(Message Unloaded ID:" + _messageId + ")"; + } + else + { + + return entry + message.debugIdentity(); + } + } + + public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; } public ContentHeaderBody getContentHeaderBody() throws AMQException { - return _message.getContentHeaderBody(); + return getMessage().getContentHeaderBody(); } public boolean isPersistent() throws AMQException { - return _message.isPersistent(); + return _persistent; } public boolean isRedelivered() @@ -206,21 +265,21 @@ public class QueueEntryImpl implements QueueEntry public void setRedelivered(boolean redelivered) { _redelivered = redelivered; - // todo - here we could mark this message as redelivered so we don't have to mark - // all messages on recover as redelivered. + // todo - here we could record this message as redelivered on this queue in the transactionLog + // so we don't have to mark all messages on recover as redelivered. } public Subscription getDeliveredSubscription() { - EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) - { - return ((SubscriptionAcquiredState) state).getSubscription(); - } - else - { - return null; - } + EntryState state = _state; + if (state instanceof SubscriptionAcquiredState) + { + return ((SubscriptionAcquiredState) state).getSubscription(); + } + else + { + return null; + } } @@ -247,7 +306,7 @@ public class QueueEntryImpl implements QueueEntry } public boolean isRejectedBy(Subscription subscription) - { + { if (_rejectedBy != null) // We have subscriptions that rejected this message { @@ -259,11 +318,10 @@ public class QueueEntryImpl implements QueueEntry } } - public void requeue(final StoreContext storeContext) throws AMQException { getQueue().requeue(storeContext, this); - if(_stateChangeListeners != null) + if (_stateChangeListeners != null) { notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); } @@ -273,7 +331,7 @@ public class QueueEntryImpl implements QueueEntry { EntryState state = _state; - if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) + if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { if (state instanceof SubscriptionAcquiredState) { @@ -281,41 +339,34 @@ public class QueueEntryImpl implements QueueEntry s.restoreCredit(this); } + _queueEntryList.dequeued(this); + getQueue().dequeue(storeContext, this); - if(_stateChangeListeners != null) + + if (_stateChangeListeners != null) { - notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); + notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED); } - } - } private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener l : _stateChangeListeners) + for (StateChangeListener l : _stateChangeListeners) { l.stateChanged(this, oldState, newState); } } - public void dispose(final StoreContext storeContext) throws MessageCleanupException + public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException { - if(delete()) - { - getMessage().decrementReference(storeContext); - } - } - - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException - { - //if the queue is null then the message is waiting to be acked, but has been removed. + //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d; if (getQueue() != null) { dequeue(storeContext); } - dispose(storeContext); + delete(); } public boolean isQueueDeleted() @@ -326,7 +377,7 @@ public class QueueEntryImpl implements QueueEntry public void addStateChangeListener(StateChangeListener listener) { Set<StateChangeListener> listeners = _stateChangeListeners; - if(listeners == null) + if (listeners == null) { _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); listeners = _stateChangeListeners; @@ -338,7 +389,7 @@ public class QueueEntryImpl implements QueueEntry public boolean removeStateChangeListener(StateChangeListener listener) { Set<StateChangeListener> listeners = _stateChangeListeners; - if(listeners != null) + if (listeners != null) { return listeners.remove(listener); } @@ -346,10 +397,108 @@ public class QueueEntryImpl implements QueueEntry return false; } + public void unload() + { + //Get the currently loaded message + AMQMessage message = _messageRef.get(); + + // If we have a message in memory and we have a valid backingStore attempt to unload + if (message != null && _backingStore != null) + { + try + { + // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure + // multiple initial unloads are unloads + _backingStore.unload(message); + _hasBeenUnloaded = true; + _messageRef.set(null); + + if (_log.isDebugEnabled()) + { + _log.debug("Unloaded:" + debugIdentity()); + } + + + // Clear the message reference if the loaded message is still the one we are processing. + + //Update the memoryState if this load call resulted in the message being purged from memory + if (!_flowed.getAndSet(true)) + { + _queueEntryList.entryUnloadedUpdateMemory(this); + } + + } + catch (UnableToFlowMessageException utfme) + { + // There is no recovery needed as the memory states remain unchanged. + if (_log.isDebugEnabled()) + { + _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage()); + } + } + } + } + + public AMQMessage load() + { + // MessageId and Backing store are null in test scenarios, normally this is not the case. + if (_messageId != null && _backingStore != null) + { + // See if we have the message currently in memory to return + AMQMessage message = _messageRef.get(); + // if we don't then we need to start a load process. + if (message == null) + { + //Synchronize here to ensure only the first thread that attempts to load will perform the load from the + // backing store. + synchronized (this) + { + // Check again to see if someone else ahead of us loaded the message + message = _messageRef.get(); + // if we still don't have the message then we need to start a load process. + if (message == null) + { + // Load the message and keep a reference to it + message = _backingStore.load(_messageId); + // Set the message reference + _messageRef.set(message); + } + else + { + // If someone else loaded the message then we can jump out here as the Memory Updates will + // have been performed by the loading thread + return message; + } + } + + if (_log.isDebugEnabled()) + { + _log.debug("Loaded:" + debugIdentity()); + } + + //Update the memoryState if this load call resulted in the message comming in to memory + if (_flowed.getAndSet(false)) + { + _queueEntryList.entryLoadedUpdateMemory(this); + } + } + + // Return the message that was either already in memory or the value we just loaded. + return message; + } + // This can be null but only in the case where we have no messageId + // in the case where we have no backingStore then we will never have unloaded the message + return _messageRef.get(); + } + + public boolean isFlowed() + { + return _flowed.get(); + } public int compareTo(final QueueEntry o) { - QueueEntryImpl other = (QueueEntryImpl)o; + QueueEntryImpl other = (QueueEntryImpl) o; return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } @@ -357,13 +506,13 @@ public class QueueEntryImpl implements QueueEntry { QueueEntryImpl next = nextNode(); - while(next != null && next.isDeleted()) + while (next != null && next.isDeleted()) { final QueueEntryImpl newNext = next.nextNode(); - if(newNext != null) + if (newNext != null) { - SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); + SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext); next = nextNode(); } else @@ -389,9 +538,13 @@ public class QueueEntryImpl implements QueueEntry { EntryState state = _state; - if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) + if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE)) { - _queueEntryList.advanceHead(); + _queueEntryList.advanceHead(); + if (_backingStore != null && _hasBeenUnloaded) + { + _backingStore.delete(_messageId); + } return true; } else @@ -404,4 +557,5 @@ public class QueueEntryImpl implements QueueEntry { return _queueEntryList; } + } |