diff options
author | Robert Gemmell <robbie@apache.org> | 2011-10-13 11:50:25 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-10-13 11:50:25 +0000 |
commit | ade995ffaa85492b0ca22141d86316acdcc879e8 (patch) | |
tree | 92def10ca3f32a99b0f8d6247d759a95a1b6cde6 | |
parent | 53d85ef31d5ebfeba7e157342334dbfcc94e21f2 (diff) | |
download | qpid-python-ade995ffaa85492b0ca22141d86316acdcc879e8.tar.gz |
QPID-3546: update the highestDeliveryTag marker during failover to prevent the stale value being used to set the rollback mark on the first rollback after failover.
This commit only fixes the 0-10 client path, as fixing this on the 0-8/9/9-1 path currently would cause undesirable interaction with the issue in QPID 3521.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1182793 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 40 insertions, 6 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 44c4e8987a..d34290e007 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -308,7 +308,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected final FlowControllingBlockingQueue _queue; /** Holds the highest received delivery tag. */ - private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); /** All the not yet acknowledged message tags */ @@ -856,6 +856,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic //Check that we are clean to commit. if (_failedOverDirty) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back."); + } rollback(); throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." + @@ -1814,9 +1818,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic suspendChannel(true); } - // Let the dispatcher know that all the incomming messages - // should be rolled back(reject/release) - _rollbackMark.set(_highestDeliveryTag.get()); + setRollbackMark(); syncDispatchQueue(); @@ -3202,7 +3204,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic setConnectionStopped(true); } - _rollbackMark.set(_highestDeliveryTag.get()); + setRollbackMark(); _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); @@ -3351,6 +3353,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get())) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting message because delivery tag " + deliveryTag + + " <= rollback mark " + _rollbackMark.get()); + } rejectMessage(message, true); } else if (_usingDispatcherForCleanup) @@ -3412,6 +3419,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Don't reject if we're already closing if (!_closed.get()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag() + + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag)); + } rejectMessage(message, true); } } @@ -3542,4 +3554,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()); } + + private void setRollbackMark() + { + // Let the dispatcher know that all the incomming messages + // should be rolled back(reject/release) + _rollbackMark.set(_highestDeliveryTag.get()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rollback mark is set to " + _rollbackMark.get()); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 86e1fc08de..3812e612aa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1378,4 +1378,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().sync(); } } + + @Override + void resubscribe() throws AMQException + { + // Also reset the delivery tag tracker, to insure we dont + // return the first <total number of msgs received on session> + // messages sent by the brokers following the first rollback + // after failover + _highestDeliveryTag.set(-1); + super.resubscribe(); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java index 13a9dd73b8..107c730a7e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java @@ -94,7 +94,7 @@ public class CancelTest extends QpidBrokerTestCase browser.close(); MessageConsumer consumer = _clientSession.createConsumer(_queue); - assertNotNull( consumer.receive() ); + assertNotNull( consumer.receive(2000l) ); consumer.close(); } } |