summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java203
1 files changed, 174 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 54404e23e7..6225501c72 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -27,8 +27,9 @@ import org.apache.log4j.Logger;
import java.util.Set;
import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.CopyOnWriteArraySet;
public class QueueEntryImpl implements QueueEntry
@@ -39,42 +40,77 @@ public class QueueEntryImpl implements QueueEntry
*/
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final AMQQueue _queue;
+ private final SimpleQueueEntryList _queueEntryList;
+
private final AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
- private final AtomicReference<Object> _owner = new AtomicReference<Object>();
- private final AtomicLong _entryId = new AtomicLong();
+ private volatile EntryState _state = AVAILABLE_STATE;
+
+ private static final
+ AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
+ _stateUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, EntryState.class, "_state");
+
+
+ private volatile Set<StateChangeListener> _stateChangeListeners;
+
+ private static final
+ AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+ _listenersUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
+
+ private static final
+ AtomicLongFieldUpdater<QueueEntryImpl>
+ _entryIdUpdater =
+ AtomicLongFieldUpdater.newUpdater
+ (QueueEntryImpl.class, "_entryId");
- public QueueEntryImpl(AMQQueue queue, AMQMessage message, final long entryId)
+
+ private volatile long _entryId;
+
+ volatile QueueEntryImpl _next;
+
+
+ QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
- _queue = queue;
+ this(queueEntryList,null,Long.MIN_VALUE);
+ _state = DELETED_STATE;
+ }
+
+
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+ {
+ _queueEntryList = queueEntryList;
_message = message;
- _entryId.set(entryId);
+
+ _entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(AMQQueue queue, AMQMessage message)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
- _queue = queue;
+ _queueEntryList = queueEntryList;
_message = message;
}
protected void setEntryId(long entryId)
{
- _entryId.set(entryId);
+ _entryIdUpdater.set(this, entryId);
}
protected long getEntryId()
{
- return _entryId.get();
+ return _entryId;
}
public AMQQueue getQueue()
{
- return _queue;
+ return _queueEntryList.getQueue();
}
public AMQMessage getMessage()
@@ -94,23 +130,39 @@ public class QueueEntryImpl implements QueueEntry
public boolean expired() throws AMQException
{
- return getMessage().expired(_queue);
+ return getMessage().expired(getQueue());
}
public boolean isAcquired()
{
- return _owner.get() != null;
+ return _state.getState() == State.ACQUIRED;
+ }
+
+ public boolean acquire()
+ {
+ return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+ }
+
+ private boolean acquire(final EntryState state)
+ {
+ boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
+ if(acquired && _stateChangeListeners != null)
+ {
+ notifyStateChange(State.AVAILABLE, State.ACQUIRED);
+ }
+
+ return acquired;
}
public boolean acquire(Subscription sub)
{
- return !(_owner.compareAndSet(null, sub == null ? this : sub));
+ return acquire(sub.getOwningState());
}
public boolean acquiredBySubscription()
{
- Object owner = _owner.get();
- return (owner != null) && (owner != this);
+
+ return (_state instanceof SubscriptionAcquiredState);
}
public void setDeliveredToSubscription()
@@ -120,7 +172,7 @@ public class QueueEntryImpl implements QueueEntry
public void release()
{
- _owner.set(null);
+ _stateUpdater.set(this,AVAILABLE_STATE);
}
public String debugIdentity()
@@ -141,18 +193,16 @@ public class QueueEntryImpl implements QueueEntry
public Subscription getDeliveredSubscription()
{
- synchronized (this)
- {
- Object owner = _owner.get();
- if (owner instanceof Subscription)
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
{
- return (Subscription) owner;
+ return ((SubscriptionAcquiredState) state).getSubscription();
}
else
{
return null;
}
- }
+
}
public void reject()
@@ -193,25 +243,44 @@ public class QueueEntryImpl implements QueueEntry
public void requeue(final StoreContext storeContext) throws AMQException
{
- _queue.requeue(storeContext, this);
+ getQueue().requeue(storeContext, this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
}
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
+
+
getQueue().dequeue(storeContext, this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED);
+ }
+ }
+
+ private void notifyStateChange(final State oldState, final State newState)
+ {
+ for(StateChangeListener l : _stateChangeListeners)
+ {
+ l.stateChanged(this, oldState, newState);
+ }
}
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
getMessage().decrementReference(storeContext);
+ delete();
}
public void restoreCredit()
{
- Object owner = _owner.get();
- if(owner instanceof Subscription)
+ EntryState state = _state;
+ if(state instanceof SubscriptionAcquiredState)
{
- Subscription s = (Subscription) owner;
+ Subscription s = ((SubscriptionAcquiredState) _state).getSubscription();
s.restoreCredit(this);
}
}
@@ -232,9 +301,85 @@ public class QueueEntryImpl implements QueueEntry
return getQueue().isDeleted();
}
+ public void addStateChangeListener(StateChangeListener listener)
+ {
+ Set<StateChangeListener> listeners = _stateChangeListeners;
+ if(listeners == null)
+ {
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+ listeners = _stateChangeListeners;
+ }
+
+ listeners.add(listener);
+ }
+
+ public boolean removeStateChangeListener(StateChangeListener listener)
+ {
+ Set<StateChangeListener> listeners = _stateChangeListeners;
+ if(listeners != null)
+ {
+ return listeners.remove(listener);
+ }
+
+ return false;
+ }
+
+
public int compareTo(final QueueEntry o)
{
QueueEntryImpl other = (QueueEntryImpl)o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
+
+ public QueueEntryImpl getNext()
+ {
+
+ QueueEntryImpl next = nextNode();
+ while(next != null && next.isDeleted())
+ {
+
+ final QueueEntryImpl newNext = next.nextNode();
+ if(newNext != null)
+ {
+ SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ next = nextNode();
+ }
+ else
+ {
+ next = null;
+ }
+
+ }
+ return next;
+ }
+
+ QueueEntryImpl nextNode()
+ {
+ return _next;
+ }
+
+ public boolean isDeleted()
+ {
+ return _state == DELETED_STATE;
+ }
+
+ public boolean delete()
+ {
+ EntryState state = _state;
+
+ if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
+ {
+ _queueEntryList.advanceHead();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntryList getQueueEntryList()
+ {
+ return _queueEntryList;
+ }
}