summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java25
1 files changed, 10 insertions, 15 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 461d493437..f3a9a9dcc7 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -31,6 +31,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.EnumMap;
import java.util.HashSet;
@@ -59,7 +61,7 @@ public abstract class QueueEntryImpl implements QueueEntry
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener> _stateChangeListeners;
+ private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -231,11 +233,6 @@ public abstract class QueueEntryImpl implements QueueEntry
if(state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
- if (subscription != null)
- {
- subscription.releaseQueueEntry(this);
- }
}
if(!getQueue().isDeleted())
@@ -320,8 +317,6 @@ public abstract class QueueEntryImpl implements QueueEntry
if (state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- s = ((SubscriptionAcquiredState) state).getSubscription();
- s.onDequeue(this);
}
getQueue().dequeue(this,s);
@@ -336,7 +331,7 @@ public abstract class QueueEntryImpl implements QueueEntry
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -367,7 +362,7 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
+ public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -412,21 +407,21 @@ public abstract class QueueEntryImpl implements QueueEntry
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);