diff options
author | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2019-03-07 17:11:12 -0500 |
---|---|---|
committer | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2019-03-21 21:22:24 -0400 |
commit | 78eaa3cf538764d5aa5a09c5997532a4c3b2bca8 (patch) | |
tree | 1b5fcc32ad4b9cc2369b9fcc7ae95be2b09da3f7 /src/mongo | |
parent | 9fa4a356cc1d89adc1edd4321117503ce90e2d4b (diff) | |
download | mongo-78eaa3cf538764d5aa5a09c5997532a4c3b2bca8.tar.gz |
SERVER-39679 Get oldest transaction time when snapshotting
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/db.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_engine.h | 11 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_storage_engine.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_storage_engine.h | 7 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine.h | 31 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 157 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 20 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 9 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 89 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 4 |
14 files changed, 354 insertions, 111 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index a3dc258684c..5df53cc0ef3 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -132,6 +132,7 @@ #include "mongo/db/storage/storage_engine_lock_file.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/system_index.h" +#include "mongo/db/transaction_participant.h" #include "mongo/db/ttl.h" #include "mongo/db/wire_version.h" #include "mongo/executor/network_connection_hook.h" @@ -522,9 +523,16 @@ ExitCode _initAndListen(int listenPort) { startFreeMonitoring(serviceContext); + auto replCoord = repl::ReplicationCoordinator::get(startupOpCtx.get()); + invariant(replCoord); + if (replCoord->isReplEnabled()) { + storageEngine->setOldestActiveTransactionTimestampCallback( + TransactionParticipant::getOldestActiveTimestamp); + } + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { // Note: For replica sets, ShardingStateRecovery happens on transition to primary. - if (!repl::ReplicationCoordinator::get(startupOpCtx.get())->isReplEnabled()) { + if (!replCoord->isReplEnabled()) { if (ShardingState::get(startupOpCtx.get())->enabled()) { uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get())); } @@ -553,7 +561,7 @@ ExitCode _initAndListen(int listenPort) { stdx::make_unique<LogicalTimeValidator>(keyManager)); } - repl::ReplicationCoordinator::get(startupOpCtx.get())->startup(startupOpCtx.get()); + replCoord->startup(startupOpCtx.get()); if (getReplSetMemberInStandaloneMode(serviceContext)) { log() << startupWarningsLog; log() << "** WARNING: mongod started without --replSet yet document(s) are present in " diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 91e4bef476c..8a61b54f0b3 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -1423,7 +1423,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, log() << "Forcing the stable timestamp to the common point: " << 
fixUpInfo.commonPoint.getTimestamp(); opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp( - fixUpInfo.commonPoint.getTimestamp(), boost::none, force); + fixUpInfo.commonPoint.getTimestamp(), force); // We must not take a stable checkpoint until it is guaranteed to include all writes from // before the rollback (i.e. the stable timestamp is at least the local top of oplog). In diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 166a1a08a1c..da758476930 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -1073,7 +1073,7 @@ Status StorageInterfaceImpl::upgradeNonReplicatedUniqueIndexes(OperationContext* } void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) { - serviceCtx->getStorageEngine()->setStableTimestamp(snapshotName, boost::none); + serviceCtx->getStorageEngine()->setStableTimestamp(snapshotName); } void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx, diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index cdbd34697f9..621314e1a3d 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -39,6 +39,7 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/storage/kv/kv_prefix.h" #include "mongo/db/storage/record_store.h" +#include "mongo/db/storage/storage_engine.h" namespace mongo { @@ -307,9 +308,7 @@ public: /** * See `StorageEngine::setStableTimestamp` */ - virtual void setStableTimestamp(Timestamp stableTimestamp, - boost::optional<Timestamp> maximumTruncationTimestamp, - bool force) {} + virtual void setStableTimestamp(Timestamp stableTimestamp, bool force) {} /** * See `StorageEngine::setInitialDataTimestamp` @@ -322,6 +321,12 @@ public: virtual void setOldestTimestampFromStable() {} /** + * See `StorageEngine::setOldestActiveTransactionTimestampCallback` + */ + 
virtual void setOldestActiveTransactionTimestampCallback( + StorageEngine::OldestActiveTransactionTimestampCallback callback){}; + + /** * See `StorageEngine::setOldestTimestamp` */ virtual void setOldestTimestamp(Timestamp newOldestTimestamp, bool force) {} diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp index b10de937bb7..655935a3043 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp @@ -716,10 +716,8 @@ void KVStorageEngine::setJournalListener(JournalListener* jl) { _engine->setJournalListener(jl); } -void KVStorageEngine::setStableTimestamp(Timestamp stableTimestamp, - boost::optional<Timestamp> maximumTruncationTimestamp, - bool force) { - _engine->setStableTimestamp(stableTimestamp, maximumTruncationTimestamp, force); +void KVStorageEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) { + _engine->setStableTimestamp(stableTimestamp, force); } void KVStorageEngine::setInitialDataTimestamp(Timestamp initialDataTimestamp) { @@ -736,6 +734,11 @@ void KVStorageEngine::setOldestTimestamp(Timestamp newOldestTimestamp) { _engine->setOldestTimestamp(newOldestTimestamp, force); } +void KVStorageEngine::setOldestActiveTransactionTimestampCallback( + StorageEngine::OldestActiveTransactionTimestampCallback callback) { + _engine->setOldestActiveTransactionTimestampCallback(callback); +} + bool KVStorageEngine::isCacheUnderPressure(OperationContext* opCtx) const { return _engine->isCacheUnderPressure(opCtx); } diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h index d5d4f1dade5..c1c64a51fa8 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.h +++ b/src/mongo/db/storage/kv/kv_storage_engine.h @@ -146,9 +146,7 @@ public: virtual void cleanShutdown(); - virtual void setStableTimestamp(Timestamp stableTimestamp, - boost::optional<Timestamp> maximumTruncationTimestamp, - bool force = false) 
override; + virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) override; virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override; @@ -156,6 +154,9 @@ public: virtual void setOldestTimestamp(Timestamp newOldestTimestamp) override; + virtual void setOldestActiveTransactionTimestampCallback( + StorageEngine::OldestActiveTransactionTimestampCallback) override; + virtual bool isCacheUnderPressure(OperationContext* opCtx) const override; virtual void setCachePressureForTest(int pressure) override; diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 1f3f111340b..74f6687976c 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -36,6 +36,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" #include "mongo/db/storage/temporary_record_store.h" +#include "mongo/util/functional.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -57,6 +58,15 @@ class StorageEngineMetadata; class StorageEngine { public: /** + * When the storage engine needs to know how much oplog to preserve for the sake of active + * transactions, it executes a callback that returns either the oldest active transaction + * timestamp, or boost::none if there is no active transaction, or an error if it fails. + */ + using OldestActiveTransactionTimestampResult = StatusWith<boost::optional<Timestamp>>; + using OldestActiveTransactionTimestampCallback = + std::function<OldestActiveTransactionTimestampResult(Timestamp stableTimestamp)>; + + /** * The interface for creating new instances of storage engines. * * A storage engine provides an instance of this class (along with an associated @@ -416,19 +426,8 @@ public: * Sets the highest timestamp at which the storage engine is allowed to take a checkpoint. 
This * timestamp must not decrease unless force=true is set, in which case we force the stable * timestamp, the oldest timestamp, and the commit timestamp backward. - * - * The maximumTruncationTimestamp (and newer) must not be truncated from the oplog in order to - * recover from the `stableTimestamp`. `boost::none` implies there are no additional - * constraints to what may be truncated. - * - * For proper truncation of the oplog, this method requires min(stableTimestamp, - * maximumTruncationTimestamp) to be monotonically increasing (where `min(stableTimestamp, - * boost::none) => stableTimestamp`). Otherwise truncation can race and remove a document - * before a call to this method protects it. */ - virtual void setStableTimestamp(Timestamp stableTimestamp, - boost::optional<Timestamp> maximumTruncationTimestamp, - bool force = false) {} + virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) {} /** * Tells the storage engine the timestamp of the data at startup. This is necessary because @@ -455,6 +454,14 @@ public: virtual void setOldestTimestamp(Timestamp timestamp) {} /** + * Sets a callback which returns the timestamp of the oldest oplog entry involved in an + * active MongoDB transaction. The storage engine calls this function to determine how much + * oplog it must preserve. + */ + virtual void setOldestActiveTransactionTimestampCallback( + OldestActiveTransactionTimestampCallback callback){}; + + /** * Indicates whether the storage engine cache is under pressure. 
* * Retrieves a cache pressure value in the range [0, 100] from the storage engine, and compares diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index d9d07d00cd1..1655591d191 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -260,6 +260,20 @@ private: AtomicWord<bool> _shuttingDown{false}; }; +std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult& r) { + if (r.isOK()) { + if (r.getValue()) { + // Timestamp. + return r.getValue().value().toString(); + } else { + // boost::none. + return "null"; + } + } else { + return r.getStatus().toString(); + } +} + class WiredTigerKVEngine::WiredTigerCheckpointThread : public BackgroundJob { public: explicit WiredTigerCheckpointThread(WiredTigerKVEngine* wiredTigerKVEngine, @@ -285,28 +299,31 @@ public: wiredTigerGlobalOptions.checkpointDelaySecs))); } + // Might have been awakened by another thread shutting us down. + if (_shuttingDown.load()) { + break; + } + const Timestamp stableTimestamp = _wiredTigerKVEngine->getStableTimestamp(); const Timestamp initialDataTimestamp = _wiredTigerKVEngine->getInitialDataTimestamp(); // The amount of oplog to keep is primarily dictated by a user setting. However, in // unexpected cases, durable, recover to a timestamp storage engines may need to play // forward from an oplog entry that would otherwise be truncated by the user - // setting. Furthermore with prepared transactions, oplog entries can refer to - // previous oplog entries. + // setting. Furthermore, the entries in prepared or large transactions can refer to + // previous entries in the same transaction. // - // Live (replication) rollback will replay oplogs from exactly the stable - // timestamp. With prepared transactions, it may require some additional entries prior - // to the stable timestamp. 
These requirements are summarized in - // `getOplogNeededForRollback`. Truncating the oplog at this point is sufficient for - // in-memory configurations, but could cause an unrecoverable scenario if the node - // crashed and has to play from the last stable checkpoint. + // Live (replication) rollback will replay oplogs from exactly the stable timestamp. + // With prepared or large transactions, it may require some additional entries prior to + // the stable timestamp. These requirements are summarized in getOplogNeededForRollback. + // Truncating the oplog at this point is sufficient for in-memory configurations, but + // could cause an unrecoverable scenario if the node crashed and has to play from the + // last stable checkpoint. // // By recording the oplog needed for rollback "now", then taking a stable checkpoint, // we can safely assume that the oplog needed for crash recovery has caught up to the // recorded value. After the checkpoint, this value will be published such that actors // which truncate the oplog can read an updated value. - const Timestamp oplogNeededForRollback = - _wiredTigerKVEngine->getOplogNeededForRollback(); try { // Three cases: // @@ -331,21 +348,22 @@ public: << stableTimestamp.toString() << " InitialDataTimestamp: " << initialDataTimestamp.toString(); } else { - // 'stableTimestamp' is the smallest possible value at which WT will take a - // stable checkpoint. A newer stable timestamp may be used by WT if one is - // concurrently set. - LOG_FOR_RECOVERY(2) << "Performing stable checkpoint. StableTimestamp: " - << stableTimestamp; + auto oplogNeededForRollback = _wiredTigerKVEngine->getOplogNeededForRollback(); + + LOG_FOR_RECOVERY(2) + << "Performing stable checkpoint. 
StableTimestamp: " << stableTimestamp + << ", OplogNeededForRollback: " << toString(oplogNeededForRollback); UniqueWiredTigerSession session = _sessionCache->getSession(); WT_SESSION* s = session->getSession(); invariantWTOK(s->checkpoint(s, "use_timestamp=true")); - // Now that the checkpoint is durable, publish the oplog needed to recover - // from it. - { + if (oplogNeededForRollback.isOK()) { + // Now that the checkpoint is durable, publish the oplog needed to recover + // from it. stdx::lock_guard<stdx::mutex> lk(_oplogNeededForCrashRecoveryMutex); - _oplogNeededForCrashRecovery.store(oplogNeededForRollback.asULL()); + _oplogNeededForCrashRecovery.store( + oplogNeededForRollback.getValue().asULL()); } } } catch (const WriteConflictException&) { @@ -639,11 +657,14 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName, _journalFlusher->go(); } + // Until the Replication layer installs a real callback, prevent truncating the oplog. + setOldestActiveTransactionTimestampCallback( + [](Timestamp) { return StatusWith(boost::make_optional(Timestamp::min())); }); + if (!_readOnly && !_ephemeral) { if (!_recoveryTimestamp.isNull()) { setInitialDataTimestamp(_recoveryTimestamp); - // The `maximumTruncationTimestamp` is not persisted, so choose a conservative value. 
- setStableTimestamp(_recoveryTimestamp, Timestamp::min(), false); + setStableTimestamp(_recoveryTimestamp, false); } _checkpointThread = @@ -1046,6 +1067,12 @@ void WiredTigerKVEngine::syncSizeInfo(bool sync) const { } } +void WiredTigerKVEngine::setOldestActiveTransactionTimestampCallback( + StorageEngine::OldestActiveTransactionTimestampCallback callback) { + stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex); + _oldestActiveTransactionTimestampCallback = std::move(callback); +}; + RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() { return new WiredTigerRecoveryUnit(_sessionCache.get()); } @@ -1528,9 +1555,7 @@ MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely); } // namespace -void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, - boost::optional<Timestamp> maximumTruncationTimestamp, - bool force) { +void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) { if (stableTimestamp.isNull()) { return; } @@ -1578,26 +1603,9 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, invariant(static_cast<std::size_t>(size) < sizeof(stableTSConfigString)); invariantWTOK(_conn->set_timestamp(_conn, stableTSConfigString)); + // After publishing a stable timestamp to WT, we can record the updated stable timestamp value + // for the necessary oplog to keep. _stableTimestamp.store(stableTimestamp.asULL()); - - // After publishing a stable timestamp to WT, we can publish the updated value for the - // necessary oplog to keep. Calls to this method require the min(stableTimestamp, - // maximumTruncationTimestamp) to be monotonically increasing. This allows us to safely record - // and publish a single value without additional concurrency control. - if (maximumTruncationTimestamp) { - // Until we discover otherwise, assume callers expect to obey the contract for proper - // oplog truncation. 
- DEV invariant(_oplogNeededForRollback.load() <= - std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL())); - _oplogNeededForRollback.store( - std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL())); - } else { - // If there is no maximumTruncationTimestamp at this stable timestamp, WT is free to - // truncate the oplog to any value behind the last stable timestamp, once it is - // checkpointed. - _oplogNeededForRollback.store(stableTimestamp.asULL()); - } - if (_checkpointThread && !_checkpointThread->hasTriggeredFirstStableCheckpoint()) { _checkpointThread->triggerFirstStableCheckpoint( prevStable, Timestamp(_initialDataTimestamp.load()), stableTimestamp); @@ -1840,8 +1848,29 @@ boost::optional<Timestamp> WiredTigerKVEngine::getLastStableRecoveryTimestamp() return boost::none; } -Timestamp WiredTigerKVEngine::getOplogNeededForRollback() const { - return Timestamp(_oplogNeededForRollback.load()); +StatusWith<Timestamp> WiredTigerKVEngine::getOplogNeededForRollback() const { + // Get the current stable timestamp and use it throughout this function, ignoring updates from + // another thread. + auto stableTimestamp = _stableTimestamp.load(); + + // Only one thread can set or execute this callback. 
+ stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex); + boost::optional<Timestamp> oldestActiveTransactionTimestamp; + if (_oldestActiveTransactionTimestampCallback) { + auto status = _oldestActiveTransactionTimestampCallback(Timestamp(stableTimestamp)); + if (status.isOK()) { + oldestActiveTransactionTimestamp.swap(status.getValue()); + } else { + LOG(1) << "getting oldest active transaction timestamp: " << status.getStatus(); + return status.getStatus(); + } + } + + if (oldestActiveTransactionTimestamp) { + return std::min(oldestActiveTransactionTimestamp.value(), Timestamp(stableTimestamp)); + } else { + return Timestamp(stableTimestamp); + } } boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery() const { @@ -1853,21 +1882,37 @@ boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery() } Timestamp WiredTigerKVEngine::getPinnedOplog() const { - stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex); - if (!storageGlobalParams.allowOplogTruncation) { - // If oplog truncation is not allowed, then return the min timestamp so that no history is - // ever allowed to be deleted. - return Timestamp::min(); - } - if (_oplogPinnedByBackup) { - // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup. - return _oplogPinnedByBackup.get(); + { + stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex); + if (!storageGlobalParams.allowOplogTruncation) { + // If oplog truncation is not allowed, then return the min timestamp so that no history + // is + // ever allowed to be deleted. + return Timestamp::min(); + } + if (_oplogPinnedByBackup) { + // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup. + return _oplogPinnedByBackup.get(); + } } + + auto oplogNeededForCrashRecovery = getOplogNeededForCrashRecovery(); if (!_keepDataHistory) { // We use rollbackViaRefetch, so we only need to pin oplog for crash recovery. 
- return getOplogNeededForCrashRecovery().value_or(Timestamp::max()); + return oplogNeededForCrashRecovery.value_or(Timestamp::max()); + } + + if (oplogNeededForCrashRecovery) { + return oplogNeededForCrashRecovery.value(); } - return getOplogNeededForCrashRecovery().value_or(getOplogNeededForRollback()); + + auto status = getOplogNeededForRollback(); + if (status.isOK()) { + return status.getValue(); + } + + // If getOplogNeededForRollback fails, don't truncate any oplog right now. + return Timestamp::min(); } bool WiredTigerKVEngine::supportsReadConcernSnapshot() const { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 7c5328476e5..1b1c9af1648 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -39,6 +39,7 @@ #include "mongo/bson/ordering.h" #include "mongo/bson/timestamp.h" #include "mongo/db/storage/kv/kv_engine.h" +#include "mongo/db/storage/storage_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h" #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" @@ -94,6 +95,9 @@ public: return _ephemeral; } + void setOldestActiveTransactionTimestampCallback( + StorageEngine::OldestActiveTransactionTimestampCallback callback) override; + RecoveryUnit* newRecoveryUnit() override; Status createRecordStore(OperationContext* opCtx, @@ -193,9 +197,7 @@ public: void setJournalListener(JournalListener* jl) final; - void setStableTimestamp(Timestamp stableTimestamp, - boost::optional<Timestamp> maximumTruncationTimestamp, - bool force) override; + void setStableTimestamp(Timestamp stableTimestamp, bool force) override; void setInitialDataTimestamp(Timestamp initialDataTimestamp) override; @@ -321,10 +323,11 @@ public: /** * Returns the minimum possible Timestamp value in the oplog that replication may need for - * recovery in 
the event of a rollback. This value gets updated on every `setStableTimestamp` - * call. + * recovery in the event of a rollback. This value depends on the timestamp passed to + * `setStableTimestamp` and on the set of active MongoDB transactions. Returns an error if it + * times out querying the active transactions. */ - Timestamp getOplogNeededForRollback() const; + StatusWith<Timestamp> getOplogNeededForRollback() const; /** * Returns the minimum possible Timestamp value in the oplog that replication may need for @@ -405,6 +408,10 @@ private: std::uint64_t _getCheckpointTimestamp() const; + mutable stdx::mutex _oldestActiveTransactionTimestampCallbackMutex; + StorageEngine::OldestActiveTransactionTimestampCallback + _oldestActiveTransactionTimestampCallback; + WT_CONNECTION* _conn; WiredTigerFileVersion _fileVersion; WiredTigerEventHandler _eventHandler; @@ -458,7 +465,6 @@ private: // Tracks the stable and oldest timestamps we've set on the storage engine. AtomicWord<std::uint64_t> _oldestTimestamp; AtomicWord<std::uint64_t> _stableTimestamp; - AtomicWord<std::uint64_t> _oplogNeededForRollback{Timestamp::min().asULL()}; // Timestamp of data at startup. Used internally to advise checkpointing and recovery to a // timestamp. Provided by replication layer because WT does not persist timestamps. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp index a9b6f3a5d83..9229c4df0f8 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp @@ -240,13 +240,30 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) { _engine->setInitialDataTimestamp(Timestamp(1, 1)); wiredTigerGlobalOptions.checkpointDelaySecs = 1; + // Simulate the callback that queries config.transactions for the oldest active transaction. 
+ boost::optional<Timestamp> oldestActiveTxnTimestamp; + AtomicWord<bool> callbackShouldFail{false}; + auto callback = [&](Timestamp stableTimestamp) { + using ResultType = StorageEngine::OldestActiveTransactionTimestampResult; + if (callbackShouldFail.load()) { + return ResultType(ErrorCodes::ExceededTimeLimit, "timeout"); + } + + return ResultType(oldestActiveTxnTimestamp); + }; + + _engine->setOldestActiveTransactionTimestampCallback(callback); + // A method that will poll the WiredTigerKVEngine until it sees the amount of oplog necessary // for crash recovery exceeds the input. auto assertPinnedMovesSoon = [this](Timestamp newPinned) { // If the current oplog needed for rollback does not exceed the requested pinned out, we // cannot expect the CheckpointThread to eventually publish a sufficient crash recovery // value. - ASSERT_TRUE(_engine->getOplogNeededForRollback() >= newPinned); + auto needed = _engine->getOplogNeededForRollback(); + if (needed.isOK()) { + ASSERT_TRUE(needed.getValue() >= newPinned); + } // Do 100 iterations that sleep for 100 milliseconds between polls. This will wait for up // to 10 seconds to observe an asynchronous update that iterates once per second. 
@@ -264,17 +281,31 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) { FAIL(""); }; - _engine->setStableTimestamp(Timestamp(10, 1), boost::none, false); + oldestActiveTxnTimestamp = boost::none; + _engine->setStableTimestamp(Timestamp(10, 1), false); assertPinnedMovesSoon(Timestamp(10, 1)); - _engine->setStableTimestamp(Timestamp(20, 1), Timestamp(15, 1), false); + oldestActiveTxnTimestamp = Timestamp(15, 1); + _engine->setStableTimestamp(Timestamp(20, 1), false); assertPinnedMovesSoon(Timestamp(15, 1)); - _engine->setStableTimestamp(Timestamp(30, 1), Timestamp(19, 1), false); + oldestActiveTxnTimestamp = Timestamp(19, 1); + _engine->setStableTimestamp(Timestamp(30, 1), false); assertPinnedMovesSoon(Timestamp(19, 1)); - _engine->setStableTimestamp(Timestamp(30, 1), boost::none, false); + oldestActiveTxnTimestamp = boost::none; + _engine->setStableTimestamp(Timestamp(30, 1), false); assertPinnedMovesSoon(Timestamp(30, 1)); + + callbackShouldFail.store(true); + ASSERT_NOT_OK(_engine->getOplogNeededForRollback()); + _engine->setStableTimestamp(Timestamp(40, 1), false); + // Await a new checkpoint. Oplog needed for rollback does not advance. 
+ sleepmillis(1100); + ASSERT_EQ(_engine->getOplogNeededForCrashRecovery().get(), Timestamp(30, 1)); + _engine->setStableTimestamp(Timestamp(30, 1), false); + callbackShouldFail.store(false); + assertPinnedMovesSoon(Timestamp(40, 1)); } std::unique_ptr<KVHarnessHelper> makeHelper() { diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 7e5d3334b3d..16cec7c5e7a 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -36,6 +36,7 @@ #include "mongo/db/transaction_participant.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/commands/test_commands_enabled.h" @@ -332,19 +333,63 @@ void TransactionParticipant::performNoopWrite(OperationContext* opCtx, StringDat } } -boost::optional<Timestamp> TransactionParticipant::getOldestActiveTimestamp( - OperationContext* opCtx) { - DBDirectClient client(opCtx); - Query q(BSON(SessionTxnRecord::kStateFieldName << "prepared")); - q.sort(SessionTxnRecord::kStartOpTimeFieldName.toString()); - auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), q); - if (result.isEmpty()) { - return boost::none; - } +StorageEngine::OldestActiveTransactionTimestampResult +TransactionParticipant::getOldestActiveTimestamp(Timestamp stableTimestamp) { + // Read from config.transactions at the stable timestamp for the oldest active transaction + // timestamp. Use a short timeout: another thread might have the global lock e.g. to shut down + // the server, and it both blocks this thread from querying config.transactions and waits for + // this thread to terminate. 
+ auto client = getGlobalServiceContext()->makeClient("OldestActiveTxnTimestamp"); + AlternativeClientRegion acr(client); + + try { + auto opCtx = cc().makeOperationContext(); + auto nss = NamespaceString::kSessionTransactionsTableNamespace; + auto deadline = Date_t::now() + Milliseconds(100); + Lock::DBLock dbLock(opCtx.get(), nss.db(), MODE_IS, deadline); + Lock::CollectionLock collLock(opCtx.get()->lockState(), nss.toString(), MODE_IS, deadline); + + auto databaseHolder = DatabaseHolder::get(opCtx.get()); + auto db = databaseHolder->getDb(opCtx.get(), nss.db()); + if (!db) { + // There is no config database, so there cannot be any active transactions. + return boost::none; + } + + auto collection = db->getCollection(opCtx.get(), nss); + if (!collection) { + return boost::none; + } - auto txnRecord = - SessionTxnRecord::parse(IDLParserErrorContext("parse oldest active txn record"), result); - return txnRecord.getStartOpTime()->getTimestamp(); + if (!stableTimestamp.isNull()) { + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, + stableTimestamp); + } + + // Scan. We guess that occasional scans are cheaper than the write overhead of an index. + boost::optional<Timestamp> oldestTxnTimestamp; + auto cursor = collection->getCursor(opCtx.get()); + while (auto record = cursor->next()) { + auto doc = record.get().data.toBson(); + auto txnRecord = SessionTxnRecord::parse( + IDLParserErrorContext("parse oldest active txn record"), doc); + if (txnRecord.getState() != DurableTxnStateEnum::kPrepared) { + continue; + } + + // A prepared transaction must have a start timestamp. + // TODO(SERVER-40013): Handle entries with state "prepared" and no "startTimestamp". 
+ invariant(txnRecord.getStartOpTime()); + auto ts = txnRecord.getStartOpTime()->getTimestamp(); + if (!oldestTxnTimestamp || ts < oldestTxnTimestamp.value()) { + oldestTxnTimestamp = ts; + } + } + + return oldestTxnTimestamp; + } catch (const DBException&) { + return exceptionToStatus(); + } } const LogicalSessionId& TransactionParticipant::Observer::_sessionId() const { diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 6d05089b284..2cbb0aaaf7d 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -48,6 +48,7 @@ #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/single_transaction_stats.h" #include "mongo/db/storage/recovery_unit.h" +#include "mongo/db/storage/storage_engine.h" #include "mongo/db/transaction_metrics_observer.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/assert_util.h" @@ -774,10 +775,12 @@ public: /** - * Returns the timestamp of the oldest oplog entry written across all open transactions. - * Returns boost::none if there are no active transactions. + * Returns the timestamp of the oldest oplog entry written across all open transactions, at the + * time of the stable timestamp. Returns boost::none if there are no active transactions, or an + * error if it fails. 
*/ - static boost::optional<Timestamp> getOldestActiveTimestamp(OperationContext* opCtx); + static StorageEngine::OldestActiveTransactionTimestampResult getOldestActiveTimestamp( + Timestamp stableTimestamp); /** * Append a no-op to the oplog, for cases where we haven't written in this unit of work but diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 3a20184e9dd..97070fb2fc6 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -29,6 +29,8 @@ #include "mongo/platform/basic.h" +#include <boost/optional/optional_io.hpp> + #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" @@ -3820,5 +3822,92 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfAborted) { ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); } +TEST_F(TxnParticipantTest, OldestActiveTransactionTimestamp) { + auto nss = NamespaceString::kSessionTransactionsTableNamespace; + + auto insertTxnRecord = [&](unsigned i) { + Timestamp ts(1, i); + SessionTxnRecord record; + record.setStartOpTime(repl::OpTime(ts, 0)); + record.setState(DurableTxnStateEnum::kPrepared); + record.setSessionId(makeLogicalSessionIdForTest()); + record.setTxnNum(1); + record.setLastWriteOpTime(repl::OpTime(ts, 0)); + record.setLastWriteDate(Date_t::now()); + + AutoGetOrCreateDb autoDb(opCtx(), nss.db(), MODE_X); + WriteUnitOfWork wuow(opCtx()); + auto coll = autoDb.getDb()->getCollection(opCtx(), nss.ns()); + ASSERT(coll); + OpDebug* const nullOpDebug = nullptr; + ASSERT_OK( + coll->insertDocument(opCtx(), InsertStatement(record.toBSON()), nullOpDebug, false)); + wuow.commit(); + }; + + auto deleteTxnRecord = [&](unsigned i) { + Timestamp ts(1, i); + AutoGetOrCreateDb autoDb(opCtx(), nss.db(), MODE_X); + WriteUnitOfWork wuow(opCtx()); + auto coll = autoDb.getDb()->getCollection(opCtx(), nss.ns()); + ASSERT(coll); + auto cursor = 
coll->getCursor(opCtx()); + while (auto record = cursor->next()) { + auto bson = record.get().data.toBson(); + if (bson["state"].String() != "prepared"_sd) { + continue; + } + + if (bson["startOpTime"]["ts"].timestamp() == ts) { + coll->deleteDocument(opCtx(), kUninitializedStmtId, record->id, nullptr); + wuow.commit(); + return; + } + } + FAIL(mongoutils::str::stream() << "No prepared transaction with start timestamp (1, " << i + << ")"); + }; + + auto oldestActiveTransactionTS = [&]() { + return TransactionParticipant::getOldestActiveTimestamp(Timestamp()).getValue(); + }; + + auto assertOldestActiveTS = [&](boost::optional<unsigned> i) { + if (i.has_value()) { + ASSERT_EQ(Timestamp(1, i.value()), oldestActiveTransactionTS()); + } else { + ASSERT_EQ(boost::none, oldestActiveTransactionTS()); + } + }; + + assertOldestActiveTS(boost::none); + insertTxnRecord(1); + assertOldestActiveTS(1); + insertTxnRecord(2); + assertOldestActiveTS(1); + deleteTxnRecord(1); + assertOldestActiveTS(2); + deleteTxnRecord(2); + assertOldestActiveTS(boost::none); + + // Add a newer transaction, then an older one, to test that order doesn't matter. + insertTxnRecord(4); + insertTxnRecord(3); + assertOldestActiveTS(3); + deleteTxnRecord(4); + assertOldestActiveTS(3); + deleteTxnRecord(3); + assertOldestActiveTS(boost::none); +}; + +TEST_F(TxnParticipantTest, OldestActiveTransactionTimestampTimeout) { + // Block getOldestActiveTimestamp() by locking the config database. 
+ auto nss = NamespaceString::kSessionTransactionsTableNamespace; + AutoGetOrCreateDb autoDb(opCtx(), nss.db(), MODE_X); + auto statusWith = TransactionParticipant::getOldestActiveTimestamp(Timestamp()); + ASSERT_FALSE(statusWith.isOK()); + ASSERT_TRUE(ErrorCodes::isInterruption(statusWith.getStatus().code())); +}; + } // namespace } // namespace mongo diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 2a326ced824..6a45c1d0223 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -432,8 +432,8 @@ public: void assertOldestActiveTxnTimestampEquals(const boost::optional<Timestamp>& ts, const Timestamp& atTs) { - OneOffRead oor(_opCtx, atTs); - ASSERT_EQ(TransactionParticipant::getOldestActiveTimestamp(_opCtx), ts); + auto oldest = TransactionParticipant::getOldestActiveTimestamp(atTs); + ASSERT_EQ(oldest, ts); } void assertHasStartOpTime() { |