diff options
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp')
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 157 |
1 files changed, 101 insertions, 56 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index d9d07d00cd1..1655591d191 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -260,6 +260,20 @@ private: AtomicWord<bool> _shuttingDown{false}; }; +std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult& r) { + if (r.isOK()) { + if (r.getValue()) { + // Timestamp. + return r.getValue().value().toString(); + } else { + // boost::none. + return "null"; + } + } else { + return r.getStatus().toString(); + } +} + class WiredTigerKVEngine::WiredTigerCheckpointThread : public BackgroundJob { public: explicit WiredTigerCheckpointThread(WiredTigerKVEngine* wiredTigerKVEngine, @@ -285,28 +299,31 @@ public: wiredTigerGlobalOptions.checkpointDelaySecs))); } + // Might have been awakened by another thread shutting us down. + if (_shuttingDown.load()) { + break; + } + const Timestamp stableTimestamp = _wiredTigerKVEngine->getStableTimestamp(); const Timestamp initialDataTimestamp = _wiredTigerKVEngine->getInitialDataTimestamp(); // The amount of oplog to keep is primarily dictated by a user setting. However, in // unexpected cases, durable, recover to a timestamp storage engines may need to play // forward from an oplog entry that would otherwise be truncated by the user - // setting. Furthermore with prepared transactions, oplog entries can refer to - // previous oplog entries. + // setting. Furthermore, the entries in prepared or large transactions can refer to + // previous entries in the same transaction. // - // Live (replication) rollback will replay oplogs from exactly the stable - // timestamp. With prepared transactions, it may require some additional entries prior - // to the stable timestamp. These requirements are summarized in - // `getOplogNeededForRollback`. Truncating the oplog at this point is sufficient for - // in-memory configurations, but could cause an unrecoverable scenario if the node - // crashed and has to play from the last stable checkpoint. + // Live (replication) rollback will replay oplogs from exactly the stable timestamp. + // With prepared or large transactions, it may require some additional entries prior to + // the stable timestamp. These requirements are summarized in getOplogNeededForRollback. + // Truncating the oplog at this point is sufficient for in-memory configurations, but + // could cause an unrecoverable scenario if the node crashed and has to play from the + // last stable checkpoint. // // By recording the oplog needed for rollback "now", then taking a stable checkpoint, // we can safely assume that the oplog needed for crash recovery has caught up to the // recorded value. After the checkpoint, this value will be published such that actors // which truncate the oplog can read an updated value. - const Timestamp oplogNeededForRollback = - _wiredTigerKVEngine->getOplogNeededForRollback(); try { // Three cases: // @@ -331,21 +348,22 @@ public: << stableTimestamp.toString() << " InitialDataTimestamp: " << initialDataTimestamp.toString(); } else { - // 'stableTimestamp' is the smallest possible value at which WT will take a - // stable checkpoint. A newer stable timestamp may be used by WT if one is - // concurrently set. - LOG_FOR_RECOVERY(2) << "Performing stable checkpoint. StableTimestamp: " - << stableTimestamp; + auto oplogNeededForRollback = _wiredTigerKVEngine->getOplogNeededForRollback(); + + LOG_FOR_RECOVERY(2) + << "Performing stable checkpoint. StableTimestamp: " << stableTimestamp + << ", OplogNeededForRollback: " << toString(oplogNeededForRollback); UniqueWiredTigerSession session = _sessionCache->getSession(); WT_SESSION* s = session->getSession(); invariantWTOK(s->checkpoint(s, "use_timestamp=true")); - // Now that the checkpoint is durable, publish the oplog needed to recover - // from it. - { + if (oplogNeededForRollback.isOK()) { + // Now that the checkpoint is durable, publish the oplog needed to recover + // from it. stdx::lock_guard<stdx::mutex> lk(_oplogNeededForCrashRecoveryMutex); - _oplogNeededForCrashRecovery.store(oplogNeededForRollback.asULL()); + _oplogNeededForCrashRecovery.store( + oplogNeededForRollback.getValue().asULL()); } } } catch (const WriteConflictException&) { @@ -639,11 +657,14 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName, _journalFlusher->go(); } + // Until the Replication layer installs a real callback, prevent truncating the oplog. + setOldestActiveTransactionTimestampCallback( + [](Timestamp) { return StatusWith(boost::make_optional(Timestamp::min())); }); + if (!_readOnly && !_ephemeral) { if (!_recoveryTimestamp.isNull()) { setInitialDataTimestamp(_recoveryTimestamp); - // The `maximumTruncationTimestamp` is not persisted, so choose a conservative value. - setStableTimestamp(_recoveryTimestamp, Timestamp::min(), false); + setStableTimestamp(_recoveryTimestamp, false); } _checkpointThread = @@ -1046,6 +1067,12 @@ void WiredTigerKVEngine::syncSizeInfo(bool sync) const { } } +void WiredTigerKVEngine::setOldestActiveTransactionTimestampCallback( + StorageEngine::OldestActiveTransactionTimestampCallback callback) { + stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex); + _oldestActiveTransactionTimestampCallback = std::move(callback); +}; + RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() { return new WiredTigerRecoveryUnit(_sessionCache.get()); } @@ -1528,9 +1555,7 @@ MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely); } // namespace -void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, - boost::optional<Timestamp> maximumTruncationTimestamp, - bool force) { +void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) { if (stableTimestamp.isNull()) { return; } @@ -1578,26 +1603,9 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, invariant(static_cast<std::size_t>(size) < sizeof(stableTSConfigString)); invariantWTOK(_conn->set_timestamp(_conn, stableTSConfigString)); + // After publishing a stable timestamp to WT, we can record the updated stable timestamp value + // for the necessary oplog to keep. _stableTimestamp.store(stableTimestamp.asULL()); - - // After publishing a stable timestamp to WT, we can publish the updated value for the - // necessary oplog to keep. Calls to this method require the min(stableTimestamp, - // maximumTruncationTimestamp) to be monotonically increasing. This allows us to safely record - // and publish a single value without additional concurrency control. - if (maximumTruncationTimestamp) { - // Until we discover otherwise, assume callers expect to obey the contract for proper - // oplog truncation. - DEV invariant(_oplogNeededForRollback.load() <= - std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL())); - _oplogNeededForRollback.store( - std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL())); - } else { - // If there is no maximumTruncationTimestamp at this stable timestamp, WT is free to - // truncate the oplog to any value behind the last stable timestamp, once it is - // checkpointed. - _oplogNeededForRollback.store(stableTimestamp.asULL()); - } - if (_checkpointThread && !_checkpointThread->hasTriggeredFirstStableCheckpoint()) { _checkpointThread->triggerFirstStableCheckpoint( prevStable, Timestamp(_initialDataTimestamp.load()), stableTimestamp); @@ -1840,8 +1848,29 @@ boost::optional<Timestamp> WiredTigerKVEngine::getLastStableRecoveryTimestamp() return boost::none; } -Timestamp WiredTigerKVEngine::getOplogNeededForRollback() const { - return Timestamp(_oplogNeededForRollback.load()); +StatusWith<Timestamp> WiredTigerKVEngine::getOplogNeededForRollback() const { + // Get the current stable timestamp and use it throughout this function, ignoring updates from + // another thread. + auto stableTimestamp = _stableTimestamp.load(); + + // Only one thread can set or execute this callback. + stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex); + boost::optional<Timestamp> oldestActiveTransactionTimestamp; + if (_oldestActiveTransactionTimestampCallback) { + auto status = _oldestActiveTransactionTimestampCallback(Timestamp(stableTimestamp)); + if (status.isOK()) { + oldestActiveTransactionTimestamp.swap(status.getValue()); + } else { + LOG(1) << "getting oldest active transaction timestamp: " << status.getStatus(); + return status.getStatus(); + } + } + + if (oldestActiveTransactionTimestamp) { + return std::min(oldestActiveTransactionTimestamp.value(), Timestamp(stableTimestamp)); + } else { + return Timestamp(stableTimestamp); + } } boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery() const { @@ -1853,21 +1882,37 @@ boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery() } Timestamp WiredTigerKVEngine::getPinnedOplog() const { - stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex); - if (!storageGlobalParams.allowOplogTruncation) { - // If oplog truncation is not allowed, then return the min timestamp so that no history is - // ever allowed to be deleted. - return Timestamp::min(); - } - if (_oplogPinnedByBackup) { - // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup. - return _oplogPinnedByBackup.get(); + { + stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex); + if (!storageGlobalParams.allowOplogTruncation) { + // If oplog truncation is not allowed, then return the min timestamp so that no history + // is + // ever allowed to be deleted. + return Timestamp::min(); + } + if (_oplogPinnedByBackup) { + // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup. + return _oplogPinnedByBackup.get(); + } } + + auto oplogNeededForCrashRecovery = getOplogNeededForCrashRecovery(); if (!_keepDataHistory) { // We use rollbackViaRefetch, so we only need to pin oplog for crash recovery. - return getOplogNeededForCrashRecovery().value_or(Timestamp::max()); + return oplogNeededForCrashRecovery.value_or(Timestamp::max()); + } + + if (oplogNeededForCrashRecovery) { + return oplogNeededForCrashRecovery.value(); } - return getOplogNeededForCrashRecovery().value_or(getOplogNeededForRollback()); + + auto status = getOplogNeededForRollback(); + if (status.isOK()) { + return status.getValue(); + } + + // If getOplogNeededForRollback fails, don't truncate any oplog right now. + return Timestamp::min(); } bool WiredTigerKVEngine::supportsReadConcernSnapshot() const { |