diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 55 |
1 files changed, 7 insertions, 48 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 0e3f7b2625..4890c00047 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1585,7 +1585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void deliverAsync() { - Runner runner = new Runner(_stateChangeCount.incrementAndGet()); + QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1604,52 +1604,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(flusher); } - - private class Runner implements ReadWriteRunnable - { - String _name; - public Runner(long count) - { - _name = "QueueRunner-" + count + "-" + _logActor; - } - - public void run() - { - String originalName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName(_name); - CurrentActor.set(_logActor); - - processQueue(this); - } - catch (AMQException e) - { - _logger.error(e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); - } - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - - public String toString() - { - return _name; - } - } - public void flushSubscription(Subscription sub) throws AMQException { // Access control @@ -1834,7 +1788,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener * @param runner the Runner to schedule * @throws AMQException */ - private void processQueue(Runnable runner) throws AMQException + public void processQueue(QueueRunner runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; @@ -2289,4 +2243,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } + + public LogActor getLogActor() + { + return _logActor; + } } |