diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java | 47 |
1 files changed, 27 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 0d44fe7cf3..22a2029494 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -20,17 +20,16 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.queue.QueueRunner; -import org.apache.qpid.server.queue.SimpleAMQQueue; - import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.transport.TransportException; + /** * QueueRunners are Runnables used to process a queue when requiring * asynchronous message delivery to subscriptions, which is necessary @@ -47,7 +46,6 @@ public class QueueRunner implements Runnable private static int SCHEDULED = 1; private static int RUNNING = 2; - private final AtomicInteger _scheduled = new AtomicInteger(IDLE); private final AtomicBoolean _stateChange = new AtomicBoolean(); @@ -55,8 +53,6 @@ public class QueueRunner implements Runnable private final AtomicLong _lastRunAgain = new AtomicLong(); private final AtomicLong _lastRunTime = new AtomicLong(); - private long _continues; - public QueueRunner(SimpleAMQQueue queue) { _queue = queue; @@ -74,24 +70,35 @@ public class QueueRunner implements Runnable runAgain = _queue.processQueue(this); } - catch (AMQException e) + catch (final AMQException e) { _logger.error("Exception during asynchronous delivery by " + toString(), e); } - finally + catch (final TransportException transe) { - CurrentActor.remove(); + final String errorMessage = "Problem during asynchronous delivery by " + toString(); + if(_logger.isDebugEnabled()) + { + _logger.debug(errorMessage, transe); + } + else + { + _logger.info(errorMessage + ' ' + transe.getMessage()); + } } - _scheduled.compareAndSet(RUNNING, IDLE); - long stateChangeCount = _queue.getStateChangeCount(); - _lastRunAgain.set(runAgain); - _lastRunTime.set(System.nanoTime()); - if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) + finally { - _continues++; - if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + CurrentActor.remove(); + _scheduled.compareAndSet(RUNNING, IDLE); + final long stateChangeCount = _queue.getStateChangeCount(); + _lastRunAgain.set(runAgain); + _lastRunTime.set(System.nanoTime()); + if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false)) { - _queue.execute(this); + if(_scheduled.compareAndSet(IDLE, SCHEDULED)) + { + _queue.execute(this); + } } } |