summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/rs_rollback.cpp
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2019-03-06 15:41:28 -0500
committerTess Avitabile <tess.avitabile@mongodb.com>2019-03-13 14:30:38 -0400
commit84916e817418b3b5627e80730effcd422c15696e (patch)
tree96e1685150fd897421007533499eea7423c62ec2 /src/mongo/db/repl/rs_rollback.cpp
parent035aa5ca9d7e6c4587368ab11c82ead405f6e047 (diff)
downloadmongo-84916e817418b3b5627e80730effcd422c15696e.tar.gz
SERVER-38925 When enableMajorityReadConcern=false, after rollback via refetch, do not take stable checkpoint until the local top of oplog from before rollback is reached
Diffstat (limited to 'src/mongo/db/repl/rs_rollback.cpp')
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp68
1 files changed, 60 insertions, 8 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index b9cabd2580f..91e4bef476c 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -885,6 +885,7 @@ Status _syncRollback(OperationContext* opCtx,
invariant(!opCtx->lockState()->isLocked());
FixUpInfo how;
+ how.localTopOfOplog = replCoord->getMyLastAppliedOpTime();
log() << "Starting rollback. Sync source: " << rollbackSource.getSource() << rsLog;
how.rbid = rollbackSource.getRollbackId();
uassert(
@@ -1047,6 +1048,16 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
log() << "Finished refetching documents. Total size of documents refetched: "
<< goodVersions.size();
+ // We must start taking unstable checkpoints before rolling back oplog entries. Otherwise, a
+ // stable checkpoint could include the fixup write (since it is untimestamped) but not the write
+ // being rolled back (if it is after the stable timestamp), leading to inconsistent state. An
+ // unstable checkpoint will include both writes.
+ if (!serverGlobalParams.enableMajorityReadConcern) {
+ log() << "Setting initialDataTimestamp to 0 so that we start taking unstable checkpoints.";
+ opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+ Timestamp::kAllowUnstableCheckpointsSentinel);
+ }
+
log() << "Checking the RollbackID and updating the MinValid if necessary";
checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess);
@@ -1404,19 +1415,38 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
log() << "Rollback deleted " << deletes << " documents and updated " << updates
<< " documents.";
- // When majority read concern is disabled, the stable timestamp may be ahead of the common
- // point. Force the stable timestamp back to the common point.
if (!serverGlobalParams.enableMajorityReadConcern) {
+ // When majority read concern is disabled, the stable timestamp may be ahead of the common
+ // point. Force the stable timestamp back to the common point, to allow writes after the
+ // common point.
const bool force = true;
- log() << "Forcing the stable timestamp to " << fixUpInfo.commonPoint.getTimestamp();
+ log() << "Forcing the stable timestamp to the common point: "
+ << fixUpInfo.commonPoint.getTimestamp();
opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp(
fixUpInfo.commonPoint.getTimestamp(), boost::none, force);
- // We must wait for a checkpoint before truncating oplog, so that if we crash after
- // truncating oplog, we are guaranteed to recover from a checkpoint that includes all of the
- // writes performed during the rollback.
- log() << "Waiting for a stable checkpoint";
- opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable();
+ // We must not take a stable checkpoint until it is guaranteed to include all writes from
+ // before the rollback (i.e. the stable timestamp is at least the local top of oplog). In
+ // addition, we must not take a stable checkpoint until the stable timestamp reaches the
+ // sync source top of oplog (minValid), since we must not take a stable checkpoint until we
+ // are in a consistent state. We control this by seting the initialDataTimestamp to the
+ // maximum of these two values. No checkpoints are taken until stable timestamp >=
+ // initialDataTimestamp.
+ auto syncSourceTopOfOplog = OpTime::parseFromOplogEntry(rollbackSource.getLastOperation())
+ .getValue()
+ .getTimestamp();
+ log() << "Setting initialDataTimestamp to the max of local top of oplog and sync source "
+ "top of oplog. Local top of oplog: "
+ << fixUpInfo.localTopOfOplog.getTimestamp()
+ << ", sync source top of oplog: " << syncSourceTopOfOplog;
+ opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+ std::max(fixUpInfo.localTopOfOplog.getTimestamp(), syncSourceTopOfOplog));
+
+ // Take an unstable checkpoint to ensure that all of the writes performed during rollback
+ // are persisted to disk before truncating oplog.
+ log() << "Waiting for an unstable checkpoint";
+ const bool stableCheckpoint = false;
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
}
log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " ("
@@ -1439,6 +1469,28 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false);
}
+ if (!serverGlobalParams.enableMajorityReadConcern) {
+ // If the server crashes and restarts before a stable checkpoint is taken, it will restart
+ // from the unstable checkpoint taken at the end of rollback. To ensure replication recovery
+ // replays all oplog after the common point, we set the appliedThrough to the common point.
+ // This is done using an untimestamped write, since timestamping the write with the common
+ // point TS would be incorrect (since this is equal to the stable timestamp), and this write
+ // will be included in the unstable checkpoint regardless of its timestamp.
+ log() << "Setting appliedThrough to the common point: " << fixUpInfo.commonPoint;
+ const bool setTimestamp = false;
+ replicationProcess->getConsistencyMarkers()->setAppliedThrough(
+ opCtx, fixUpInfo.commonPoint, setTimestamp);
+
+ // Take an unstable checkpoint to ensure the appliedThrough write is persisted to disk.
+ log() << "Waiting for an unstable checkpoint";
+ const bool stableCheckpoint = false;
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+
+ // Ensure that appliedThrough is unset in the next stable checkpoint.
+ log() << "Clearing appliedThrough";
+ replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
+ }
+
Status status = AuthorizationManager::get(opCtx->getServiceContext())->initialize(opCtx);
if (!status.isOK()) {
severe() << "Failed to reinitialize auth data after rollback: " << redact(status);