summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java47
1 files changed, 27 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 0d44fe7cf3..22a2029494 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.queue.QueueRunner;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.transport.TransportException;
+
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to subscriptions, which is necessary
@@ -47,7 +46,6 @@ public class QueueRunner implements Runnable
private static int SCHEDULED = 1;
private static int RUNNING = 2;
-
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
private final AtomicBoolean _stateChange = new AtomicBoolean();
@@ -55,8 +53,6 @@ public class QueueRunner implements Runnable
private final AtomicLong _lastRunAgain = new AtomicLong();
private final AtomicLong _lastRunTime = new AtomicLong();
- private long _continues;
-
public QueueRunner(SimpleAMQQueue queue)
{
_queue = queue;
@@ -74,24 +70,35 @@ public class QueueRunner implements Runnable
runAgain = _queue.processQueue(this);
}
- catch (AMQException e)
+ catch (final AMQException e)
{
_logger.error("Exception during asynchronous delivery by " + toString(), e);
}
- finally
+ catch (final TransportException transe)
{
- CurrentActor.remove();
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
}
- _scheduled.compareAndSet(RUNNING, IDLE);
- long stateChangeCount = _queue.getStateChangeCount();
- _lastRunAgain.set(runAgain);
- _lastRunTime.set(System.nanoTime());
- if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
+ finally
{
- _continues++;
- if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ CurrentActor.remove();
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ final long stateChangeCount = _queue.getStateChangeCount();
+ _lastRunAgain.set(runAgain);
+ _lastRunTime.set(System.nanoTime());
+ if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
{
- _queue.execute(this);
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ _queue.execute(this);
+ }
}
}