Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers.h          |  7
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.cpp   |  7
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.h     |  4
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_mock.cpp   |  3
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_mock.h     |  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp           |  8
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                            | 68
-rw-r--r--  src/mongo/db/repl/rs_rollback.h                              |  3
-rw-r--r--  src/mongo/db/storage/recovery_unit.h                         |  6
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp |  3
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h   |  2
-rw-r--r--  src/mongo/shell/replsettest.js                               | 10
12 files changed, 96 insertions(+), 29 deletions(-)
diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h
index 102540c77db..82118277986 100644
--- a/src/mongo/db/repl/replication_consistency_markers.h
+++ b/src/mongo/db/repl/replication_consistency_markers.h
@@ -165,9 +165,12 @@ public:
/**
* The applied through point is a persistent record of which oplog entries we've applied.
* If we crash while applying a batch of oplog entries, this OpTime tells us where to start
- * applying operations on startup.
+ * applying operations on startup. If 'setTimestamp' is true, the write will be timestamped with
+ * the timestamp from 'optime'.
*/
- virtual void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) = 0;
+ virtual void setAppliedThrough(OperationContext* opCtx,
+ const OpTime& optime,
+ bool setTimestamp = true) = 0;
/**
* Unsets the applied through OpTime at the given 'writeTimestamp'.
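A minimal usage sketch of the new parameter (not part of the patch; 'consistencyMarkers', 'opCtx', 'optime', and 'commonPoint' are placeholders): by default the appliedThrough update is timestamped with 'optime', while passing false leaves it untimestamped so it can be captured by an unstable checkpoint.

    // Default: the update to the minValid document is timestamped with optime's timestamp.
    consistencyMarkers->setAppliedThrough(opCtx, optime);

    // Rollback (later in this patch): write appliedThrough without a timestamp so the update
    // is included in an unstable checkpoint regardless of the stable timestamp.
    const bool setTimestamp = false;
    consistencyMarkers->setAppliedThrough(opCtx, commonPoint, setTimestamp);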
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index 9bfab91e861..345573f2ff5 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -243,7 +243,8 @@ void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* o
}
void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCtx,
- const OpTime& optime) {
+ const OpTime& optime,
+ bool setTimestamp) {
invariant(!optime.isNull());
LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")";
@@ -251,7 +252,9 @@ void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCt
// in checkpoints that contain all writes through this timestamp since it indicates the top of
// the oplog.
TimestampedBSONObj update;
- update.timestamp = optime.getTimestamp();
+ if (setTimestamp) {
+ update.timestamp = optime.getTimestamp();
+ }
update.obj = BSON("$set" << BSON(MinValidDocument::kAppliedThroughFieldName << optime));
_updateMinValidDocument(opCtx, update);
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h
index e12d2a7805a..5e64762d490 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.h
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.h
@@ -72,7 +72,9 @@ public:
void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override;
- void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+ void setAppliedThrough(OperationContext* opCtx,
+ const OpTime& optime,
+ bool setTimestamp = true) override;
void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override;
OpTime getAppliedThrough(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
index ece2150fd44..61f46bf0bef 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
@@ -93,7 +93,8 @@ Timestamp ReplicationConsistencyMarkersMock::getOplogTruncateAfterPoint(
}
void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCtx,
- const OpTime& optime) {
+ const OpTime& optime,
+ bool setTimestamp) {
stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
_appliedThrough = optime;
}
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h
index b8c7c06c7f8..165c6185c43 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.h
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.h
@@ -64,7 +64,9 @@ public:
void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override;
- void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+ void setAppliedThrough(OperationContext* opCtx,
+ const OpTime& optime,
+ bool setTimestamp = true) override;
void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override;
OpTime getAppliedThrough(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 0db743bce11..dd136139c4e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -3431,6 +3431,8 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_recalculateStableOpTime(Wit
return stableOpTime;
}
+MONGO_FAIL_POINT_DEFINE(disableSnapshotting);
+
void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
// Get the current stable optime.
auto stableOpTime = _recalculateStableOpTime(lk);
@@ -3459,7 +3461,9 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
}
// Set the stable timestamp regardless of whether the majority commit point moved
// forward.
- _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp());
+ if (!MONGO_FAIL_POINT(disableSnapshotting)) {
+ _storage->setStableTimestamp(getServiceContext(), stableOpTime->getTimestamp());
+ }
}
}
_cleanupStableOpTimeCandidates(&_stableOpTimeCandidates, stableOpTime.get());
@@ -3692,8 +3696,6 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
return _uncommittedSnapshotsSize.load();
}
-MONGO_FAIL_POINT_DEFINE(disableSnapshotting);
-
bool ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
const OpTime& newCommittedSnapshot) {
if (gTestingSnapshotBehaviorInIsolation) {
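Note on the hoisted failpoint: MONGO_FAIL_POINT(disableSnapshotting) references the failpoint object directly, so its MONGO_FAIL_POINT_DEFINE must appear earlier in the translation unit than the new check in _setStableTimestampForStorage, hence the move. With this change, enabling disableSnapshotting also prevents the stable timestamp from advancing, in addition to its existing effect on committed snapshots.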
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index b9cabd2580f..91e4bef476c 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -885,6 +885,7 @@ Status _syncRollback(OperationContext* opCtx,
invariant(!opCtx->lockState()->isLocked());
FixUpInfo how;
+ how.localTopOfOplog = replCoord->getMyLastAppliedOpTime();
log() << "Starting rollback. Sync source: " << rollbackSource.getSource() << rsLog;
how.rbid = rollbackSource.getRollbackId();
uassert(
@@ -1047,6 +1048,16 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
log() << "Finished refetching documents. Total size of documents refetched: "
<< goodVersions.size();
+ // We must start taking unstable checkpoints before rolling back oplog entries. Otherwise, a
+ // stable checkpoint could include the fixup write (since it is untimestamped) but not the
+ // write being rolled back (if it is after the stable timestamp), leading to an inconsistent
+ // state. An unstable checkpoint will include both writes.
+ if (!serverGlobalParams.enableMajorityReadConcern) {
+ log() << "Setting initialDataTimestamp to 0 so that we start taking unstable checkpoints.";
+ opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+ Timestamp::kAllowUnstableCheckpointsSentinel);
+ }
+
log() << "Checking the RollbackID and updating the MinValid if necessary";
checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess);
@@ -1404,19 +1415,38 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
log() << "Rollback deleted " << deletes << " documents and updated " << updates
<< " documents.";
- // When majority read concern is disabled, the stable timestamp may be ahead of the common
- // point. Force the stable timestamp back to the common point.
if (!serverGlobalParams.enableMajorityReadConcern) {
+ // When majority read concern is disabled, the stable timestamp may be ahead of the common
+ // point. Force the stable timestamp back to the common point, to allow writes after the
+ // common point.
const bool force = true;
- log() << "Forcing the stable timestamp to " << fixUpInfo.commonPoint.getTimestamp();
+ log() << "Forcing the stable timestamp to the common point: "
+ << fixUpInfo.commonPoint.getTimestamp();
opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp(
fixUpInfo.commonPoint.getTimestamp(), boost::none, force);
- // We must wait for a checkpoint before truncating oplog, so that if we crash after
- // truncating oplog, we are guaranteed to recover from a checkpoint that includes all of the
- // writes performed during the rollback.
- log() << "Waiting for a stable checkpoint";
- opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable();
+ // We must not take a stable checkpoint until it is guaranteed to include all writes from
+ // before the rollback (i.e. the stable timestamp is at least the local top of oplog). In
+ // addition, the stable timestamp must reach the sync source's top of oplog (minValid) before
+ // we take a stable checkpoint, since we must be in a consistent state first. We control this
+ // by setting the initialDataTimestamp to the maximum of these two values. No checkpoints are
+ // taken until stable timestamp >= initialDataTimestamp.
+ auto syncSourceTopOfOplog = OpTime::parseFromOplogEntry(rollbackSource.getLastOperation())
+ .getValue()
+ .getTimestamp();
+ log() << "Setting initialDataTimestamp to the max of local top of oplog and sync source "
+ "top of oplog. Local top of oplog: "
+ << fixUpInfo.localTopOfOplog.getTimestamp()
+ << ", sync source top of oplog: " << syncSourceTopOfOplog;
+ opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(
+ std::max(fixUpInfo.localTopOfOplog.getTimestamp(), syncSourceTopOfOplog));
+
+ // Take an unstable checkpoint to ensure that all of the writes performed during rollback
+ // are persisted to disk before truncating oplog.
+ log() << "Waiting for an unstable checkpoint";
+ const bool stableCheckpoint = false;
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
}
log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " ("
@@ -1439,6 +1469,28 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false);
}
+ if (!serverGlobalParams.enableMajorityReadConcern) {
+ // If the server crashes and restarts before a stable checkpoint is taken, it will restart
+ // from the unstable checkpoint taken at the end of rollback. To ensure replication recovery
+ // replays all oplog entries after the common point, we set the appliedThrough to the common
+ // point. This is done with an untimestamped write, since timestamping it with the common
+ // point timestamp would be incorrect (it is equal to the stable timestamp), and the write
+ // will be included in the unstable checkpoint regardless of its timestamp.
+ log() << "Setting appliedThrough to the common point: " << fixUpInfo.commonPoint;
+ const bool setTimestamp = false;
+ replicationProcess->getConsistencyMarkers()->setAppliedThrough(
+ opCtx, fixUpInfo.commonPoint, setTimestamp);
+
+ // Take an unstable checkpoint to ensure the appliedThrough write is persisted to disk.
+ log() << "Waiting for an unstable checkpoint";
+ const bool stableCheckpoint = false;
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+
+ // Ensure that appliedThrough is unset in the next stable checkpoint.
+ log() << "Clearing appliedThrough";
+ replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, Timestamp());
+ }
+
Status status = AuthorizationManager::get(opCtx->getServiceContext())->initialize(opCtx);
if (!status.isOK()) {
severe() << "Failed to reinitialize auth data after rollback: " << redact(status);
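A condensed sketch of the resulting order of operations when enableMajorityReadConcern is false (illustrative only; it strings together the calls added in this patch, with 'storageEngine' and 'consistencyMarkers' standing in for opCtx->getServiceContext()->getStorageEngine() and replicationProcess->getConsistencyMarkers(), and error handling and surrounding logic elided):

    // 1. Before rolling back any oplog entries, allow unstable checkpoints so that an
    //    untimestamped fixup write and the write it undoes can land in the same checkpoint.
    storageEngine->setInitialDataTimestamp(Timestamp::kAllowUnstableCheckpointsSentinel);

    // ... apply the rollback fixup writes (deletes, updates, refetched documents) ...

    // 2. Pull the stable timestamp back to the common point so writes after it are permitted.
    storageEngine->setStableTimestamp(
        fixUpInfo.commonPoint.getTimestamp(), boost::none, /*force=*/true);

    // 3. Do not take stable checkpoints until the stable timestamp covers both the local and
    //    the sync source tops of oplog (minValid), i.e. until the node is consistent again.
    storageEngine->setInitialDataTimestamp(
        std::max(fixUpInfo.localTopOfOplog.getTimestamp(), syncSourceTopOfOplog));

    // 4. Persist everything written so far with an unstable checkpoint, then truncate the
    //    oplog back to the common point.
    opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(/*stableCheckpoint=*/false);
    oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false);

    // 5. Record where recovery should resume with an untimestamped appliedThrough write,
    //    persist it with another unstable checkpoint, then clear it so it is absent from the
    //    next stable checkpoint.
    consistencyMarkers->setAppliedThrough(opCtx, fixUpInfo.commonPoint, /*setTimestamp=*/false);
    opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(/*stableCheckpoint=*/false);
    consistencyMarkers->clearAppliedThrough(opCtx, Timestamp());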
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index 56b156e72e1..da364d6ef3e 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -285,6 +285,9 @@ struct FixUpInfo {
// after rollback the in-memory transaction table is cleared.
bool refetchTransactionDocs = false;
+ // The local node's top of oplog prior to entering rollback.
+ OpTime localTopOfOplog;
+
OpTime commonPoint;
RecordId commonPointOurDiskloc;
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 949fb438737..0bd3d6c080e 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -142,13 +142,13 @@ public:
/**
* Unlike `waitUntilDurable`, this method takes a stable checkpoint, making durable any writes
* on unjournaled tables that are behind the current stable timestamp. If the storage engine
- * is starting from an "unstable" checkpoint, this method call will turn into an unstable
- * checkpoint.
+ * is starting from an "unstable" checkpoint, or if 'stableCheckpoint' is false, this method
+ * call will turn into an unstable checkpoint.
*
* This must not be called by a system taking user writes until after a stable timestamp is
* passed to the storage engine.
*/
- virtual bool waitUntilUnjournaledWritesDurable() {
+ virtual bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) {
return waitUntilDurable();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index b22a72a1082..e232d1b22ca 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -304,10 +304,9 @@ bool WiredTigerRecoveryUnit::waitUntilDurable() {
return true;
}
-bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable() {
+bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable(bool stableCheckpoint) {
invariant(!_inUnitOfWork(), toString(_state));
const bool forceCheckpoint = true;
- const bool stableCheckpoint = true;
// Calling `waitUntilDurable` with `forceCheckpoint` set to false only performs a log
// (journal) flush, and thus has no effect on unjournaled writes. Setting `forceCheckpoint` to
// true will lock in stable writes to unjournaled tables.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index 44fcc26f90a..2440a79dec3 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -106,7 +106,7 @@ public:
bool waitUntilDurable() override;
- bool waitUntilUnjournaledWritesDurable() override;
+ bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) override;
void registerChange(Change* change) override;
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index 0999489c03a..79965c73b32 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -2286,14 +2286,14 @@ var ReplSetTest = function(opts) {
}
// If restarting a node, use its existing options as the defaults.
+ var baseOptions;
if ((options && options.restart) || restart) {
- const existingOpts =
- _useBridge ? _unbridgedNodes[n].fullOptions : this.nodes[n].fullOptions;
- options = Object.merge(existingOpts, options);
+ baseOptions = _useBridge ? _unbridgedNodes[n].fullOptions : this.nodes[n].fullOptions;
} else {
- options = Object.merge(defaults, options);
+ baseOptions = defaults;
}
- options = Object.merge(options, this.nodeOptions["n" + n]);
+ baseOptions = Object.merge(baseOptions, this.nodeOptions["n" + n]);
+ options = Object.merge(baseOptions, options);
delete options.rsConfig;
options.restart = options.restart || restart;
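The effect of the reordering above (assuming the shell's Object.merge gives later arguments precedence) is that options passed explicitly to start/restart now override the per-node nodeOptions, whereas previously nodeOptions were merged last and silently overrode anything the caller passed in; the defaults, or on restart the node's existing fullOptions, remain the lowest-precedence layer.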