summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java89
1 files changed, 70 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 7e1d57e205..5270f9f740 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -27,6 +27,11 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.QueueRunner;
import org.apache.qpid.server.queue.SimpleAMQQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to subscriptions, which is necessary
@@ -37,33 +42,64 @@ public class QueueRunner implements ReadWriteRunnable
{
private static final Logger _logger = Logger.getLogger(QueueRunner.class);
- private final String _name;
private final SimpleAMQQueue _queue;
- public QueueRunner(SimpleAMQQueue queue, long count)
+ private static int IDLE = 0;
+ private static int SCHEDULED = 1;
+ private static int RUNNING = 2;
+
+
+ private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
+
+ private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+ private final AtomicBoolean _stateChange = new AtomicBoolean();
+
+ private final AtomicLong _lastRunAgain = new AtomicLong();
+ private final AtomicLong _lastRunTime = new AtomicLong();
+
+ private long _runs;
+ private long _continues;
+
+ public QueueRunner(SimpleAMQQueue queue)
{
_queue = queue;
- _name = "QueueRunner-" + count + "-" + queue.getLogActor();
}
+ private int trouble = 0;
+
public void run()
{
- String originalName = Thread.currentThread().getName();
- try
+ if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
{
- Thread.currentThread().setName(_name);
- CurrentActor.set(_queue.getLogActor());
+ long runAgain = Long.MIN_VALUE;
+ _stateChange.set(false);
+ try
+ {
+ CurrentActor.set(_queue.getLogActor());
+
+ runAgain = _queue.processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Exception during asynchronous delivery by " + toString(), e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ long stateChangeCount = _queue.getStateChangeCount();
+ _lastRunAgain.set(runAgain);
+ _lastRunTime.set(System.nanoTime());
+ if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
+ {
+ _continues++;
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ _queue.execute(this);
+ }
+ }
- _queue.processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error("Exception during asynchronous delivery by " + _name, e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
}
}
@@ -79,6 +115,21 @@ public class QueueRunner implements ReadWriteRunnable
public String toString()
{
- return _name;
+ return "QueueRunner-" + _queue.getLogActor();
}
-} \ No newline at end of file
+
+ public void execute(Executor executor)
+ {
+ _stateChange.set(true);
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ executor.execute(this);
+ }
+ }
+
+ public boolean isIdle()
+ {
+ return _scheduled.get() == IDLE;
+ }
+
+}