summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-02-09 14:31:33 +0000
committerKeith Wall <kwall@apache.org>2012-02-09 14:31:33 +0000
commitef7ba8bbf8dbfaa52ed36a184018040856747672 (patch)
tree1f91ec58f872de6d50005017f604ebda8b832979
parent2a4198a70b0cbdd86d51dddc9a908d25ef55e9bd (diff)
downloadqpid-python-ef7ba8bbf8dbfaa52ed36a184018040856747672.tar.gz
QPID-3821: Uncaught exception thrown in QueueRunner.run() could cause QueueRunner to remain stuck in RUNNING state permanently
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242339 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java24
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java27
2 files changed, 30 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index e9d4290174..22a2029494 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -46,7 +46,6 @@ public class QueueRunner implements Runnable
private static int SCHEDULED = 1;
private static int RUNNING = 2;
-
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
private final AtomicBoolean _stateChange = new AtomicBoolean();
@@ -54,8 +53,6 @@ public class QueueRunner implements Runnable
private final AtomicLong _lastRunAgain = new AtomicLong();
private final AtomicLong _lastRunTime = new AtomicLong();
- private long _continues;
-
public QueueRunner(SimpleAMQQueue queue)
{
_queue = queue;
@@ -86,23 +83,22 @@ public class QueueRunner implements Runnable
}
else
{
- _logger.info(errorMessage + transe.getMessage());
+ _logger.info(errorMessage + ' ' + transe.getMessage());
}
}
finally
{
CurrentActor.remove();
- }
- _scheduled.compareAndSet(RUNNING, IDLE);
- long stateChangeCount = _queue.getStateChangeCount();
- _lastRunAgain.set(runAgain);
- _lastRunTime.set(System.nanoTime());
- if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
- {
- _continues++;
- if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ final long stateChangeCount = _queue.getStateChangeCount();
+ _lastRunAgain.set(runAgain);
+ _lastRunTime.set(System.nanoTime());
+ if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
{
- _queue.execute(this);
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ _queue.execute(this);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 857625bd65..8f3b7ae4ce 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.transport.TransportException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,18 +69,30 @@ class SubFlushRunner implements Runnable
}
catch (AMQException e)
{
- _logger.error(e);
+ _logger.error("Exception during asynchronous delivery by " + toString(), e);
}
- finally
+ catch (final TransportException transe)
{
- CurrentActor.remove();
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
}
- _scheduled.compareAndSet(RUNNING, IDLE);
- if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
+ finally
{
- if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ CurrentActor.remove();
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
{
- getQueue().execute(this);
+ if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ {
+ getQueue().execute(this);
+ }
}
}
}