summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java56
1 files changed, 7 insertions, 49 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 4d0bef90ae..bcb0c12420 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1230,10 +1230,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
-
public void deliverAsync()
{
- Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+ QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1246,52 +1245,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(new SubFlushRunner(sub));
}
-
- private class Runner implements ReadWriteRunnable
- {
- String _name;
- public Runner(long count)
- {
- _name = "QueueRunner-" + count + "-" + _logActor;
- }
-
- public void run()
- {
- String originalName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(_name);
- CurrentActor.set(_logActor);
-
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
- }
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
- public String toString()
- {
- return _name;
- }
- }
-
private class SubFlushRunner implements ReadWriteRunnable
{
private final Subscription _sub;
@@ -1529,7 +1482,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
* @param runner the Runner to schedule
* @throws AMQException
*/
- private void processQueue(Runnable runner) throws AMQException
+ public void processQueue(QueueRunner runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
@@ -1897,4 +1850,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
}
}
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
}