summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-13 11:50:25 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-13 11:50:25 +0000
commitb3b701ae79d88c6b74e023bf07af5a3e1bc59d11 (patch)
tree54257935289a7e99ffc3e980657c8fe1cfcaff4d
parentf79e7bc16014fb017ea8f8955ac97edb06c2599e (diff)
downloadqpid-python-b3b701ae79d88c6b74e023bf07af5a3e1bc59d11.tar.gz
QPID-3546: update the highestDeliveryTag marker during failover to prevent the stale value being used to set the rollback mark on the first rollback after failover.
This commit only fixes the 0-10 client path, as fixing this on the 0-8/9/9-1 path currently would cause undesirable interaction with the issue in QPID 3521. Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1182793 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java11
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java2
3 files changed, 40 insertions, 6 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 44c4e8987a..d34290e007 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -308,7 +308,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
- private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
@@ -856,6 +856,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
//Check that we are clean to commit.
if (_failedOverDirty)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+ }
rollback();
throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
@@ -1814,9 +1818,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
suspendChannel(true);
}
- // Let the dispatcher know that all the incomming messages
- // should be rolled back(reject/release)
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
syncDispatchQueue();
@@ -3202,7 +3204,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
setConnectionStopped(true);
}
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -3351,6 +3353,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message because delivery tag " + deliveryTag
+ + " <= rollback mark " + _rollbackMark.get());
+ }
rejectMessage(message, true);
}
else if (_usingDispatcherForCleanup)
@@ -3412,6 +3419,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Don't reject if we're already closing
if (!_closed.get())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ }
rejectMessage(message, true);
}
}
@@ -3542,4 +3554,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
}
+
+ private void setRollbackMark()
+ {
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 86e1fc08de..3812e612aa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1378,4 +1378,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().sync();
}
}
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ // Also reset the delivery tag tracker, to insure we dont
+ // return the first <total number of msgs received on session>
+ // messages sent by the brokers following the first rollback
+ // after failover
+ _highestDeliveryTag.set(-1);
+ super.resubscribe();
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
index 13a9dd73b8..107c730a7e 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
@@ -94,7 +94,7 @@ public class CancelTest extends QpidBrokerTestCase
browser.close();
MessageConsumer consumer = _clientSession.createConsumer(_queue);
- assertNotNull( consumer.receive() );
+ assertNotNull( consumer.receive(2000l) );
consumer.close();
}
}