diff options
author | Robert Gemmell <robbie@apache.org> | 2011-10-06 16:41:01 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-10-06 16:41:01 +0000 |
commit | d81c553c1f653ae9c536d8f926c231038b4b3532 (patch) | |
tree | f02c9a02f65d2eb887d4d84edb4b64d6d504e7de | |
parent | 371bf976678df3ffba6ebc14a7ff0ed676097ce9 (diff) | |
download | qpid-python-d81c553c1f653ae9c536d8f926c231038b4b3532.tar.gz |
QPID-3525: stop the client from accidentally consuming the messages it is actually trying to recover when using 0-8/9/9-1. Also stops it acking the message from the in-progress onMessage delivery after Session.recover() was called.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179699 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 36 | ||||
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java | 9 |
2 files changed, 28 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a477832892..30c7403a90 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -362,7 +362,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when recover is called. This is to handle the case where recover() is called by application code during * onMessage() processing to ensure that an auto ack is not sent. */ - private boolean _inRecovery; + private volatile boolean _sessionInRecovery; + + /** + * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of + * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). + */ + private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; @@ -1703,8 +1709,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // flush any acks we are holding in the buffer. flushAcknowledgments(); - // this is set only here, and the before the consumer's onMessage is called it is set to false - _inRecovery = true; + // this is only set true here, and only set false when the consumers preDeliver method is called + _sessionInRecovery = true; boolean isSuspended = isSuspended(); @@ -1712,9 +1718,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { suspendChannel(true); } - + + // Set to true to short circuit delivery of anything currently + //in the pre-dispatch queue. + _usingDispatcherForCleanup = true; + syncDispatchQueue(); - + + // Set to false before sending the recover as 0-8/9/9-1 will + //send messages back before the recover completes, and we + //probably shouldn't clean those! ;-) + _usingDispatcherForCleanup = false; + if (_dispatcher != null) { _dispatcher.recover(); @@ -1723,10 +1738,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic sendRecover(); markClean(); - - // Set inRecovery to false before you start message flow again again. - _inRecovery = false; - + if (!isSuspended) { suspendChannel(false); @@ -2126,7 +2138,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic boolean isInRecovery() { - return _inRecovery; + return _sessionInRecovery; } boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException @@ -2248,7 +2260,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void setInRecovery(boolean inRecovery) { - _inRecovery = inRecovery; + _sessionInRecovery = inRecovery; } boolean isStarted() @@ -3325,7 +3337,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { rejectMessage(message, true); } - else if (isInRecovery()) + else if (_usingDispatcherForCleanup) { _unacknowledgedMessageTags.add(deliveryTag); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 5e7ba5482d..66ca1d8345 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -46,7 +46,7 @@ public class RecoverTest extends FailoverBaseCase { static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); - private Exception _error; + private volatile Exception _error; private AtomicInteger count; protected AMQConnection _connection; @@ -249,14 +249,13 @@ public class RecoverTest extends FailoverBaseCase { if (!message.getJMSRedelivered()) { - setError( - new Exception("Message not marked as redelivered on what should be second delivery attempt")); + setError(new Exception("Message not marked as redelivered on what should be second delivery attempt")); } } else { - System.err.println(message); - fail("Message delivered too many times!: " + count); + _logger.error(message.toString()); + setError(new Exception("Message delivered too many times!: " + count)); } } catch (JMSException e) |