diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java | 89 |
1 files changed, 70 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 7e1d57e205..5270f9f740 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -27,6 +27,11 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.queue.QueueRunner; import org.apache.qpid.server.queue.SimpleAMQQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * QueueRunners are Runnables used to process a queue when requiring * asynchronous message delivery to subscriptions, which is necessary @@ -37,33 +42,64 @@ public class QueueRunner implements ReadWriteRunnable { private static final Logger _logger = Logger.getLogger(QueueRunner.class); - private final String _name; private final SimpleAMQQueue _queue; - public QueueRunner(SimpleAMQQueue queue, long count) + private static int IDLE = 0; + private static int SCHEDULED = 1; + private static int RUNNING = 2; + + + private final AtomicInteger _scheduled = new AtomicInteger(IDLE); + + private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; + private final AtomicBoolean _stateChange = new AtomicBoolean(); + + private final AtomicLong _lastRunAgain = new AtomicLong(); + private final AtomicLong _lastRunTime = new AtomicLong(); + + private long _runs; + private long _continues; + + public QueueRunner(SimpleAMQQueue queue) { _queue = queue; - _name = "QueueRunner-" + count + "-" + queue.getLogActor(); } + private int trouble = 0; + public void run() { - String originalName = Thread.currentThread().getName(); - try + if(_scheduled.compareAndSet(SCHEDULED,RUNNING)) { - Thread.currentThread().setName(_name); - CurrentActor.set(_queue.getLogActor()); + long runAgain = Long.MIN_VALUE; + _stateChange.set(false); + try + { + CurrentActor.set(_queue.getLogActor()); + + runAgain = _queue.processQueue(this); + } + catch (AMQException e) + { + _logger.error("Exception during asynchronous delivery by " + toString(), e); + } + finally + { + CurrentActor.remove(); + } + _scheduled.compareAndSet(RUNNING, IDLE); + long stateChangeCount = _queue.getStateChangeCount(); + _lastRunAgain.set(runAgain); + _lastRunTime.set(System.nanoTime()); + if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) + { + _continues++; + if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + { + _queue.execute(this); + } + } - _queue.processQueue(this); - } - catch (AMQException e) - { - _logger.error("Exception during asynchronous delivery by " + _name, e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); } } @@ -79,6 +115,21 @@ public class QueueRunner implements ReadWriteRunnable public String toString() { - return _name; + return "QueueRunner-" + _queue.getLogActor(); } -}
\ No newline at end of file + + public void execute(Executor executor) + { + _stateChange.set(true); + if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + { + executor.execute(this); + } + } + + public boolean isIdle() + { + return _scheduled.get() == IDLE; + } + +} |