diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2018-09-17 16:07:11 -0400 |
---|---|---|
committer | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2018-09-17 16:07:11 -0400 |
commit | beba8d70803cc14768c577bc7ec1aff5c0c352ed (patch) | |
tree | 2ce61feed4d61c42a437245edfa475354ab5a429 /src/mongo | |
parent | 9f634fbb3b49e1a591f054b097caf862f192f0c2 (diff) | |
download | mongo-beba8d70803cc14768c577bc7ec1aff5c0c352ed.tar.gz |
SERVER-36811: Save oplog dating back to oldest actively prepared transaction.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_engine.h | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_storage_engine.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_storage_engine.h | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine.h | 12 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 49 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h | 4 |
10 files changed, 187 insertions, 71 deletions
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 0ff07324b01..044ae209c1c 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -1053,7 +1053,7 @@ Status StorageInterfaceImpl::upgradeNonReplicatedUniqueIndexes(OperationContext* } void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) { - serviceCtx->getStorageEngine()->setStableTimestamp(snapshotName); + serviceCtx->getStorageEngine()->setStableTimestamp(snapshotName, boost::none); } void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx, diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index 5c078be40e0..48e7986c9ea 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -300,7 +300,8 @@ public: /** * See `StorageEngine::setStableTimestamp` */ - virtual void setStableTimestamp(Timestamp stableTimestamp) {} + virtual void setStableTimestamp(Timestamp stableTimestamp, + boost::optional<Timestamp> maximumTruncationTimestamp) {} /** * See `StorageEngine::setInitialDataTimestamp` diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp index 8980b43ab43..fab3545f636 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp @@ -629,8 +629,9 @@ void KVStorageEngine::setJournalListener(JournalListener* jl) { _engine->setJournalListener(jl); } -void KVStorageEngine::setStableTimestamp(Timestamp stableTimestamp) { - _engine->setStableTimestamp(stableTimestamp); +void KVStorageEngine::setStableTimestamp(Timestamp stableTimestamp, + boost::optional<Timestamp> maximumTruncationTimestamp) { + _engine->setStableTimestamp(stableTimestamp, maximumTruncationTimestamp); } void KVStorageEngine::setInitialDataTimestamp(Timestamp initialDataTimestamp) { diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h index 97eacf584eb..a25b5438c30 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.h +++ b/src/mongo/db/storage/kv/kv_storage_engine.h @@ -122,7 +122,8 @@ public: virtual void cleanShutdown(); - virtual void setStableTimestamp(Timestamp stableTimestamp) override; + virtual void setStableTimestamp(Timestamp stableTimestamp, + boost::optional<Timestamp> maximumTruncationTimestamp) override; virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override; diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 1f97ad2d0d1..6452044ce30 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -372,8 +372,18 @@ public: /** * Sets the highest timestamp at which the storage engine is allowed to take a checkpoint. * This timestamp can never decrease, and thus should be a timestamp that can never roll back. + * + * The maximumTruncationTimestamp (and newer) must not be truncated from the oplog in order to + * recover from the `stableTimestamp`. `boost::none` implies there are no additional + * constraints to what may be truncated. + * + * For proper truncation of the oplog, this method requires min(stableTimestamp, + * maximumTruncationTimestamp) to be monotonically increasing (where `min(stableTimestamp, + * boost::none) => stableTimestamp`). Otherwise truncation can race and remove a document + * before a call to this method protects it. */ - virtual void setStableTimestamp(Timestamp timestamp) {} + virtual void setStableTimestamp(Timestamp stableTimestamp, + boost::optional<Timestamp> maximumTruncationTimestamp) {} /** * Tells the storage engine the timestamp of the data at startup. This is necessary because diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 94eea809a29..79326f3f1cc 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -282,6 +282,26 @@ public: const Timestamp stableTimestamp = _wiredTigerKVEngine->getStableTimestamp(); const Timestamp initialDataTimestamp = _wiredTigerKVEngine->getInitialDataTimestamp(); + + // The amount of oplog to keep is primarily dictated by a user setting. However, in + // unexpected cases, durable, recover to a timestamp storage engines may need to play + // forward from an oplog entry that would otherwise be truncated by the user + // setting. Furthermore with prepared transactions, oplog entries can refer to + // previous oplog entries. + // + // Live (replication) rollback will replay oplogs from exactly the stable + // timestamp. With prepared transactions, it may require some additional entries prior + // to the stable timestamp. These requirements are summarized in + // `getOplogNeededForRollback`. Truncating the oplog at this point is sufficient for + // in-memory configurations, but could cause an unrecoverable scenario if the node + // crashed and has to play from the last stable checkpoint. + // + // By recording the oplog needed for rollback "now", then taking a stable checkpoint, + // we can safely assume that the oplog needed for crash recovery has caught up to the + // recorded value. After the checkpoint, this value will be published such that actors + // which truncate the oplog can read an updated value. + const Timestamp oplogNeededForRollback = + _wiredTigerKVEngine->getOplogNeededForRollback(); try { // Three cases: // @@ -316,8 +336,10 @@ public: WT_SESSION* s = session->getSession(); invariantWTOK(s->checkpoint(s, "use_timestamp=true")); - // Publish the checkpoint time after the checkpoint becomes durable. + // Now that the checkpoint is durable, publish the previously recorded stable + // timestamp and oplog needed to recover from it. _lastStableCheckpointTimestamp.store(stableTimestamp.asULL()); + _oplogNeededForCrashRecovery.store(oplogNeededForRollback.asULL()); } } catch (const WriteConflictException&) { // Temporary: remove this after WT-3483 @@ -363,6 +385,10 @@ public: return _lastStableCheckpointTimestamp.load(); } + std::uint64_t getOplogNeededForCrashRecovery() const { + return _oplogNeededForCrashRecovery.load(); + } + void shutdown() { _shuttingDown.store(true); { @@ -391,6 +417,7 @@ private: // checkpoint might have used a newer stable timestamp if stable was updated concurrently with // checkpointing. AtomicWord<std::uint64_t> _lastStableCheckpointTimestamp; + AtomicWord<std::uint64_t> _oplogNeededForCrashRecovery; }; namespace { @@ -569,7 +596,8 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName, if (!_readOnly && !_ephemeral) { if (!_recoveryTimestamp.isNull()) { setInitialDataTimestamp(_recoveryTimestamp); - setStableTimestamp(_recoveryTimestamp); + // The `maximumTruncationTimestamp` is not persisted, so choose a conservative value. + setStableTimestamp(_recoveryTimestamp, Timestamp::min()); } _checkpointThread = @@ -1314,7 +1342,8 @@ MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely); } // namespace -void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp) { +void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, + boost::optional<Timestamp> maximumTruncationTimestamp) { if (stableTimestamp.isNull()) { return; } @@ -1353,6 +1382,24 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp) { _stableTimestamp.store(stableTimestamp.asULL()); } + // After publishing a stable timestamp to WT, we can publish the updated value for the + // necessary oplog to keep. Calls to this method require the min(stableTimestamp, + // maximumTruncationTimestamp) to be monotonically increasing. This allows us to safely record + // and publish a single value without additional concurrency control. + if (maximumTruncationTimestamp) { + // Until we discover otherwise, assume callers expect to obey the contract for proper + // oplog truncation. + DEV invariant(_oplogNeededForRollback.load() <= + std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL())); + _oplogNeededForRollback.store( + std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL())); + } else { + // If there is no maximumTruncationTimestamp at this stable timestamp, WT is free to + // truncate the oplog to any value behind the last stable timestamp, once it is + // checkpointed. + _oplogNeededForRollback.store(stableTimestamp.asULL()); + } + if (_checkpointThread && !_checkpointThread->hasTriggeredFirstStableCheckpoint()) { _checkpointThread->triggerFirstStableCheckpoint( prevStable, Timestamp(_initialDataTimestamp.load()), stableTimestamp); @@ -1576,6 +1623,26 @@ boost::optional<Timestamp> WiredTigerKVEngine::getLastStableRecoveryTimestamp() return boost::none; } +Timestamp WiredTigerKVEngine::getOplogNeededForRollback() const { + // TODO: SERVER-36982 intends to allow holding onto minimum history (in front of the stable + // timestamp). If that results in never calling `StorageEngine::setStableTimestamp`, oplog + // will never be truncated. This method will need to be updated to accomodate that case, most + // simply by having this return `Timestamp::max()`. + return Timestamp(_oplogNeededForRollback.load()); +} + +boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery() const { + if (_ephemeral) { + return boost::none; + } + + return Timestamp(_checkpointThread->getOplogNeededForCrashRecovery()); +} + +Timestamp WiredTigerKVEngine::getPinnedOplog() const { + return getOplogNeededForCrashRecovery().value_or(getOplogNeededForRollback()); +} + bool WiredTigerKVEngine::supportsReadConcernSnapshot() const { return true; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 8367830e231..f466a4eb35e 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -188,7 +188,8 @@ public: void setJournalListener(JournalListener* jl) final; - virtual void setStableTimestamp(Timestamp stableTimestamp) override; + virtual void setStableTimestamp(Timestamp stableTimestamp, + boost::optional<Timestamp> maximumTruncationTimestamp) override; virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override; @@ -218,7 +219,15 @@ public: virtual Timestamp getAllCommittedTimestamp() const override; - bool supportsReadConcernSnapshot() const final; + bool supportsReadConcernSnapshot() const final override; + + /* + * This function is called when replication has completed a batch. In this function, we + * refresh our oplog visiblity read-at-timestamp value. + */ + void replicationBatchIsComplete() const override; + + bool isCacheUnderPressure(OperationContext* opCtx) const override; // wiredtiger specific // Calls WT_CONNECTION::reconfigure on the underlying WT_CONNECTION @@ -263,12 +272,6 @@ public: return _oplogManager.get(); } - /* - * This function is called when replication has completed a batch. In this function, we - * refresh our oplog visiblity read-at-timestamp value. - */ - void replicationBatchIsComplete() const override; - /** * Sets the implementation for `initRsOplogBackgroundThread` (allowing tests to skip the * background job, for example). Intended to be called from a MONGO_INITIALIZER and therefore in @@ -286,8 +289,6 @@ public: static void appendGlobalStats(BSONObjBuilder& b); - bool isCacheUnderPressure(OperationContext* opCtx) const override; - /** * These are timestamp access functions for serverStatus to be able to report the actual * snapshot window size. @@ -304,6 +305,33 @@ public: */ boost::optional<boost::filesystem::path> getDataFilePathForIdent(StringData ident) const; + /** + * Returns the minimum possible Timestamp value in the oplog that replication may need for + * recovery in the event of a rollback. This value gets updated on every `setStableTimestamp` + * call. + */ + Timestamp getOplogNeededForRollback() const; + + /** + * Returns the minimum possible Timestamp value in the oplog that replication may need for + * recovery in the event of a crash. This value gets updated every time a checkpoint is + * completed. This value is typically a lagged version of what's needed for rollback. + * + * Returns boost::none when called on an ephemeral database. + */ + boost::optional<Timestamp> getOplogNeededForCrashRecovery() const; + + /** + * Returns oplog that may not be truncated. This method is a function of oplog needed for + * rollback and oplog needed for crash recovery. This method considers different states the + * storage engine can be running in, such as running in in-memory mode. + * + * This method returning Timestamp::min() implies no oplog should be truncated and + * Timestamp::max() means oplog can be truncated freely based on user oplog size + * configuration. + */ + Timestamp getPinnedOplog() const; + private: class WiredTigerJournalFlusher; class WiredTigerCheckpointThread; @@ -395,6 +423,7 @@ private: // Tracks the stable and oldest timestamps we've set on the storage engine. AtomicWord<std::uint64_t> _oldestTimestamp; AtomicWord<std::uint64_t> _stableTimestamp; + AtomicWord<std::uint64_t> _oplogNeededForRollback{Timestamp::min().asULL()}; // Timestamp of data at startup. Used internally to advise checkpointing and recovery to a // timestamp. Provided by replication layer because WT does not persist timestamps. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp index a0e0dcfc1f7..b196f25b629 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" #include "mongo/stdx/memory.h" @@ -252,6 +253,51 @@ TEST_F(WiredTigerKVEngineRepairTest, UnrecoverableOrphanedDataFilesAreRebuilt) { #endif } +TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) { + auto opCtxPtr = makeOperationContext(); + // The initial data timestamp has to be set to take stable checkpoints. The first stable + // timestamp greater than this will also trigger a checkpoint. The following loop of the + // CheckpointThread will observe the new `checkpointDelaySecs` value. + _engine->setInitialDataTimestamp(Timestamp(1, 1)); + wiredTigerGlobalOptions.checkpointDelaySecs = 1; + + // A method that will poll the WiredTigerKVEngine until it sees the amount of oplog necessary + // for crash recovery exceeds the input. + auto assertPinnedMovesSoon = [this](Timestamp newPinned) { + // If the current oplog needed for rollback does not exceed the requested pinned out, we + // cannot expect the CheckpointThread to eventually publish a sufficient crash recovery + // value. + ASSERT_TRUE(_engine->getOplogNeededForRollback() >= newPinned); + + // Do 100 iterations that sleep for 100 milliseconds between polls. This will wait for up + // to 10 seconds to observe an asynchronous update that iterates once per second. + for (auto iterations = 0; iterations < 100; ++iterations) { + if (_engine->getPinnedOplog() >= newPinned) { + ASSERT_TRUE(_engine->getOplogNeededForCrashRecovery().get() >= newPinned); + return; + } + + sleepmillis(100); + } + + unittest::log() << "Expected the pinned oplog to advance. Expected value: " << newPinned + << " Published value: " << _engine->getOplogNeededForCrashRecovery(); + FAIL(""); + }; + + _engine->setStableTimestamp(Timestamp(10, 1), boost::none); + assertPinnedMovesSoon(Timestamp(10, 1)); + + _engine->setStableTimestamp(Timestamp(20, 1), Timestamp(15, 1)); + assertPinnedMovesSoon(Timestamp(15, 1)); + + _engine->setStableTimestamp(Timestamp(30, 1), Timestamp(19, 1)); + assertPinnedMovesSoon(Timestamp(19, 1)); + + _engine->setStableTimestamp(Timestamp(30, 1), boost::none); + assertPinnedMovesSoon(Timestamp(30, 1)); +} + std::unique_ptr<KVHarnessHelper> makeHelper() { return stdx::make_unique<WiredTigerKVHarnessHelper>(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 82606cd83b5..b3eb7e9a9f7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -196,38 +196,17 @@ void WiredTigerRecordStore::OplogStones::awaitHasExcessStonesOrDead() { MONGO_IDLE_THREAD_BLOCK; stdx::lock_guard<stdx::mutex> lk(_mutex); if (hasExcessStones_inlock()) { - // There are now excess oplog stones. - - // We can always truncate the oplog on non recover to stable timestamp storage - // engines. Replication does not need the history. - if (!_rs->supportsRecoverToStableTimestamp()) { - break; - } - - // However, for recover to stable timestamp supporting engines, we cannot delete - // oplog entries newer than the last stable recovery timestamp. + // There are now excess oplog stones. However, there it may be necessary to keep + // additional oplog. // - // Recoverable rollback on the replication layer requires oplog history back to the - // stable timestamp. The storage engine will delete all regular data newer than - // stable on recoverToStableTimestamp, then replication must catch up the rest from - // that point via the oplog. - // - // Furthermore, for the durable engines, replication will need oplog back to the - // last stable checkpoint for crash recovery without resync. Replication must play - // the oplog history forward from the last checkpoint to the present, because the - // engine is not set to journal regular data and thus will only recover checkpointed - // data on startup. - // - // The recovery timestamp contains the above contraints based on the engine in use. - auto optionalLastStableRecoveryTimestamp = _rs->getLastStableRecoveryTimestamp(); - auto lastStableRecoveryTimestamp = optionalLastStableRecoveryTimestamp - ? *optionalLastStableRecoveryTimestamp - : Timestamp::min(); - + // During startup or after rollback, the current state of the data goes "back in + // time" and replication recovery replays oplog entries to bring the data to a + // desired state. This process may require more oplog than the user dictated oplog + // size allotment. auto stone = _stones.front(); invariant(stone.lastRecord.isValid()); if (static_cast<std::uint64_t>(stone.lastRecord.repr()) < - lastStableRecoveryTimestamp.asULL()) { + _rs->getPinnedOplog().asULL()) { break; } } @@ -976,12 +955,8 @@ int64_t WiredTigerRecordStore::_cappedDeleteAsNeeded(OperationContext* opCtx, return _cappedDeleteAsNeeded_inlock(opCtx, justInserted); } -boost::optional<Timestamp> WiredTigerRecordStore::getLastStableRecoveryTimestamp() const { - return _kvEngine->getLastStableRecoveryTimestamp(); -} - -bool WiredTigerRecordStore::supportsRecoverToStableTimestamp() const { - return _kvEngine->supportsRecoverToStableTimestamp(); +Timestamp WiredTigerRecordStore::getPinnedOplog() const { + return _kvEngine->getPinnedOplog(); } void WiredTigerRecordStore::_positionAtFirstRecordId(OperationContext* opCtx, @@ -1193,27 +1168,15 @@ bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext* } void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx) { - if (!_kvEngine->supportsRecoverToStableTimestamp()) { - // For non-RTT storage engines, the oplog can always be truncated. They do not need the - // history for recoverable rollback or crash recovery. - reclaimOplog(opCtx, Timestamp::max()); - return; - } - - auto optionalLastStableRecoveryTimestamp = _kvEngine->getLastStableRecoveryTimestamp(); - Timestamp lastStableRecoveryTimestamp = optionalLastStableRecoveryTimestamp - ? *optionalLastStableRecoveryTimestamp - : Timestamp::min(); - - reclaimOplog(opCtx, lastStableRecoveryTimestamp); + reclaimOplog(opCtx, _kvEngine->getPinnedOplog()); } -void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx, Timestamp recoveryTimestamp) { +void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx, Timestamp mayTruncateUpTo) { Timer timer; while (auto stone = _oplogStones->peekOldestStoneIfNeeded()) { invariant(stone->lastRecord.isValid()); - if (static_cast<std::uint64_t>(stone->lastRecord.repr()) >= recoveryTimestamp.asULL()) { + if (static_cast<std::uint64_t>(stone->lastRecord.repr()) >= mayTruncateUpTo.asULL()) { // Do not truncate oplogs needed for replication recovery. return; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 38b720e0da7..defda35461c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -186,9 +186,7 @@ public: return true; } - virtual boost::optional<Timestamp> getLastStableRecoveryTimestamp() const final; - - virtual bool supportsRecoverToStableTimestamp() const final; + virtual Timestamp getPinnedOplog() const final; virtual Status compact(OperationContext* opCtx, RecordStoreCompactAdaptor* adaptor, |