diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 85 |
1 files changed, 20 insertions, 65 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 87d11a892e..5f79498beb 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -61,9 +61,13 @@ import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper +public class SimpleAMQQueue implements AMQQueue, + StateChangeListener<Subscription, Subscription.State>, + MessageGroupManager.SubscriptionResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -121,10 +125,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); private final AtomicInteger _consumerCountHigh = new AtomicInteger(0); - private final AtomicLong _msgTxnEnqueues = new AtomicLong(0); - private final AtomicLong _byteTxnEnqueues = new AtomicLong(0); - private final AtomicLong _msgTxnDequeues = new AtomicLong(0); - private final AtomicLong _byteTxnDequeues = new AtomicLong(0); private final AtomicLong _unackedMsgCount = new AtomicLong(0); private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0); private final AtomicLong _unackedMsgBytes = new AtomicLong(); @@ -165,7 +165,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>(); private LogSubject _logSubject; @@ -451,7 +451,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (isDeleted()) { - subscription.queueDeleted(this); + subscription.queueDeleted(); } } else @@ -505,7 +505,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(this); + subscription.queueDeleted(); } } @@ -622,18 +622,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes enqueue(message, null); } - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException { - enqueue(message, false, action); - } - - public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException - { - - if(transactional) - { - incrementTxnEnqueueStats(message); - } incrementQueueCount(); incrementQueueSize(message); @@ -715,7 +705,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(action != null) { - action.onEnqueue(entry); + action.performAction(entry); } } @@ -810,18 +800,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes getAtomicQueueCount().incrementAndGet(); } - private void incrementTxnEnqueueStats(final ServerMessage message) - { - _msgTxnEnqueues.incrementAndGet(); - _byteTxnEnqueues.addAndGet(message.getSize()); - } - - private void incrementTxnDequeueStats(QueueEntry entry) - { - _msgTxnDequeues.incrementAndGet(); - _byteTxnDequeues.addAndGet(entry.getSize()); - } - private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch) throws AMQException { @@ -900,11 +878,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _deliveredMessages.decrementAndGet(); } - if(sub != null && sub.isSessionTransactional()) - { - incrementTxnDequeueStats(entry); - } - checkCapacity(); } @@ -1039,7 +1012,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) + public void stateChanged(Subscription sub, Subscription.State oldState, Subscription.State newState) { if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) { @@ -1300,12 +1273,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes }); } - public void addQueueDeleteTask(final Task task) + public void addQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.add(task); } - public void removeQueueDeleteTask(final Task task) + public void removeQueueDeleteTask(final Action<AMQQueue> task) { _deleteTaskList.remove(task); } @@ -1322,7 +1295,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if (!_deleted.getAndSet(true)) { - for (Binding b : _bindings) + final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings); + + for (Binding b : bindingCopy) { b.getExchange().removeBinding(b); } @@ -1334,7 +1309,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes Subscription s = subscriptionIter.getNode().getSubscription(); if (s != null) { - s.queueDeleted(this); + s.queueDeleted(); } } @@ -1375,9 +1350,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } - for (Task task : _deleteTaskList) + for (Action<AMQQueue> task : _deleteTaskList) { - task.doTask(this); + task.performAction(this); } _deleteTaskList.clear(); @@ -1984,7 +1959,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _notificationChecks; } - private final class QueueEntryListener implements QueueEntry.StateChangeListener + private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State> { private final Subscription _sub; @@ -2076,26 +2051,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes return _dequeueSize.get(); } - public long getByteTxnEnqueues() - { - return _byteTxnEnqueues.get(); - } - - public long getByteTxnDequeues() - { - return _byteTxnDequeues.get(); - } - - public long getMsgTxnEnqueues() - { - return _msgTxnEnqueues.get(); - } - - public long getMsgTxnDequeues() - { - return _msgTxnDequeues.get(); - } - public long getPersistentByteEnqueues() { return _persistentMessageEnqueueSize.get(); |