summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java85
1 files changed, 20 insertions, 65 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 87d11a892e..5f79498beb 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -61,9 +61,13 @@ import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+public class SimpleAMQQueue implements AMQQueue,
+ StateChangeListener<Subscription, Subscription.State>,
+ MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -121,10 +125,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
- private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
- private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicLong _unackedMsgBytes = new AtomicLong();
@@ -165,7 +165,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
private LogSubject _logSubject;
@@ -451,7 +451,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (isDeleted())
{
- subscription.queueDeleted(this);
+ subscription.queueDeleted();
}
}
else
@@ -505,7 +505,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
+ subscription.queueDeleted();
}
}
@@ -622,18 +622,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
enqueue(message, null);
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
{
- enqueue(message, false, action);
- }
-
- public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
- {
-
- if(transactional)
- {
- incrementTxnEnqueueStats(message);
- }
incrementQueueCount();
incrementQueueSize(message);
@@ -715,7 +705,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(action != null)
{
- action.onEnqueue(entry);
+ action.performAction(entry);
}
}
@@ -810,18 +800,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
getAtomicQueueCount().incrementAndGet();
}
- private void incrementTxnEnqueueStats(final ServerMessage message)
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
-
- private void incrementTxnDequeueStats(QueueEntry entry)
- {
- _msgTxnDequeues.incrementAndGet();
- _byteTxnDequeues.addAndGet(entry.getSize());
- }
-
private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
throws AMQException
{
@@ -900,11 +878,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_deliveredMessages.decrementAndGet();
}
- if(sub != null && sub.isSessionTransactional())
- {
- incrementTxnDequeueStats(entry);
- }
-
checkCapacity();
}
@@ -1039,7 +1012,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(Subscription sub, Subscription.State oldState, Subscription.State newState)
{
if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
{
@@ -1300,12 +1273,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
});
}
- public void addQueueDeleteTask(final Task task)
+ public void addQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.add(task);
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.remove(task);
}
@@ -1322,7 +1295,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if (!_deleted.getAndSet(true))
{
- for (Binding b : _bindings)
+ final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+ for (Binding b : bindingCopy)
{
b.getExchange().removeBinding(b);
}
@@ -1334,7 +1309,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
Subscription s = subscriptionIter.getNode().getSubscription();
if (s != null)
{
- s.queueDeleted(this);
+ s.queueDeleted();
}
}
@@ -1375,9 +1350,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- for (Task task : _deleteTaskList)
+ for (Action<AMQQueue> task : _deleteTaskList)
{
- task.doTask(this);
+ task.performAction(this);
}
_deleteTaskList.clear();
@@ -1984,7 +1959,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _notificationChecks;
}
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
+ private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
private final Subscription _sub;
@@ -2076,26 +2051,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _dequeueSize.get();
}
- public long getByteTxnEnqueues()
- {
- return _byteTxnEnqueues.get();
- }
-
- public long getByteTxnDequeues()
- {
- return _byteTxnDequeues.get();
- }
-
- public long getMsgTxnEnqueues()
- {
- return _msgTxnEnqueues.get();
- }
-
- public long getMsgTxnDequeues()
- {
- return _msgTxnDequeues.get();
- }
-
public long getPersistentByteEnqueues()
{
return _persistentMessageEnqueueSize.get();