| author | Tess Avitabile <tess.avitabile@mongodb.com> | 2019-03-06 15:41:28 -0500 |
| --- | --- | --- |
| committer | Tess Avitabile <tess.avitabile@mongodb.com> | 2019-03-13 14:30:38 -0400 |
| commit | 84916e817418b3b5627e80730effcd422c15696e (patch) | |
| tree | 96e1685150fd897421007533499eea7423c62ec2 /src/mongo/db/repl/rs_rollback.cpp | |
| parent | 035aa5ca9d7e6c4587368ab11c82ead405f6e047 (diff) | |
| download | mongo-84916e817418b3b5627e80730effcd422c15696e.tar.gz | |
SERVER-38925 When enableMajorityReadConcern=false, after rollback via refetch, do not take stable checkpoint until the local top of oplog from before rollback is reached
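The mechanism behind the fix: the storage engine only takes stable checkpoints once the stable timestamp has caught up to the initialDataTimestamp, so raising initialDataTimestamp to the maximum of the local top of oplog and the sync source top of oplog keeps any stable checkpoint from being taken until it is guaranteed to cover all writes from before the rollback. Below is a minimal standalone sketch of that gating rule; `CheckpointGate` and its fields are illustrative stand-ins, not the server's actual storage-engine interface.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Illustrative timestamp type; the real server uses mongo::Timestamp.
struct Timestamp {
    uint64_t secs = 0;
    uint64_t inc = 0;
    bool operator<(const Timestamp& rhs) const {
        return secs < rhs.secs || (secs == rhs.secs && inc < rhs.inc);
    }
};

// Hypothetical gate modelling the rule the commit relies on: no stable checkpoint
// is taken until stableTimestamp >= initialDataTimestamp.
struct CheckpointGate {
    Timestamp stableTimestamp;
    Timestamp initialDataTimestamp;

    bool stableCheckpointAllowed() const {
        return !(stableTimestamp < initialDataTimestamp);
    }
};

int main() {
    CheckpointGate gate;

    // After rollback via refetch, the patch sets initialDataTimestamp to
    // max(local top of oplog, sync source top of oplog).
    Timestamp localTopOfOplog{100, 1};
    Timestamp syncSourceTopOfOplog{120, 5};
    gate.initialDataTimestamp = std::max(localTopOfOplog, syncSourceTopOfOplog);

    gate.stableTimestamp = Timestamp{90, 0};  // stable timestamp still at the common point
    std::cout << gate.stableCheckpointAllowed() << "\n";  // 0: only unstable checkpoints

    gate.stableTimestamp = Timestamp{120, 5};  // caught up to the sync source top of oplog
    std::cout << gate.stableCheckpointAllowed() << "\n";  // 1: stable checkpoints may resume
    return 0;
}
```

Until the gate opens, only unstable checkpoints can be taken, which is why the diff below also forces unstable checkpoints explicitly before truncating the oplog and after updating appliedThrough.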
Diffstat (limited to 'src/mongo/db/repl/rs_rollback.cpp')
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 68 |
1 file changed, 60 insertions, 8 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index b9cabd2580f..91e4bef476c 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -885,6 +885,7 @@ Status _syncRollback(OperationContext* opCtx,
     invariant(!opCtx->lockState()->isLocked());

     FixUpInfo how;
+    how.localTopOfOplog = replCoord->getMyLastAppliedOpTime();
     log() << "Starting rollback. Sync source: " << rollbackSource.getSource() << rsLog;
     how.rbid = rollbackSource.getRollbackId();
     uassert(
@@ -1047,6 +1048,16 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
     log() << "Finished refetching documents. Total size of documents refetched: "
           << goodVersions.size();

+    // We must start taking unstable checkpoints before rolling back oplog entries. Otherwise, a
+    // stable checkpoint could include the fixup write (since it is untimestamped) but not the write
+    // being rolled back (if it is after the stable timestamp), leading to inconsistent state. An
+    // unstable checkpoint will include both writes.
+    if (!serverGlobalParams.enableMajorityReadConcern) {
+        log() << "Setting initialDataTimestamp to 0 so that we start taking unstable checkpoints.";
+        opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+            Timestamp::kAllowUnstableCheckpointsSentinel);
+    }
+
     log() << "Checking the RollbackID and updating the MinValid if necessary";

     checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess);
@@ -1404,19 +1415,38 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
     log() << "Rollback deleted " << deletes << " documents and updated " << updates
           << " documents.";

-    // When majority read concern is disabled, the stable timestamp may be ahead of the common
-    // point. Force the stable timestamp back to the common point.
     if (!serverGlobalParams.enableMajorityReadConcern) {
+        // When majority read concern is disabled, the stable timestamp may be ahead of the common
+        // point. Force the stable timestamp back to the common point, to allow writes after the
+        // common point.
         const bool force = true;
-        log() << "Forcing the stable timestamp to " << fixUpInfo.commonPoint.getTimestamp();
+        log() << "Forcing the stable timestamp to the common point: "
+              << fixUpInfo.commonPoint.getTimestamp();
         opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp(
             fixUpInfo.commonPoint.getTimestamp(), boost::none, force);

-        // We must wait for a checkpoint before truncating oplog, so that if we crash after
-        // truncating oplog, we are guaranteed to recover from a checkpoint that includes all of the
-        // writes performed during the rollback.
-        log() << "Waiting for a stable checkpoint";
-        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable();
+        // We must not take a stable checkpoint until it is guaranteed to include all writes from
+        // before the rollback (i.e. the stable timestamp is at least the local top of oplog). In
+        // addition, we must not take a stable checkpoint until the stable timestamp reaches the
+        // sync source top of oplog (minValid), since we must not take a stable checkpoint until we
+        // are in a consistent state. We control this by setting the initialDataTimestamp to the
+        // maximum of these two values. No checkpoints are taken until stable timestamp >=
+        // initialDataTimestamp.
+        auto syncSourceTopOfOplog = OpTime::parseFromOplogEntry(rollbackSource.getLastOperation())
+                                        .getValue()
+                                        .getTimestamp();
+        log() << "Setting initialDataTimestamp to the max of local top of oplog and sync source "
+                 "top of oplog. Local top of oplog: "
+              << fixUpInfo.localTopOfOplog.getTimestamp()
+              << ", sync source top of oplog: " << syncSourceTopOfOplog;
+        opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+            std::max(fixUpInfo.localTopOfOplog.getTimestamp(), syncSourceTopOfOplog));
+
+        // Take an unstable checkpoint to ensure that all of the writes performed during rollback
+        // are persisted to disk before truncating oplog.
+        log() << "Waiting for an unstable checkpoint";
+        const bool stableCheckpoint = false;
+        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
     }

     log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " ("
@@ -1439,6 +1469,28 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
         oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false);
     }

+    if (!serverGlobalParams.enableMajorityReadConcern) {
+        // If the server crashes and restarts before a stable checkpoint is taken, it will restart
+        // from the unstable checkpoint taken at the end of rollback. To ensure replication recovery
+        // replays all oplog after the common point, we set the appliedThrough to the common point.
+        // This is done using an untimestamped write, since timestamping the write with the common
+        // point TS would be incorrect (since this is equal to the stable timestamp), and this write
+        // will be included in the unstable checkpoint regardless of its timestamp.
+        log() << "Setting appliedThrough to the common point: " << fixUpInfo.commonPoint;
+        const bool setTimestamp = false;
+        replicationProcess->getConsistencyMarkers()->setAppliedThrough(
+            opCtx, fixUpInfo.commonPoint, setTimestamp);
+
+        // Take an unstable checkpoint to ensure the appliedThrough write is persisted to disk.
+        log() << "Waiting for an unstable checkpoint";
+        const bool stableCheckpoint = false;
+        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+
+        // Ensure that appliedThrough is unset in the next stable checkpoint.
+        log() << "Clearing appliedThrough";
+        replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
+    }
+
     Status status = AuthorizationManager::get(opCtx->getServiceContext())->initialize(opCtx);
     if (!status.isOK()) {
         severe() << "Failed to reinitialize auth data after rollback: " << redact(status);
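The appliedThrough hunk matters for crash recovery: if the node restarts before the next stable checkpoint, it recovers from the unstable checkpoint taken at the end of rollback, and replication recovery replays every oplog entry after appliedThrough (the common point). A rough sketch of that replay decision follows, using hypothetical `OplogEntry`/`replayFrom` names rather than the server's actual recovery code.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical oplog entry; only the timestamp matters for the replay decision.
struct OplogEntry {
    uint64_t ts;
    std::string op;
};

// Replay every entry strictly after appliedThrough, mirroring how startup recovery
// applies oplog from the appliedThrough point forward.
void replayFrom(uint64_t appliedThrough, const std::vector<OplogEntry>& oplog) {
    for (const auto& entry : oplog) {
        if (entry.ts > appliedThrough) {
            std::cout << "reapplying op at ts " << entry.ts << ": " << entry.op << "\n";
        }
    }
}

int main() {
    // After rollback the oplog is truncated at the common point (ts 100); later
    // entries are then fetched and applied from the sync source.
    std::vector<OplogEntry> oplog = {
        {90, "insert"}, {100, "update (common point)"}, {110, "insert"}, {120, "delete"}};

    // appliedThrough was persisted as the common point in the unstable checkpoint,
    // so a post-crash restart replays everything after ts 100.
    replayFrom(/*appliedThrough=*/100, oplog);
    return 0;
}
```

The final clearAppliedThrough call in the diff ensures the marker is unset again in the next stable checkpoint, where it is no longer needed.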