summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java36
1 files changed, 25 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 48f2efb342..8f3b7ae4ce 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -21,14 +21,16 @@ package org.apache.qpid.server.queue;
*/
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.transport.TransportException;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
class SubFlushRunner implements Runnable
@@ -67,18 +69,30 @@ class SubFlushRunner implements Runnable
}
catch (AMQException e)
{
- _logger.error(e);
+ _logger.error("Exception during asynchronous delivery by " + toString(), e);
}
- finally
+ catch (final TransportException transe)
{
- CurrentActor.remove();
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
}
- _scheduled.compareAndSet(RUNNING, IDLE);
- if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
+ finally
{
- if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ CurrentActor.remove();
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
{
- getQueue().execute(this);
+ if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ {
+ getQueue().execute(this);
+ }
}
}
}