author | Tess Avitabile <tess.avitabile@mongodb.com> | 2019-03-06 15:41:28 -0500
committer | Tess Avitabile <tess.avitabile@mongodb.com> | 2019-03-13 14:30:38 -0400
commit | 84916e817418b3b5627e80730effcd422c15696e (patch)
tree | 96e1685150fd897421007533499eea7423c62ec2 /src/mongo/db
parent | 035aa5ca9d7e6c4587368ab11c82ead405f6e047 (diff)
download | mongo-84916e817418b3b5627e80730effcd422c15696e.tar.gz
SERVER-38925 When enableMajorityReadConcern=false, after rollback via refetch, do not take stable checkpoint until the local top of oplog from before rollback is reached
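The rule this commit leans on is that the storage engine takes stable checkpoints only once the stable timestamp has caught up to the initialDataTimestamp, so pushing initialDataTimestamp up to the pre-rollback tops of oplog delays stable checkpoints until those entries have been re-applied. Below is a minimal standalone sketch of that gating idea; the names (StorageModel, canTakeStableCheckpoint) are illustrative stand-ins, not the real storage engine API.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Simplified stand-in for the storage engine's checkpoint policy.
struct StorageModel {
    uint64_t stableTimestamp = 0;
    uint64_t initialDataTimestamp = 0;

    // Stable checkpoints are only taken once the stable timestamp has reached the
    // initial data timestamp; until then only unstable checkpoints are possible.
    bool canTakeStableCheckpoint() const {
        return stableTimestamp >= initialDataTimestamp;
    }
};

int main() {
    StorageModel storage;

    // After rollback via refetch, the stable timestamp is forced back to the common point...
    const uint64_t commonPoint = 50;
    storage.stableTimestamp = commonPoint;

    // ...and the initial data timestamp is pushed to the max of the local and sync source
    // tops of oplog, so no stable checkpoint is taken until both have been re-applied.
    const uint64_t localTopOfOplog = 80;
    const uint64_t syncSourceTopOfOplog = 70;
    storage.initialDataTimestamp = std::max(localTopOfOplog, syncSourceTopOfOplog);

    std::cout << std::boolalpha << storage.canTakeStableCheckpoint() << "\n";  // false

    // Once replication has applied past the old local top of oplog, stable checkpoints resume.
    storage.stableTimestamp = 85;
    std::cout << storage.canTakeStableCheckpoint() << "\n";  // true
}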
Diffstat (limited to 'src/mongo/db')
11 files changed, 91 insertions, 24 deletions
diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h
index 102540c77db..82118277986 100644
--- a/src/mongo/db/repl/replication_consistency_markers.h
+++ b/src/mongo/db/repl/replication_consistency_markers.h
@@ -165,9 +165,12 @@ public:
     /**
      * The applied through point is a persistent record of which oplog entries we've applied.
      * If we crash while applying a batch of oplog entries, this OpTime tells us where to start
-     * applying operations on startup.
+     * applying operations on startup. If 'setTimestamp' is true, the write will be timestamped with
+     * the timestamp from 'optime'.
      */
-    virtual void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) = 0;
+    virtual void setAppliedThrough(OperationContext* opCtx,
+                                   const OpTime& optime,
+                                   bool setTimestamp = true) = 0;
 
     /**
      * Unsets the applied through OpTime at the given 'writeTimestamp'.
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index 9bfab91e861..345573f2ff5 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -243,7 +243,8 @@ void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* o
 }
 
 void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCtx,
-                                                          const OpTime& optime) {
+                                                          const OpTime& optime,
+                                                          bool setTimestamp) {
     invariant(!optime.isNull());
     LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")";
 
@@ -251,7 +252,9 @@ void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCt
     // in checkpoints that contain all writes through this timestamp since it indicates the top of
    // the oplog.
     TimestampedBSONObj update;
-    update.timestamp = optime.getTimestamp();
+    if (setTimestamp) {
+        update.timestamp = optime.getTimestamp();
+    }
     update.obj = BSON("$set" << BSON(MinValidDocument::kAppliedThroughFieldName << optime));
 
     _updateMinValidDocument(opCtx, update);
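To make the new 'setTimestamp' flag concrete: when it is false, the appliedThrough update is written without a timestamp, so it is captured by whichever checkpoint (stable or unstable) is taken next. A small sketch of that idea, using hypothetical types rather than TimestampedBSONObj and the real BSON machinery:

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

// Hypothetical stand-in for a timestamped minValid/appliedThrough update.
struct TimestampedUpdateModel {
    std::optional<uint64_t> timestamp;  // empty => the write is applied untimestamped
    std::string obj;                    // stand-in for the BSON "$set" document
};

TimestampedUpdateModel makeAppliedThroughUpdate(uint64_t appliedThroughTs, bool setTimestamp) {
    TimestampedUpdateModel update;
    if (setTimestamp) {
        // Normal case: the update is timestamped with the optime being recorded.
        update.timestamp = appliedThroughTs;
    }
    // Rollback-via-refetch case: setTimestamp == false, so the timestamp stays empty and the
    // write is visible in any checkpoint taken afterwards, stable or unstable.
    update.obj = "{$set: {appliedThrough: <optime>}}";
    return update;
}

int main() {
    auto update = makeAppliedThroughUpdate(/*appliedThroughTs=*/50, /*setTimestamp=*/false);
    std::cout << "timestamped: " << std::boolalpha << update.timestamp.has_value() << "\n";  // false
}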
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h
index e12d2a7805a..5e64762d490 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.h
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.h
@@ -72,7 +72,9 @@ public:
     void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
     Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override;
 
-    void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+    void setAppliedThrough(OperationContext* opCtx,
+                           const OpTime& optime,
+                           bool setTimestamp = true) override;
     void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override;
     OpTime getAppliedThrough(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
index ece2150fd44..61f46bf0bef 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
@@ -93,7 +93,8 @@ Timestamp ReplicationConsistencyMarkersMock::getOplogTruncateAfterPoint(
 }
 
 void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCtx,
-                                                          const OpTime& optime) {
+                                                          const OpTime& optime,
+                                                          bool setTimestamp) {
     stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
     _appliedThrough = optime;
 }
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h
index b8c7c06c7f8..165c6185c43 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.h
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.h
@@ -64,7 +64,9 @@ public:
     void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
     Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override;
 
-    void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+    void setAppliedThrough(OperationContext* opCtx,
+                           const OpTime& optime,
+                           bool setTimestamp = true) override;
     void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override;
     OpTime getAppliedThrough(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 0db743bce11..dd136139c4e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -3431,6 +3431,8 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_recalculateStableOpTime(Wit
     return stableOpTime;
 }
 
+MONGO_FAIL_POINT_DEFINE(disableSnapshotting);
+
 void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
     // Get the current stable optime.
     auto stableOpTime = _recalculateStableOpTime(lk);
@@ -3459,7 +3461,9 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
             }
             // Set the stable timestamp regardless of whether the majority commit point moved
            // forward.
-            _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp());
+            if (!MONGO_FAIL_POINT(disableSnapshotting)) {
+                _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp());
+            }
         }
     }
     _cleanupStableOpTimeCandidates(&_stableOpTimeCandidates, stableOpTime.get());
@@ -3692,8 +3696,6 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
     return _uncommittedSnapshotsSize.load();
 }
 
-MONGO_FAIL_POINT_DEFINE(disableSnapshotting);
-
 bool ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
     const OpTime& newCommittedSnapshot) {
     if (gTestingSnapshotBehaviorInIsolation) {
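The disableSnapshotting fail point now also gates setStableTimestamp, so enabling it (e.g. from a test) keeps the stable timestamp from advancing at all; presumably MONGO_FAIL_POINT_DEFINE was moved above _setStableTimestampForStorage because the MONGO_FAIL_POINT macro refers to that variable, which must be declared earlier in the translation unit. The effect of the new check can be modeled with a plain atomic flag standing in for the fail point machinery (simplified sketch, not the FailPoint API):

#include <atomic>
#include <cstdint>
#include <iostream>

// Stand-in for the 'disableSnapshotting' fail point.
std::atomic<bool> disableSnapshottingModel{false};

struct StorageInterfaceModel {
    uint64_t stableTimestamp = 0;
    void setStableTimestamp(uint64_t ts) {
        stableTimestamp = ts;
    }
};

void setStableTimestampForStorage(StorageInterfaceModel& storage, uint64_t stableOpTimeTs) {
    // With the gate enabled, the stable timestamp is left untouched, which in turn keeps
    // new stable checkpoints from advancing past it.
    if (!disableSnapshottingModel.load()) {
        storage.setStableTimestamp(stableOpTimeTs);
    }
}

int main() {
    StorageInterfaceModel storage;
    setStableTimestampForStorage(storage, 100);
    disableSnapshottingModel = true;
    setStableTimestampForStorage(storage, 200);    // ignored while the gate is enabled
    std::cout << storage.stableTimestamp << "\n";  // 100
}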
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index b9cabd2580f..91e4bef476c 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -885,6 +885,7 @@ Status _syncRollback(OperationContext* opCtx,
     invariant(!opCtx->lockState()->isLocked());
 
     FixUpInfo how;
+    how.localTopOfOplog = replCoord->getMyLastAppliedOpTime();
     log() << "Starting rollback. Sync source: " << rollbackSource.getSource() << rsLog;
     how.rbid = rollbackSource.getRollbackId();
     uassert(
@@ -1047,6 +1048,16 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
     log() << "Finished refetching documents. Total size of documents refetched: "
           << goodVersions.size();
 
+    // We must start taking unstable checkpoints before rolling back oplog entries. Otherwise, a
+    // stable checkpoint could include the fixup write (since it is untimestamped) but not the write
+    // being rolled back (if it is after the stable timestamp), leading to inconsistent state. An
+    // unstable checkpoint will include both writes.
+    if (!serverGlobalParams.enableMajorityReadConcern) {
+        log() << "Setting initialDataTimestamp to 0 so that we start taking unstable checkpoints.";
+        opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+            Timestamp::kAllowUnstableCheckpointsSentinel);
+    }
+
     log() << "Checking the RollbackID and updating the MinValid if necessary";
     checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess);
 
@@ -1404,19 +1415,38 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
     log() << "Rollback deleted " << deletes << " documents and updated " << updates
           << " documents.";
 
-    // When majority read concern is disabled, the stable timestamp may be ahead of the common
-    // point. Force the stable timestamp back to the common point.
     if (!serverGlobalParams.enableMajorityReadConcern) {
+        // When majority read concern is disabled, the stable timestamp may be ahead of the common
+        // point. Force the stable timestamp back to the common point, to allow writes after the
+        // common point.
         const bool force = true;
-        log() << "Forcing the stable timestamp to " << fixUpInfo.commonPoint.getTimestamp();
+        log() << "Forcing the stable timestamp to the common point: "
+              << fixUpInfo.commonPoint.getTimestamp();
         opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp(
             fixUpInfo.commonPoint.getTimestamp(), boost::none, force);
 
-        // We must wait for a checkpoint before truncating oplog, so that if we crash after
-        // truncating oplog, we are guaranteed to recover from a checkpoint that includes all of the
-        // writes performed during the rollback.
-        log() << "Waiting for a stable checkpoint";
-        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable();
+        // We must not take a stable checkpoint until it is guaranteed to include all writes from
+        // before the rollback (i.e. the stable timestamp is at least the local top of oplog). In
+        // addition, we must not take a stable checkpoint until the stable timestamp reaches the
+        // sync source top of oplog (minValid), since we must not take a stable checkpoint until we
+        // are in a consistent state. We control this by setting the initialDataTimestamp to the
+        // maximum of these two values. No checkpoints are taken until stable timestamp >=
+        // initialDataTimestamp.
+        auto syncSourceTopOfOplog = OpTime::parseFromOplogEntry(rollbackSource.getLastOperation())
+                                        .getValue()
+                                        .getTimestamp();
+        log() << "Setting initialDataTimestamp to the max of local top of oplog and sync source "
+                 "top of oplog. Local top of oplog: "
+              << fixUpInfo.localTopOfOplog.getTimestamp()
+              << ", sync source top of oplog: " << syncSourceTopOfOplog;
+        opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+            std::max(fixUpInfo.localTopOfOplog.getTimestamp(), syncSourceTopOfOplog));
+
+        // Take an unstable checkpoint to ensure that all of the writes performed during rollback
+        // are persisted to disk before truncating oplog.
+        log() << "Waiting for an unstable checkpoint";
+        const bool stableCheckpoint = false;
+        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
     }
 
     log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " ("
@@ -1439,6 +1469,28 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
         oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false);
     }
 
+    if (!serverGlobalParams.enableMajorityReadConcern) {
+        // If the server crashes and restarts before a stable checkpoint is taken, it will restart
+        // from the unstable checkpoint taken at the end of rollback. To ensure replication recovery
+        // replays all oplog after the common point, we set the appliedThrough to the common point.
+        // This is done using an untimestamped write, since timestamping the write with the common
+        // point TS would be incorrect (since this is equal to the stable timestamp), and this write
+        // will be included in the unstable checkpoint regardless of its timestamp.
+        log() << "Setting appliedThrough to the common point: " << fixUpInfo.commonPoint;
+        const bool setTimestamp = false;
+        replicationProcess->getConsistencyMarkers()->setAppliedThrough(
+            opCtx, fixUpInfo.commonPoint, setTimestamp);
+
+        // Take an unstable checkpoint to ensure the appliedThrough write is persisted to disk.
+        log() << "Waiting for an unstable checkpoint";
+        const bool stableCheckpoint = false;
+        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+
+        // Ensure that appliedThrough is unset in the next stable checkpoint.
+        log() << "Clearing appliedThrough";
+        replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
+    }
+
     Status status = AuthorizationManager::get(opCtx->getServiceContext())->initialize(opCtx);
     if (!status.isOK()) {
         severe() << "Failed to reinitialize auth data after rollback: " << redact(status);
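The appliedThrough handling after oplog truncation is the crash-safety piece of this change: if the node dies before its next stable checkpoint, startup recovery restarts from the unstable checkpoint taken at the end of rollback and uses the persisted appliedThrough marker to decide which oplog entries to reapply. A rough standalone model of that recovery decision follows; the names here are hypothetical and this is not the actual replication recovery code.

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

struct OnDiskState {
    std::optional<uint64_t> appliedThrough;  // consistency marker captured by the checkpoint
    std::vector<uint64_t> oplog;             // timestamps of oplog entries present on disk
};

void replicationRecoveryModel(const OnDiskState& disk) {
    // Without an appliedThrough marker there is nothing to replay; with one, every oplog
    // entry after it must be reapplied to the data files.
    if (!disk.appliedThrough) {
        std::cout << "no oplog replay needed\n";
        return;
    }
    for (uint64_t ts : disk.oplog) {
        if (ts > *disk.appliedThrough) {
            std::cout << "reapplying oplog entry at " << ts << "\n";
        }
    }
}

int main() {
    // State captured by the unstable checkpoint at the end of rollback: oplog truncated to the
    // common point (ts = 50) plus entries fetched afterwards, appliedThrough pinned to 50.
    OnDiskState afterRollbackCrash{/*appliedThrough=*/50, /*oplog=*/{40, 50, 60, 70}};
    replicationRecoveryModel(afterRollbackCrash);  // reapplies 60 and 70
}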
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index 56b156e72e1..da364d6ef3e 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -285,6 +285,9 @@ struct FixUpInfo {
     // after rollback the in-memory transaction table is cleared.
     bool refetchTransactionDocs = false;
 
+    // The local node's top of oplog prior to entering rollback.
+    OpTime localTopOfOplog;
+
     OpTime commonPoint;
     RecordId commonPointOurDiskloc;
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 949fb438737..0bd3d6c080e 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -142,13 +142,13 @@ public:
     /**
      * Unlike `waitUntilDurable`, this method takes a stable checkpoint, making durable any writes
      * on unjournaled tables that are behind the current stable timestamp. If the storage engine
-     * is starting from an "unstable" checkpoint, this method call will turn into an unstable
-     * checkpoint.
+     * is starting from an "unstable" checkpoint or 'stableCheckpoint'=false, this method call will
+     * turn into an unstable checkpoint.
      *
      * This must not be called by a system taking user writes until after a stable timestamp is
      * passed to the storage engine.
      */
-    virtual bool waitUntilUnjournaledWritesDurable() {
+    virtual bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) {
         return waitUntilDurable();
     }
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index b22a72a1082..e232d1b22ca 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -304,10 +304,9 @@ bool WiredTigerRecoveryUnit::waitUntilDurable() {
     return true;
 }
 
-bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable() {
+bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable(bool stableCheckpoint) {
     invariant(!_inUnitOfWork(), toString(_state));
     const bool forceCheckpoint = true;
-    const bool stableCheckpoint = true;
     // Calling `waitUntilDurable` with `forceCheckpoint` set to false only performs a log
     // (journal) flush, and thus has no effect on unjournaled writes. Setting `forceCheckpoint` to
     // true will lock in stable writes to unjournaled tables.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index 44fcc26f90a..2440a79dec3 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -106,7 +106,7 @@ public:
 
     bool waitUntilDurable() override;
 
-    bool waitUntilUnjournaledWritesDurable() override;
+    bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) override;
 
     void registerChange(Change* change) override;
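One C++ detail worth noting about the new waitUntilUnjournaledWritesDurable signature: the '= true' default is repeated on both the RecoveryUnit declaration and the WiredTigerRecoveryUnit override. Default arguments are resolved against the static type used at the call site rather than the dynamic type, so keeping the defaults identical avoids callers getting different values depending on whether they hold a RecoveryUnit* or the concrete type. A generic demonstration of that language rule (toy classes, unrelated to the MongoDB ones):

#include <iostream>

struct Base {
    virtual ~Base() = default;
    virtual void wait(bool stable = true) {
        std::cout << "Base::wait stable=" << std::boolalpha << stable << "\n";
    }
};

struct Derived : Base {
    // Deliberately different default to show the pitfall; the MongoDB change keeps the
    // defaults identical on both declarations to avoid exactly this.
    void wait(bool stable = false) override {
        std::cout << "Derived::wait stable=" << std::boolalpha << stable << "\n";
    }
};

int main() {
    Derived d;
    Base* b = &d;
    b->wait();  // dispatches to Derived::wait, but the default 'true' is taken from Base
    d.wait();   // same body, default 'false' is taken from Derived
}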