From 84916e817418b3b5627e80730effcd422c15696e Mon Sep 17 00:00:00 2001 From: Tess Avitabile Date: Wed, 6 Mar 2019 15:41:28 -0500 Subject: SERVER-38925 When enableMajorityReadConcern=false, after rollback via refetch, do not take stable checkpoint until the local top of oplog from before rollback is reached --- .../downgrade_after_rollback_via_refetch.js | 65 +++++++++++++++++++++ jstests/replsets/libs/rollback_test.js | 4 +- jstests/replsets/rollback_dup_ids.js | 43 ++++++++++++++ .../db/repl/replication_consistency_markers.h | 7 ++- .../repl/replication_consistency_markers_impl.cpp | 7 ++- .../db/repl/replication_consistency_markers_impl.h | 4 +- .../repl/replication_consistency_markers_mock.cpp | 3 +- .../db/repl/replication_consistency_markers_mock.h | 4 +- src/mongo/db/repl/replication_coordinator_impl.cpp | 8 ++- src/mongo/db/repl/rs_rollback.cpp | 68 +++++++++++++++++++--- src/mongo/db/repl/rs_rollback.h | 3 + src/mongo/db/storage/recovery_unit.h | 6 +- .../wiredtiger/wiredtiger_recovery_unit.cpp | 3 +- .../storage/wiredtiger/wiredtiger_recovery_unit.h | 2 +- src/mongo/shell/replsettest.js | 10 ++-- 15 files changed, 206 insertions(+), 31 deletions(-) create mode 100644 jstests/multiVersion/downgrade_after_rollback_via_refetch.js create mode 100644 jstests/replsets/rollback_dup_ids.js diff --git a/jstests/multiVersion/downgrade_after_rollback_via_refetch.js b/jstests/multiVersion/downgrade_after_rollback_via_refetch.js new file mode 100644 index 00000000000..175ddcdf2df --- /dev/null +++ b/jstests/multiVersion/downgrade_after_rollback_via_refetch.js @@ -0,0 +1,65 @@ +// When enableMajorityReadConcern=false, a node transitions from ROLLBACK to RECOVERING with an +// unstable checkpoint with appliedThrough set to the common point. Test that if the node crashes +// and restarts with 4.0 before its next stable checkpoint, then oplog entries after the common +// point are replayed. 
+(function() { + "use strict"; + + load("jstests/replsets/libs/rollback_test.js"); + + TestData.rollbackShutdowns = true; + TestData.allowUncleanShutdowns = true; + let name = "downgrade_after_rollback_via_refetch"; + let dbName = "test"; + let sourceCollName = "coll"; + + function testDowngrade(enableMajorityReadConcern) { + jsTest.log("Test downgrade with enableMajorityReadConcern=" + enableMajorityReadConcern); + + // Set up Rollback Test. + let replTest = new ReplSetTest( + {name, nodes: 3, useBridge: true, nodeOptions: {enableMajorityReadConcern: "false"}}); + replTest.startSet(); + let config = replTest.getReplSetConfig(); + config.members[2].priority = 0; + config.settings = {chainingAllowed: false}; + replTest.initiate(config); + let rollbackTest = new RollbackTest(name, replTest); + + // Set the featureCompatibilityVersion to 4.0, so that we can downgrade the rollback node. + assert.commandWorked( + rollbackTest.getPrimary().adminCommand({setFeatureCompatibilityVersion: "4.0"})); + + let rollbackNode = rollbackTest.transitionToRollbackOperations(); + + // Turn off stable checkpoints on the rollback node. + assert.commandWorked(rollbackNode.adminCommand( + {configureFailPoint: "disableSnapshotting", mode: "alwaysOn"})); + + // Wait for a rollback to finish. + rollbackTest.transitionToSyncSourceOperationsBeforeRollback(); + rollbackTest.transitionToSyncSourceOperationsDuringRollback(); + rollbackTest.transitionToSteadyStateOperations(); + + // Replicate a new operation to the rollback node. Replication is disabled on the tiebreaker + // node, so a successful majority write guarantees the write has replicated to the rollback + // node. + assert.commandWorked(rollbackTest.getPrimary().getDB(dbName)[sourceCollName].insert( + {_id: 0}, {writeConcern: {w: "majority"}})); + assert.eq(rollbackNode.getDB(dbName)[sourceCollName].find({_id: 0}).itcount(), 1); + + // Kill and restart the rollback node on 4.0. 
+ rollbackTest.restartNode( + 0, 9, {binVersion: "4.0", enableMajorityReadConcern: enableMajorityReadConcern}); + replTest.awaitSecondaryNodes(); + + // The rollback node should replay the new operation. + rollbackNode = rollbackTest.getSecondary(); + assert.eq(rollbackNode.getDB(dbName)[sourceCollName].find({_id: 0}).itcount(), 1); + + rollbackTest.stop(); + } + + testDowngrade("true"); + testDowngrade("false"); +})(); diff --git a/jstests/replsets/libs/rollback_test.js b/jstests/replsets/libs/rollback_test.js index afdecfcbad0..39b7d13e93a 100644 --- a/jstests/replsets/libs/rollback_test.js +++ b/jstests/replsets/libs/rollback_test.js @@ -384,7 +384,7 @@ function RollbackTest(name = "RollbackTest", replSet, expectPreparedTxnsDuringRo return curSecondary; }; - this.restartNode = function(nodeId, signal) { + this.restartNode = function(nodeId, signal, startOptions) { assert(signal === SIGKILL || signal === SIGTERM, `Received unknown signal: ${signal}`); assert.gte(nodeId, 0, "Invalid argument to RollbackTest.restartNode()"); @@ -415,7 +415,7 @@ function RollbackTest(name = "RollbackTest", replSet, expectPreparedTxnsDuringRo log(`Stopping node ${hostName} with signal ${signal}`); rst.stop(nodeId, signal, opts, {forRestart: true}); log(`Restarting node ${hostName}`); - rst.start(nodeId, {}, true /* restart */); + rst.start(nodeId, startOptions, true /* restart */); // Ensure that the primary is ready to take operations before continuing. If both nodes are // connected to the tiebreaker node, the primary may switch. diff --git a/jstests/replsets/rollback_dup_ids.js b/jstests/replsets/rollback_dup_ids.js new file mode 100644 index 00000000000..a56b2b9bc05 --- /dev/null +++ b/jstests/replsets/rollback_dup_ids.js @@ -0,0 +1,43 @@ +// When run with --majorityReadConcern=off, this test reproduces the bug described in SERVER-38925, +// where rolling back a delete followed by a restart produces documents with duplicate _id. 
+(function() { + "use strict"; + + load("jstests/replsets/libs/rollback_test.js"); + + TestData.rollbackShutdowns = true; + TestData.allowUncleanShutdowns = true; + let dbName = "test"; + let sourceCollName = "coll"; + + let doc1 = {_id: 1, x: "document_of_interest"}; + + let CommonOps = (node) => { + // Insert a document that will exist on all nodes. + assert.commandWorked(node.getDB(dbName)[sourceCollName].insert(doc1)); + }; + + let RollbackOps = (node) => { + // Delete the document on rollback node so it will be refetched from sync source. + assert.commandWorked(node.getDB(dbName)[sourceCollName].remove({_id: 1})); + }; + + // Set up Rollback Test. + let rollbackTest = new RollbackTest(); + CommonOps(rollbackTest.getPrimary()); + + let rollbackNode = rollbackTest.transitionToRollbackOperations(); + RollbackOps(rollbackNode); + + // Wait for rollback to finish. + rollbackTest.transitionToSyncSourceOperationsBeforeRollback(); + rollbackTest.transitionToSyncSourceOperationsDuringRollback(); + rollbackTest.transitionToSteadyStateOperations(); + + // Kill and restart the node that rolled back. + rollbackTest.restartNode(0, 9); + + // Check the replica set. + rollbackTest.stop(); + +}()); \ No newline at end of file diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h index 102540c77db..82118277986 100644 --- a/src/mongo/db/repl/replication_consistency_markers.h +++ b/src/mongo/db/repl/replication_consistency_markers.h @@ -165,9 +165,12 @@ public: /** * The applied through point is a persistent record of which oplog entries we've applied. * If we crash while applying a batch of oplog entries, this OpTime tells us where to start - * applying operations on startup. + * applying operations on startup. If 'setTimestamp' is true, the write will be timestamped with + * the timestamp from 'optime'. 
*/ - virtual void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) = 0; + virtual void setAppliedThrough(OperationContext* opCtx, + const OpTime& optime, + bool setTimestamp = true) = 0; /** * Unsets the applied through OpTime at the given 'writeTimestamp'. diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index 9bfab91e861..345573f2ff5 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -243,7 +243,8 @@ void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* o } void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCtx, - const OpTime& optime) { + const OpTime& optime, + bool setTimestamp) { invariant(!optime.isNull()); LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")"; @@ -251,7 +252,9 @@ void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCt // in checkpoints that contain all writes through this timestamp since it indicates the top of // the oplog. 
TimestampedBSONObj update; - update.timestamp = optime.getTimestamp(); + if (setTimestamp) { + update.timestamp = optime.getTimestamp(); + } update.obj = BSON("$set" << BSON(MinValidDocument::kAppliedThroughFieldName << optime)); _updateMinValidDocument(opCtx, update); diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h index e12d2a7805a..5e64762d490 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.h +++ b/src/mongo/db/repl/replication_consistency_markers_impl.h @@ -72,7 +72,9 @@ public: void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override; Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override; - void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override; + void setAppliedThrough(OperationContext* opCtx, + const OpTime& optime, + bool setTimestamp = true) override; void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override; OpTime getAppliedThrough(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp index ece2150fd44..61f46bf0bef 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp @@ -93,7 +93,8 @@ Timestamp ReplicationConsistencyMarkersMock::getOplogTruncateAfterPoint( } void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCtx, - const OpTime& optime) { + const OpTime& optime, + bool setTimestamp) { stdx::lock_guard lock(_minValidBoundariesMutex); _appliedThrough = optime; } diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h index b8c7c06c7f8..165c6185c43 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.h +++ 
b/src/mongo/db/repl/replication_consistency_markers_mock.h @@ -64,7 +64,9 @@ public: void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override; Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override; - void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override; + void setAppliedThrough(OperationContext* opCtx, + const OpTime& optime, + bool setTimestamp = true) override; void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override; OpTime getAppliedThrough(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 0db743bce11..dd136139c4e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3431,6 +3431,8 @@ boost::optional ReplicationCoordinatorImpl::_recalculateStableOpTime(Wit return stableOpTime; } +MONGO_FAIL_POINT_DEFINE(disableSnapshotting); + void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) { // Get the current stable optime. auto stableOpTime = _recalculateStableOpTime(lk); @@ -3459,7 +3461,9 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) { } // Set the stable timestamp regardless of whether the majority commit point moved // forward. 
- _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp()); + if (!MONGO_FAIL_POINT(disableSnapshotting)) { + _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp()); + } } } _cleanupStableOpTimeCandidates(&_stableOpTimeCandidates, stableOpTime.get()); @@ -3692,8 +3696,6 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { return _uncommittedSnapshotsSize.load(); } -MONGO_FAIL_POINT_DEFINE(disableSnapshotting); - bool ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock( const OpTime& newCommittedSnapshot) { if (gTestingSnapshotBehaviorInIsolation) { diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index b9cabd2580f..91e4bef476c 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -885,6 +885,7 @@ Status _syncRollback(OperationContext* opCtx, invariant(!opCtx->lockState()->isLocked()); FixUpInfo how; + how.localTopOfOplog = replCoord->getMyLastAppliedOpTime(); log() << "Starting rollback. Sync source: " << rollbackSource.getSource() << rsLog; how.rbid = rollbackSource.getRollbackId(); uassert( @@ -1047,6 +1048,16 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, log() << "Finished refetching documents. Total size of documents refetched: " << goodVersions.size(); + // We must start taking unstable checkpoints before rolling back oplog entries. Otherwise, a + // stable checkpoint could include the fixup write (since it is untimestamped) but not the write + // being rolled back (if it is after the stable timestamp), leading to inconsistent state. An + // unstable checkpoint will include both writes. 
+ if (!serverGlobalParams.enableMajorityReadConcern) { + log() << "Setting initialDataTimestamp to 0 so that we start taking unstable checkpoints."; + opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp( + Timestamp::kAllowUnstableCheckpointsSentinel); + } + log() << "Checking the RollbackID and updating the MinValid if necessary"; checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess); @@ -1404,19 +1415,38 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, log() << "Rollback deleted " << deletes << " documents and updated " << updates << " documents."; - // When majority read concern is disabled, the stable timestamp may be ahead of the common - // point. Force the stable timestamp back to the common point. if (!serverGlobalParams.enableMajorityReadConcern) { + // When majority read concern is disabled, the stable timestamp may be ahead of the common + // point. Force the stable timestamp back to the common point, to allow writes after the + // common point. const bool force = true; - log() << "Forcing the stable timestamp to " << fixUpInfo.commonPoint.getTimestamp(); + log() << "Forcing the stable timestamp to the common point: " + << fixUpInfo.commonPoint.getTimestamp(); opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp( fixUpInfo.commonPoint.getTimestamp(), boost::none, force); - // We must wait for a checkpoint before truncating oplog, so that if we crash after - // truncating oplog, we are guaranteed to recover from a checkpoint that includes all of the - // writes performed during the rollback. - log() << "Waiting for a stable checkpoint"; - opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(); + // We must not take a stable checkpoint until it is guaranteed to include all writes from + // before the rollback (i.e. the stable timestamp is at least the local top of oplog). 
In + // addition, we must not take a stable checkpoint until the stable timestamp reaches the + // sync source top of oplog (minValid), since we must not take a stable checkpoint until we + // are in a consistent state. We control this by setting the initialDataTimestamp to the + // maximum of these two values. No checkpoints are taken until stable timestamp >= + // initialDataTimestamp. + auto syncSourceTopOfOplog = OpTime::parseFromOplogEntry(rollbackSource.getLastOperation()) + .getValue() + .getTimestamp(); + log() << "Setting initialDataTimestamp to the max of local top of oplog and sync source " + "top of oplog. Local top of oplog: " + << fixUpInfo.localTopOfOplog.getTimestamp() + << ", sync source top of oplog: " << syncSourceTopOfOplog; + opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp( + std::max(fixUpInfo.localTopOfOplog.getTimestamp(), syncSourceTopOfOplog)); + + // Take an unstable checkpoint to ensure that all of the writes performed during rollback + // are persisted to disk before truncating oplog. + log() << "Waiting for an unstable checkpoint"; + const bool stableCheckpoint = false; + opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint); } log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " (" @@ -1439,6 +1469,28 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false); } + if (!serverGlobalParams.enableMajorityReadConcern) { + // If the server crashes and restarts before a stable checkpoint is taken, it will restart + // from the unstable checkpoint taken at the end of rollback. To ensure replication recovery + // replays all oplog after the common point, we set the appliedThrough to the common point. 
+ // This is done using an untimestamped write, since timestamping the write with the common + // point TS would be incorrect (since this is equal to the stable timestamp), and this write + // will be included in the unstable checkpoint regardless of its timestamp. + log() << "Setting appliedThrough to the common point: " << fixUpInfo.commonPoint; + const bool setTimestamp = false; + replicationProcess->getConsistencyMarkers()->setAppliedThrough( + opCtx, fixUpInfo.commonPoint, setTimestamp); + + // Take an unstable checkpoint to ensure the appliedThrough write is persisted to disk. + log() << "Waiting for an unstable checkpoint"; + const bool stableCheckpoint = false; + opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint); + + // Ensure that appliedThrough is unset in the next stable checkpoint. + log() << "Clearing appliedThrough"; + replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp()); + } + Status status = AuthorizationManager::get(opCtx->getServiceContext())->initialize(opCtx); if (!status.isOK()) { severe() << "Failed to reinitialize auth data after rollback: " << redact(status); diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index 56b156e72e1..da364d6ef3e 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -285,6 +285,9 @@ struct FixUpInfo { // after rollback the in-memory transaction table is cleared. bool refetchTransactionDocs = false; + // The local node's top of oplog prior to entering rollback. 
+ OpTime localTopOfOplog; + OpTime commonPoint; RecordId commonPointOurDiskloc; diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 949fb438737..0bd3d6c080e 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -142,13 +142,13 @@ public: /** * Unlike `waitUntilDurable`, this method takes a stable checkpoint, making durable any writes * on unjournaled tables that are behind the current stable timestamp. If the storage engine - * is starting from an "unstable" checkpoint, this method call will turn into an unstable - * checkpoint. + * is starting from an "unstable" checkpoint or 'stableCheckpoint'=false, this method call will + * turn into an unstable checkpoint. * * This must not be called by a system taking user writes until after a stable timestamp is * passed to the storage engine. */ - virtual bool waitUntilUnjournaledWritesDurable() { + virtual bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) { return waitUntilDurable(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index b22a72a1082..e232d1b22ca 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -304,10 +304,9 @@ bool WiredTigerRecoveryUnit::waitUntilDurable() { return true; } -bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable() { +bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable(bool stableCheckpoint) { invariant(!_inUnitOfWork(), toString(_state)); const bool forceCheckpoint = true; - const bool stableCheckpoint = true; // Calling `waitUntilDurable` with `forceCheckpoint` set to false only performs a log // (journal) flush, and thus has no effect on unjournaled writes. Setting `forceCheckpoint` to // true will lock in stable writes to unjournaled tables. 
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index 44fcc26f90a..2440a79dec3 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -106,7 +106,7 @@ public: bool waitUntilDurable() override; - bool waitUntilUnjournaledWritesDurable() override; + bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) override; void registerChange(Change* change) override; diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index 0999489c03a..79965c73b32 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -2286,14 +2286,14 @@ var ReplSetTest = function(opts) { } // If restarting a node, use its existing options as the defaults. + var baseOptions; if ((options && options.restart) || restart) { - const existingOpts = - _useBridge ? _unbridgedNodes[n].fullOptions : this.nodes[n].fullOptions; - options = Object.merge(existingOpts, options); + baseOptions = _useBridge ? _unbridgedNodes[n].fullOptions : this.nodes[n].fullOptions; } else { - options = Object.merge(defaults, options); + baseOptions = defaults; } - options = Object.merge(options, this.nodeOptions["n" + n]); + baseOptions = Object.merge(baseOptions, this.nodeOptions["n" + n]); + options = Object.merge(baseOptions, options); delete options.rsConfig; options.restart = options.restart || restart; -- cgit v1.2.1