diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 203 |
1 files changed, 174 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 54404e23e7..6225501c72 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -27,8 +27,9 @@ import org.apache.log4j.Logger; import java.util.Set; import java.util.HashSet; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.CopyOnWriteArraySet; public class QueueEntryImpl implements QueueEntry @@ -39,42 +40,77 @@ public class QueueEntryImpl implements QueueEntry */ private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); - private final AMQQueue _queue; + private final SimpleQueueEntryList _queueEntryList; + private final AMQMessage _message; private Set<Subscription> _rejectedBy = null; - private final AtomicReference<Object> _owner = new AtomicReference<Object>(); - private final AtomicLong _entryId = new AtomicLong(); + private volatile EntryState _state = AVAILABLE_STATE; + + private static final + AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState> + _stateUpdater = + AtomicReferenceFieldUpdater.newUpdater + (QueueEntryImpl.class, EntryState.class, "_state"); + + + private volatile Set<StateChangeListener> _stateChangeListeners; + + private static final + AtomicReferenceFieldUpdater<QueueEntryImpl, Set> + _listenersUpdater = + AtomicReferenceFieldUpdater.newUpdater + (QueueEntryImpl.class, Set.class, "_stateChangeListeners"); + + private static final + AtomicLongFieldUpdater<QueueEntryImpl> + _entryIdUpdater = + AtomicLongFieldUpdater.newUpdater + (QueueEntryImpl.class, "_entryId"); - public QueueEntryImpl(AMQQueue queue, AMQMessage message, final long entryId) + + private volatile long _entryId; + + volatile QueueEntryImpl _next; + + + QueueEntryImpl(SimpleQueueEntryList queueEntryList) { - _queue = queue; + this(queueEntryList,null,Long.MIN_VALUE); + _state = DELETED_STATE; + } + + + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) + { + _queueEntryList = queueEntryList; _message = message; - _entryId.set(entryId); + + _entryIdUpdater.set(this, entryId); } - public QueueEntryImpl(AMQQueue queue, AMQMessage message) + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) { - _queue = queue; + _queueEntryList = queueEntryList; _message = message; } protected void setEntryId(long entryId) { - _entryId.set(entryId); + _entryIdUpdater.set(this, entryId); } protected long getEntryId() { - return _entryId.get(); + return _entryId; } public AMQQueue getQueue() { - return _queue; + return _queueEntryList.getQueue(); } public AMQMessage getMessage() @@ -94,23 +130,39 @@ public class QueueEntryImpl implements QueueEntry public boolean expired() throws AMQException { - return getMessage().expired(_queue); + return getMessage().expired(getQueue()); } public boolean isAcquired() { - return _owner.get() != null; + return _state.getState() == State.ACQUIRED; + } + + public boolean acquire() + { + return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE); + } + + private boolean acquire(final EntryState state) + { + boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state); + if(acquired && _stateChangeListeners != null) + { + notifyStateChange(State.AVAILABLE, State.ACQUIRED); + } + + return acquired; } public boolean acquire(Subscription sub) { - return !(_owner.compareAndSet(null, sub == null ? this : sub)); + return acquire(sub.getOwningState()); } public boolean acquiredBySubscription() { - Object owner = _owner.get(); - return (owner != null) && (owner != this); + + return (_state instanceof SubscriptionAcquiredState); } public void setDeliveredToSubscription() @@ -120,7 +172,7 @@ public class QueueEntryImpl implements QueueEntry public void release() { - _owner.set(null); + _stateUpdater.set(this,AVAILABLE_STATE); } public String debugIdentity() @@ -141,18 +193,16 @@ public class QueueEntryImpl implements QueueEntry public Subscription getDeliveredSubscription() { - synchronized (this) - { - Object owner = _owner.get(); - if (owner instanceof Subscription) + EntryState state = _state; + if (state instanceof SubscriptionAcquiredState) { - return (Subscription) owner; + return ((SubscriptionAcquiredState) state).getSubscription(); } else { return null; } - } + } public void reject() @@ -193,25 +243,44 @@ public class QueueEntryImpl implements QueueEntry public void requeue(final StoreContext storeContext) throws AMQException { - _queue.requeue(storeContext, this); + getQueue().requeue(storeContext, this); + if(_stateChangeListeners != null) + { + notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + } } public void dequeue(final StoreContext storeContext) throws FailedDequeueException { + + getQueue().dequeue(storeContext, this); + if(_stateChangeListeners != null) + { + notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED); + } + } + + private void notifyStateChange(final State oldState, final State newState) + { + for(StateChangeListener l : _stateChangeListeners) + { + l.stateChanged(this, oldState, newState); + } } public void dispose(final StoreContext storeContext) throws MessageCleanupException { getMessage().decrementReference(storeContext); + delete(); } public void restoreCredit() { - Object owner = _owner.get(); - if(owner instanceof Subscription) + EntryState state = _state; + if(state instanceof SubscriptionAcquiredState) { - Subscription s = (Subscription) owner; + Subscription s = ((SubscriptionAcquiredState) _state).getSubscription(); s.restoreCredit(this); } } @@ -232,9 +301,85 @@ public class QueueEntryImpl implements QueueEntry return getQueue().isDeleted(); } + public void addStateChangeListener(StateChangeListener listener) + { + Set<StateChangeListener> listeners = _stateChangeListeners; + if(listeners == null) + { + _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); + listeners = _stateChangeListeners; + } + + listeners.add(listener); + } + + public boolean removeStateChangeListener(StateChangeListener listener) + { + Set<StateChangeListener> listeners = _stateChangeListeners; + if(listeners != null) + { + return listeners.remove(listener); + } + + return false; + } + + public int compareTo(final QueueEntry o) { QueueEntryImpl other = (QueueEntryImpl)o; return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } + + public QueueEntryImpl getNext() + { + + QueueEntryImpl next = nextNode(); + while(next != null && next.isDeleted()) + { + + final QueueEntryImpl newNext = next.nextNode(); + if(newNext != null) + { + SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); + next = nextNode(); + } + else + { + next = null; + } + + } + return next; + } + + QueueEntryImpl nextNode() + { + return _next; + } + + public boolean isDeleted() + { + return _state == DELETED_STATE; + } + + public boolean delete() + { + EntryState state = _state; + + if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) + { + _queueEntryList.advanceHead(); + return true; + } + else + { + return false; + } + } + + public QueueEntryList getQueueEntryList() + { + return _queueEntryList; + } } |