diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 25 |
1 files changed, 10 insertions, 15 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 461d493437..f3a9a9dcc7 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -31,6 +31,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; import java.util.EnumMap; import java.util.HashSet; @@ -59,7 +61,7 @@ public abstract class QueueEntryImpl implements QueueEntry (QueueEntryImpl.class, EntryState.class, "_state"); - private volatile Set<StateChangeListener> _stateChangeListeners; + private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners; private static final AtomicReferenceFieldUpdater<QueueEntryImpl, Set> @@ -231,11 +233,6 @@ public abstract class QueueEntryImpl implements QueueEntry if(state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(this); - Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription(); - if (subscription != null) - { - subscription.releaseQueueEntry(this); - } } if(!getQueue().isDeleted()) @@ -320,8 +317,6 @@ public abstract class QueueEntryImpl implements QueueEntry if (state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(this); - s = ((SubscriptionAcquiredState) state).getSubscription(); - s.onDequeue(this); } getQueue().dequeue(this,s); @@ -336,7 +331,7 @@ public abstract class QueueEntryImpl implements QueueEntry private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener l : _stateChangeListeners) + for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners) { l.stateChanged(this, oldState, newState); } @@ -367,7 +362,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn) + public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); @@ -412,21 +407,21 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue().isDeleted(); } - public void addStateChangeListener(StateChangeListener listener) + public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener) { - Set<StateChangeListener> listeners = _stateChangeListeners; + Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners; if(listeners == null) { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); + _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>()); listeners = _stateChangeListeners; } listeners.add(listener); } - public boolean removeStateChangeListener(StateChangeListener listener) + public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener) { - Set<StateChangeListener> listeners = _stateChangeListeners; + Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners; if(listeners != null) { return listeners.remove(listener); |