summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp')
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp157
1 files changed, 101 insertions, 56 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index d9d07d00cd1..1655591d191 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -260,6 +260,20 @@ private:
AtomicWord<bool> _shuttingDown{false};
};
+std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult& r) {
+ if (r.isOK()) {
+ if (r.getValue()) {
+ // Timestamp.
+ return r.getValue().value().toString();
+ } else {
+ // boost::none.
+ return "null";
+ }
+ } else {
+ return r.getStatus().toString();
+ }
+}
+
class WiredTigerKVEngine::WiredTigerCheckpointThread : public BackgroundJob {
public:
explicit WiredTigerCheckpointThread(WiredTigerKVEngine* wiredTigerKVEngine,
@@ -285,28 +299,31 @@ public:
wiredTigerGlobalOptions.checkpointDelaySecs)));
}
+ // Might have been awakened by another thread shutting us down.
+ if (_shuttingDown.load()) {
+ break;
+ }
+
const Timestamp stableTimestamp = _wiredTigerKVEngine->getStableTimestamp();
const Timestamp initialDataTimestamp = _wiredTigerKVEngine->getInitialDataTimestamp();
// The amount of oplog to keep is primarily dictated by a user setting. However, in
// unexpected cases, durable, recover to a timestamp storage engines may need to play
// forward from an oplog entry that would otherwise be truncated by the user
- // setting. Furthermore with prepared transactions, oplog entries can refer to
- // previous oplog entries.
+ // setting. Furthermore, the entries in prepared or large transactions can refer to
+ // previous entries in the same transaction.
//
- // Live (replication) rollback will replay oplogs from exactly the stable
- // timestamp. With prepared transactions, it may require some additional entries prior
- // to the stable timestamp. These requirements are summarized in
- // `getOplogNeededForRollback`. Truncating the oplog at this point is sufficient for
- // in-memory configurations, but could cause an unrecoverable scenario if the node
- // crashed and has to play from the last stable checkpoint.
+ // Live (replication) rollback will replay oplogs from exactly the stable timestamp.
+ // With prepared or large transactions, it may require some additional entries prior to
+ // the stable timestamp. These requirements are summarized in getOplogNeededForRollback.
+ // Truncating the oplog at this point is sufficient for in-memory configurations, but
+ // could cause an unrecoverable scenario if the node crashed and has to play from the
+ // last stable checkpoint.
//
// By recording the oplog needed for rollback "now", then taking a stable checkpoint,
// we can safely assume that the oplog needed for crash recovery has caught up to the
// recorded value. After the checkpoint, this value will be published such that actors
// which truncate the oplog can read an updated value.
- const Timestamp oplogNeededForRollback =
- _wiredTigerKVEngine->getOplogNeededForRollback();
try {
// Three cases:
//
@@ -331,21 +348,22 @@ public:
<< stableTimestamp.toString()
<< " InitialDataTimestamp: " << initialDataTimestamp.toString();
} else {
- // 'stableTimestamp' is the smallest possible value at which WT will take a
- // stable checkpoint. A newer stable timestamp may be used by WT if one is
- // concurrently set.
- LOG_FOR_RECOVERY(2) << "Performing stable checkpoint. StableTimestamp: "
- << stableTimestamp;
+ auto oplogNeededForRollback = _wiredTigerKVEngine->getOplogNeededForRollback();
+
+ LOG_FOR_RECOVERY(2)
+ << "Performing stable checkpoint. StableTimestamp: " << stableTimestamp
+ << ", OplogNeededForRollback: " << toString(oplogNeededForRollback);
UniqueWiredTigerSession session = _sessionCache->getSession();
WT_SESSION* s = session->getSession();
invariantWTOK(s->checkpoint(s, "use_timestamp=true"));
- // Now that the checkpoint is durable, publish the oplog needed to recover
- // from it.
- {
+ if (oplogNeededForRollback.isOK()) {
+ // Now that the checkpoint is durable, publish the oplog needed to recover
+ // from it.
stdx::lock_guard<stdx::mutex> lk(_oplogNeededForCrashRecoveryMutex);
- _oplogNeededForCrashRecovery.store(oplogNeededForRollback.asULL());
+ _oplogNeededForCrashRecovery.store(
+ oplogNeededForRollback.getValue().asULL());
}
}
} catch (const WriteConflictException&) {
@@ -639,11 +657,14 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
_journalFlusher->go();
}
+ // Until the Replication layer installs a real callback, prevent truncating the oplog.
+ setOldestActiveTransactionTimestampCallback(
+ [](Timestamp) { return StatusWith(boost::make_optional(Timestamp::min())); });
+
if (!_readOnly && !_ephemeral) {
if (!_recoveryTimestamp.isNull()) {
setInitialDataTimestamp(_recoveryTimestamp);
- // The `maximumTruncationTimestamp` is not persisted, so choose a conservative value.
- setStableTimestamp(_recoveryTimestamp, Timestamp::min(), false);
+ setStableTimestamp(_recoveryTimestamp, false);
}
_checkpointThread =
@@ -1046,6 +1067,12 @@ void WiredTigerKVEngine::syncSizeInfo(bool sync) const {
}
}
+void WiredTigerKVEngine::setOldestActiveTransactionTimestampCallback(
+ StorageEngine::OldestActiveTransactionTimestampCallback callback) {
+ stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex);
+ _oldestActiveTransactionTimestampCallback = std::move(callback);
+};
+
RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() {
return new WiredTigerRecoveryUnit(_sessionCache.get());
}
@@ -1528,9 +1555,7 @@ MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely);
} // namespace
-void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp,
- boost::optional<Timestamp> maximumTruncationTimestamp,
- bool force) {
+void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) {
if (stableTimestamp.isNull()) {
return;
}
@@ -1578,26 +1603,9 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp,
invariant(static_cast<std::size_t>(size) < sizeof(stableTSConfigString));
invariantWTOK(_conn->set_timestamp(_conn, stableTSConfigString));
+ // After publishing a stable timestamp to WT, we can record the updated stable timestamp value
+ // for the necessary oplog to keep.
_stableTimestamp.store(stableTimestamp.asULL());
-
- // After publishing a stable timestamp to WT, we can publish the updated value for the
- // necessary oplog to keep. Calls to this method require the min(stableTimestamp,
- // maximumTruncationTimestamp) to be monotonically increasing. This allows us to safely record
- // and publish a single value without additional concurrency control.
- if (maximumTruncationTimestamp) {
- // Until we discover otherwise, assume callers expect to obey the contract for proper
- // oplog truncation.
- DEV invariant(_oplogNeededForRollback.load() <=
- std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL()));
- _oplogNeededForRollback.store(
- std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL()));
- } else {
- // If there is no maximumTruncationTimestamp at this stable timestamp, WT is free to
- // truncate the oplog to any value behind the last stable timestamp, once it is
- // checkpointed.
- _oplogNeededForRollback.store(stableTimestamp.asULL());
- }
-
if (_checkpointThread && !_checkpointThread->hasTriggeredFirstStableCheckpoint()) {
_checkpointThread->triggerFirstStableCheckpoint(
prevStable, Timestamp(_initialDataTimestamp.load()), stableTimestamp);
@@ -1840,8 +1848,29 @@ boost::optional<Timestamp> WiredTigerKVEngine::getLastStableRecoveryTimestamp()
return boost::none;
}
-Timestamp WiredTigerKVEngine::getOplogNeededForRollback() const {
- return Timestamp(_oplogNeededForRollback.load());
+StatusWith<Timestamp> WiredTigerKVEngine::getOplogNeededForRollback() const {
+ // Get the current stable timestamp and use it throughout this function, ignoring updates from
+ // another thread.
+ auto stableTimestamp = _stableTimestamp.load();
+
+ // Only one thread can set or execute this callback.
+ stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex);
+ boost::optional<Timestamp> oldestActiveTransactionTimestamp;
+ if (_oldestActiveTransactionTimestampCallback) {
+ auto status = _oldestActiveTransactionTimestampCallback(Timestamp(stableTimestamp));
+ if (status.isOK()) {
+ oldestActiveTransactionTimestamp.swap(status.getValue());
+ } else {
+ LOG(1) << "getting oldest active transaction timestamp: " << status.getStatus();
+ return status.getStatus();
+ }
+ }
+
+ if (oldestActiveTransactionTimestamp) {
+ return std::min(oldestActiveTransactionTimestamp.value(), Timestamp(stableTimestamp));
+ } else {
+ return Timestamp(stableTimestamp);
+ }
}
boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery() const {
@@ -1853,21 +1882,37 @@ boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery()
}
Timestamp WiredTigerKVEngine::getPinnedOplog() const {
- stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex);
- if (!storageGlobalParams.allowOplogTruncation) {
- // If oplog truncation is not allowed, then return the min timestamp so that no history is
- // ever allowed to be deleted.
- return Timestamp::min();
- }
- if (_oplogPinnedByBackup) {
- // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup.
- return _oplogPinnedByBackup.get();
+ {
+ stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex);
+ if (!storageGlobalParams.allowOplogTruncation) {
+ // If oplog truncation is not allowed, then return the min timestamp so that no history
+ // is
+ // ever allowed to be deleted.
+ return Timestamp::min();
+ }
+ if (_oplogPinnedByBackup) {
+ // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup.
+ return _oplogPinnedByBackup.get();
+ }
}
+
+ auto oplogNeededForCrashRecovery = getOplogNeededForCrashRecovery();
if (!_keepDataHistory) {
// We use rollbackViaRefetch, so we only need to pin oplog for crash recovery.
- return getOplogNeededForCrashRecovery().value_or(Timestamp::max());
+ return oplogNeededForCrashRecovery.value_or(Timestamp::max());
+ }
+
+ if (oplogNeededForCrashRecovery) {
+ return oplogNeededForCrashRecovery.value();
}
- return getOplogNeededForCrashRecovery().value_or(getOplogNeededForRollback());
+
+ auto status = getOplogNeededForRollback();
+ if (status.isOK()) {
+ return status.getValue();
+ }
+
+ // If getOplogNeededForRollback fails, don't truncate any oplog right now.
+ return Timestamp::min();
}
bool WiredTigerKVEngine::supportsReadConcernSnapshot() const {