From f10e0ad7caf897b6444580d618e4a1e1577793d3 Mon Sep 17 00:00:00 2001 From: Gregory Wlodarek Date: Wed, 9 Sep 2020 03:08:07 -0400 Subject: SERVER-29418 Create a storage-engine agnostic checkpointing thread --- src/mongo/db/mongod_options.cpp | 3 + src/mongo/db/repl/storage_interface_impl.cpp | 14 +- src/mongo/db/storage/SConscript | 15 + src/mongo/db/storage/checkpointer.cpp | 168 +++++++++++ src/mongo/db/storage/checkpointer.h | 114 +++++++ src/mongo/db/storage/control/storage_control.cpp | 15 + .../ephemeral_for_test_kv_engine.h | 4 + src/mongo/db/storage/kv/kv_engine.h | 14 +- src/mongo/db/storage/storage_engine.h | 11 + src/mongo/db/storage/storage_engine_impl.cpp | 10 +- src/mongo/db/storage/storage_engine_impl.h | 4 + src/mongo/db/storage/storage_engine_mock.h | 4 + src/mongo/db/storage/storage_options.cpp | 1 + src/mongo/db/storage/storage_options.h | 4 + src/mongo/db/storage/wiredtiger/SConscript | 1 + .../wiredtiger/wiredtiger_global_options.cpp | 5 - .../storage/wiredtiger/wiredtiger_global_options.h | 2 - .../db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 330 +++++---------------- .../db/storage/wiredtiger/wiredtiger_kv_engine.h | 8 +- .../wiredtiger/wiredtiger_kv_engine_test.cpp | 32 +- .../wiredtiger/wiredtiger_recovery_unit_test.cpp | 7 - 21 files changed, 458 insertions(+), 308 deletions(-) create mode 100644 src/mongo/db/storage/checkpointer.cpp create mode 100644 src/mongo/db/storage/checkpointer.h diff --git a/src/mongo/db/mongod_options.cpp b/src/mongo/db/mongod_options.cpp index f0722782157..e499d04881a 100644 --- a/src/mongo/db/mongod_options.cpp +++ b/src/mongo/db/mongod_options.cpp @@ -404,6 +404,9 @@ Status storeMongodOptions(const moe::Environment& params) { if (params.count("storage.syncPeriodSecs")) { storageGlobalParams.syncdelay = params["storage.syncPeriodSecs"].as(); + storageGlobalParams.checkpointDelaySecs = + static_cast(params["storage.syncPeriodSecs"].as()); + if (storageGlobalParams.syncdelay < 0 || storageGlobalParams.syncdelay > StorageGlobalParams::kMaxSyncdelaySecs) { return Status(ErrorCodes::BadValue, diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 159179530a9..371a2c6af5f 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -74,6 +74,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/rollback_gen.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/checkpointer.h" #include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/control/storage_control.h" #include "mongo/db/storage/durable_catalog.h" @@ -1271,7 +1272,18 @@ void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timest "holdStableTimestamp"_attr = holdStableTimestamp); } }); - serviceCtx->getStorageEngine()->setStableTimestamp(newStableTimestamp); + + StorageEngine* storageEngine = serviceCtx->getStorageEngine(); + Timestamp prevStableTimestamp = storageEngine->getStableTimestamp(); + + storageEngine->setStableTimestamp(newStableTimestamp); + + Checkpointer* checkpointer = Checkpointer::get(serviceCtx); + if (checkpointer && !checkpointer->hasTriggeredFirstStableCheckpoint()) { + checkpointer->triggerFirstStableCheckpoint(prevStableTimestamp, + storageEngine->getInitialDataTimestamp(), + storageEngine->getStableTimestamp()); + } } void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx, diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 53ac37b0e30..f60d463a976 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -121,11 +121,13 @@ env.Library( 'control/storage_control.cpp', ], LIBDEPS=[ + 'checkpointer', 'journal_flusher', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/service_context', + 'storage_options', ], ) @@ -512,6 +514,19 @@ env.Library( ], ) +env.Library( + target='checkpointer', + source=[ + 'checkpointer.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/util/background_job', + 'storage_options', + ], +) + env.Library( target='two_phase_index_build_knobs_idl', source=[ diff --git a/src/mongo/db/storage/checkpointer.cpp b/src/mongo/db/storage/checkpointer.cpp new file mode 100644 index 00000000000..825e914d062 --- /dev/null +++ b/src/mongo/db/storage/checkpointer.cpp @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "mongo/db/storage/checkpointer.h" + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage/kv/kv_engine.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/idle_thread_block.h" +#include "mongo/util/fail_point.h" + +namespace mongo { + +namespace { + +const auto getCheckpointer = ServiceContext::declareDecoration>(); + +MONGO_FAIL_POINT_DEFINE(pauseCheckpointThread); + +} // namespace + +Checkpointer* Checkpointer::get(ServiceContext* serviceCtx) { + return getCheckpointer(serviceCtx).get(); +} + +Checkpointer* Checkpointer::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +void Checkpointer::set(ServiceContext* serviceCtx, std::unique_ptr newCheckpointer) { + auto& checkpointer = getCheckpointer(serviceCtx); + if (checkpointer) { + invariant(!checkpointer->running(), + "Tried to reset the Checkpointer without shutting down the original instance."); + } + checkpointer = std::move(newCheckpointer); +} + +void Checkpointer::run() { + ThreadClient tc(name(), getGlobalServiceContext()); + LOGV2_DEBUG(22307, 1, "Starting thread", "threadName"_attr = name()); + + while (true) { + auto opCtx = tc->makeOperationContext(); + + { + stdx::unique_lock lock(_mutex); + MONGO_IDLE_THREAD_BLOCK; + + // Wait for 'storageGlobalParams.checkpointDelaySecs' seconds; or until either shutdown + // is signaled or a checkpoint is triggered. + _sleepCV.wait_for(lock, + stdx::chrono::seconds(static_cast( + storageGlobalParams.checkpointDelaySecs)), + [&] { return _shuttingDown || _triggerCheckpoint; }); + + // If the checkpointDelaySecs is set to 0, that means we should skip checkpointing. + // However, checkpointDelaySecs is adjustable by a runtime server parameter, so we + // need to wake up to check periodically. The wakeup to check period is arbitrary. + while (storageGlobalParams.checkpointDelaySecs == 0 && !_shuttingDown && + !_triggerCheckpoint) { + _sleepCV.wait_for(lock, stdx::chrono::seconds(static_cast(3)), [&] { + return _shuttingDown || _triggerCheckpoint; + }); + } + + if (_shuttingDown) { + invariant(!_shutdownReason.isOK()); + LOGV2_DEBUG(22309, + 1, + "Stopping thread", + "threadName"_attr = name(), + "reason"_attr = _shutdownReason); + return; + } + + // Clear the trigger so we do not immediately checkpoint again after this. + _triggerCheckpoint = false; + } + + pauseCheckpointThread.pauseWhileSet(); + + const Date_t startTime = Date_t::now(); + + // TODO SERVER-50861: Access the storage engine via the ServiceContext. + _kvEngine->checkpoint(); + + const auto secondsElapsed = durationCount(Date_t::now() - startTime); + if (secondsElapsed >= 30) { + LOGV2_DEBUG(22308, + 1, + "Checkpoint was slow to complete", + "secondsElapsed"_attr = secondsElapsed); + } + } +} + +void Checkpointer::triggerFirstStableCheckpoint(Timestamp prevStable, + Timestamp initialData, + Timestamp currStable) { + stdx::unique_lock lock(_mutex); + invariant(!_hasTriggeredFirstStableCheckpoint); + if (prevStable < initialData && currStable >= initialData) { + LOGV2(22310, + "Triggering the first stable checkpoint", + "initialDataTimestamp"_attr = initialData, + "prevStableTimestamp"_attr = prevStable, + "currStableTimestamp"_attr = currStable); + _hasTriggeredFirstStableCheckpoint = true; + _triggerCheckpoint = true; + _sleepCV.notify_one(); + } +} + +bool Checkpointer::hasTriggeredFirstStableCheckpoint() { + stdx::unique_lock lock(_mutex); + return _hasTriggeredFirstStableCheckpoint; +} + +void Checkpointer::shutdown(const Status& reason) { + LOGV2(22322, "Shutting down checkpoint thread"); + + { + stdx::unique_lock lock(_mutex); + _shuttingDown = true; + _shutdownReason = reason; + + // Wake up the checkpoint thread early, to take a final checkpoint before shutting down, if + // one has not coincidentally just been taken. + _sleepCV.notify_one(); + } + + wait(); + LOGV2(22323, "Finished shutting down checkpoint thread"); +} + +} // namespace mongo diff --git a/src/mongo/db/storage/checkpointer.h b/src/mongo/db/storage/checkpointer.h new file mode 100644 index 00000000000..6c50974c2ba --- /dev/null +++ b/src/mongo/db/storage/checkpointer.h @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/background.h" + +namespace mongo { + +class KVEngine; +class OperationContext; +class ServiceContext; +class Timestamp; + +class Checkpointer : public BackgroundJob { +public: + Checkpointer(KVEngine* kvEngine) + : BackgroundJob(false /* deleteSelf */), + _kvEngine(kvEngine), + _shuttingDown(false), + _shutdownReason(Status::OK()), + _hasTriggeredFirstStableCheckpoint(false), + _triggerCheckpoint(false) {} + + static Checkpointer* get(ServiceContext* serviceCtx); + static Checkpointer* get(OperationContext* opCtx); + static void set(ServiceContext* serviceCtx, std::unique_ptr newCheckpointer); + + std::string name() const override { + return "Checkpointer"; + } + + /** + * Starts the checkpoint thread that runs every storageGlobalParams.checkpointDelaySecs seconds. + */ + void run() override; + + /** + * Triggers taking the first stable checkpoint if the stable timestamp has advanced past the + * initial data timestamp. + * + * The checkpoint thread runs automatically every storageGlobalParams.checkpointDelaySecs + * seconds. This function avoids potentially waiting that full duration for a stable checkpoint, + * initiating one immediately. + * + * Do not call this function if hasTriggeredFirstStableCheckpoint() returns true. + */ + void triggerFirstStableCheckpoint(Timestamp prevStable, + Timestamp initialData, + Timestamp currStable); + + /** + * Returns whether the first stable checkpoint has already been triggered. + */ + bool hasTriggeredFirstStableCheckpoint(); + + /** + * Blocks until the checkpoint thread has been fully shutdown. + */ + void shutdown(const Status& reason); + +private: + // A pointer to the KVEngine is maintained only due to unit testing limitations that don't fully + // setup the ServiceContext. + // TODO SERVER-50861: Remove this pointer. + KVEngine* const _kvEngine; + + // Protects the state below. + Mutex _mutex = MONGO_MAKE_LATCH("Checkpointer::_mutex"); + + // The checkpoint thread idles on this condition variable for a particular time duration between + // taking checkpoints. It can be triggered early to expedite either: immediate checkpointing if + // _triggerCheckpoint is set; or shutdown cleanup if _shuttingDown is set. + stdx::condition_variable _sleepCV; + + bool _shuttingDown; + Status _shutdownReason; + + // This flag ensures the first stable checkpoint is only triggered once. + bool _hasTriggeredFirstStableCheckpoint; + + // This flag allows the checkpoint thread to wake up early when _sleepCV is signaled. + bool _triggerCheckpoint; +}; + +} // namespace mongo diff --git a/src/mongo/db/storage/control/storage_control.cpp b/src/mongo/db/storage/control/storage_control.cpp index f0b7e7d825f..50213d44dfc 100644 --- a/src/mongo/db/storage/control/storage_control.cpp +++ b/src/mongo/db/storage/control/storage_control.cpp @@ -35,7 +35,9 @@ #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/checkpointer.h" #include "mongo/db/storage/control/journal_flusher.h" +#include "mongo/db/storage/storage_options.h" #include "mongo/logv2/log.h" namespace mongo { @@ -73,12 +75,25 @@ void startStorageControls(ServiceContext* serviceContext, bool forTestOnly) { journalFlusher->go(); JournalFlusher::set(serviceContext, std::move(journalFlusher)); + if (storageEngine->supportsCheckpoints() && !storageEngine->isEphemeral() && + !storageGlobalParams.readOnly) { + std::unique_ptr checkpointer = + std::make_unique(storageEngine->getEngine()); + checkpointer->go(); + Checkpointer::set(serviceContext, std::move(checkpointer)); + } + areControlsStarted = true; } void stopStorageControls(ServiceContext* serviceContext, const Status& reason) { if (areControlsStarted) { JournalFlusher::get(serviceContext)->shutdown(reason); + + auto checkpointer = Checkpointer::get(serviceContext); + if (checkpointer) { + checkpointer->shutdown(reason); + } } } diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h index b3da8bb0085..fd243b0c8c1 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h @@ -173,6 +173,10 @@ public: Timestamp getOldestTimestamp() const override; + Timestamp getStableTimestamp() const override { + return Timestamp(); + } + void setOldestTimestamp(Timestamp newOldestTimestamp, bool force) override; std::map> getHistory_forTest(); diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index 46dad070544..6c8c67df3c4 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -52,18 +52,6 @@ class SnapshotManager; class KVEngine { public: - /** - * This function should only be called after the StorageEngine is set on the ServiceContext. - * - * Starts asycnhronous threads for a storage engine's integration layer. Any such thread - * generating an OperationContext should be initialized here. - * - * In order for OperationContexts to be generated with real Locker objects, the generation must - * occur after the StorageEngine is instantiated and set on the ServiceContext. Otherwise, - * OperationContexts are created with LockerNoops. - */ - virtual void startAsyncThreads() {} - /** * During the startup process, the storage engine is one of the first components to be started * up and fully initialized. But that fully initialized storage engine may not be recognized as @@ -275,6 +263,8 @@ public: return false; } + virtual void checkpoint() {} + virtual bool isDurable() const = 0; /** diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 72c09e125b6..edf31b874fe 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -472,6 +472,12 @@ public: const NamespaceString& nss, std::shared_ptr ident) = 0; + /** + * Called when the checkpoint thread instructs the storage engine to take a checkpoint. The + * underlying storage engine must take a checkpoint at this point. + */ + virtual void checkpoint() = 0; + /** * Recovers the storage engine state to the last stable timestamp. "Stable" in this case * refers to a timestamp that is guaranteed to never be rolled back. The stable timestamp @@ -516,6 +522,11 @@ public: */ virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) = 0; + /** + * Returns the stable timestamp. + */ + virtual Timestamp getStableTimestamp() const = 0; + /** * Tells the storage engine the timestamp of the data at startup. This is necessary because * timestamps are not persisted in the storage layer. diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp index e8efa8ce88d..22c82a09eba 100644 --- a/src/mongo/db/storage/storage_engine_impl.cpp +++ b/src/mongo/db/storage/storage_engine_impl.cpp @@ -670,8 +670,6 @@ void StorageEngineImpl::finishInit() { // A storage engine may need to start threads that require OperationsContexts with real Lockers, // as opposed to LockerNoops. Placing the start logic here, after the StorageEngine has been // instantiated, causes makeOperationContext() to create LockerImpls instead of LockerNoops. - _engine->startAsyncThreads(); - if (_engine->supportsRecoveryTimestamp()) { _timestampMonitor = std::make_unique( _engine.get(), getGlobalServiceContext()->getPeriodicRunner()); @@ -893,6 +891,10 @@ void StorageEngineImpl::setStableTimestamp(Timestamp stableTimestamp, bool force _engine->setStableTimestamp(stableTimestamp, force); } +Timestamp StorageEngineImpl::getStableTimestamp() const { + return _engine->getStableTimestamp(); +} + void StorageEngineImpl::setInitialDataTimestamp(Timestamp initialDataTimestamp) { _engine->setInitialDataTimestamp(initialDataTimestamp); } @@ -1033,6 +1035,10 @@ void StorageEngineImpl::addDropPendingIdent(const Timestamp& dropTimestamp, _dropPendingIdentReaper.addDropPendingIdent(dropTimestamp, nss, ident); } +void StorageEngineImpl::checkpoint() { + _engine->checkpoint(); +} + void StorageEngineImpl::_onMinOfCheckpointAndOldestTimestampChanged(const Timestamp& timestamp) { if (timestamp.isNull()) { return; diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h index 7d71e5de128..fed128f9b59 100644 --- a/src/mongo/db/storage/storage_engine_impl.h +++ b/src/mongo/db/storage/storage_engine_impl.h @@ -123,6 +123,8 @@ public: virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) override; + virtual Timestamp getStableTimestamp() const override; + virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override; virtual Timestamp getInitialDataTimestamp() const override; @@ -315,6 +317,8 @@ public: const NamespaceString& nss, std::shared_ptr ident) override; + void checkpoint() override; + DurableCatalog* getCatalog() override { return _catalog.get(); } diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h index 5c7078e2a2d..96eb8020b1d 100644 --- a/src/mongo/db/storage/storage_engine_mock.h +++ b/src/mongo/db/storage/storage_engine_mock.h @@ -138,6 +138,9 @@ public: MONGO_UNREACHABLE; } void setStableTimestamp(Timestamp stableTimestamp, bool force = false) final {} + Timestamp getStableTimestamp() const override { + return Timestamp(); + } void setInitialDataTimestamp(Timestamp timestamp) final {} Timestamp getInitialDataTimestamp() const override { return Timestamp(); @@ -172,6 +175,7 @@ public: void addDropPendingIdent(const Timestamp& dropTimestamp, const NamespaceString& nss, std::shared_ptr ident) final {} + void checkpoint() final {} Status currentFilesCompatible(OperationContext* opCtx) const final { return Status::OK(); } diff --git a/src/mongo/db/storage/storage_options.cpp b/src/mongo/db/storage/storage_options.cpp index 7ba94afde29..431698a807d 100644 --- a/src/mongo/db/storage/storage_options.cpp +++ b/src/mongo/db/storage/storage_options.cpp @@ -58,6 +58,7 @@ void StorageGlobalParams::reset() { oplogMinRetentionHours.store(0.0); allowOplogTruncation = true; disableLockFreeReads = true; + checkpointDelaySecs = 0; } StorageGlobalParams storageGlobalParams; diff --git a/src/mongo/db/storage/storage_options.h b/src/mongo/db/storage/storage_options.h index f6284a06244..e7fe5331f96 100644 --- a/src/mongo/db/storage/storage_options.h +++ b/src/mongo/db/storage/storage_options.h @@ -123,6 +123,10 @@ struct StorageGlobalParams { // settings with which lock-free reads are incompatible: standalone mode; and // enableMajorityReadConcern=false. bool disableLockFreeReads; + + // Delay in seconds between triggering the next checkpoint after the completion of the previous + // one. A value of 0 indicates that checkpointing will be skipped. + size_t checkpointDelaySecs; }; extern StorageGlobalParams storageGlobalParams; diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript index 0cf7d92ce08..5d24feec685 100644 --- a/src/mongo/db/storage/wiredtiger/SConscript +++ b/src/mongo/db/storage/wiredtiger/SConscript @@ -139,6 +139,7 @@ if wiredtiger: '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/service_context_d', '$BUILD_DIR/mongo/db/service_context_test_fixture', + '$BUILD_DIR/mongo/db/storage/checkpointer', '$BUILD_DIR/mongo/db/storage/durable_catalog_impl', '$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness', '$BUILD_DIR/mongo/db/storage/recovery_unit_test_harness', diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp index d7bba3ee94d..8149bab8757 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp @@ -43,11 +43,6 @@ WiredTigerGlobalOptions wiredTigerGlobalOptions; Status WiredTigerGlobalOptions::store(const moe::Environment& params) { // WiredTiger storage engine options - if (params.count("storage.syncPeriodSecs")) { - wiredTigerGlobalOptions.checkpointDelaySecs = - static_cast(params["storage.syncPeriodSecs"].as()); - } - if (!wiredTigerGlobalOptions.engineConfig.empty()) { LOGV2(22293, "Engine custom option: {wiredTigerGlobalOptions_engineConfig}", diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h index 21d4c522f3b..51546164c39 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h @@ -40,7 +40,6 @@ class WiredTigerGlobalOptions { public: WiredTigerGlobalOptions() : cacheSizeGB(0), - checkpointDelaySecs(0), statisticsLogDelaySecs(0), directoryForIndexes(false), maxCacheOverflowFileSizeGBDeprecated(0), @@ -50,7 +49,6 @@ public: Status store(const optionenvironment::Environment& params); double cacheSizeGB; - size_t checkpointDelaySecs; size_t statisticsLogDelaySecs; std::string journalCompressor; bool directoryForIndexes; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 1553c1740fe..f169f952e05 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -119,8 +119,6 @@ namespace { MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely); MONGO_FAIL_POINT_DEFINE(WTSetOldestTSToStableTS); -MONGO_FAIL_POINT_DEFINE(pauseCheckpointThread); - } // namespace bool WiredTigerFileVersion::shouldDowngrade(bool readOnly, @@ -255,231 +253,6 @@ std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult } } -class WiredTigerKVEngine::WiredTigerCheckpointThread : public BackgroundJob { -public: - explicit WiredTigerCheckpointThread(WiredTigerKVEngine* wiredTigerKVEngine, - WiredTigerSessionCache* sessionCache) - : BackgroundJob(false /* deleteSelf */), - _wiredTigerKVEngine(wiredTigerKVEngine), - _sessionCache(sessionCache) {} - - virtual string name() const { - return "WTCheckpointThread"; - } - - virtual void run() { - ThreadClient tc(name(), getGlobalServiceContext()); - LOGV2_DEBUG(22307, 1, "Starting thread", "threadName"_attr = name()); - - while (true) { - auto opCtx = tc->makeOperationContext(); - - { - stdx::unique_lock lock(_mutex); - MONGO_IDLE_THREAD_BLOCK; - - // Wait for 'wiredTigerGlobalOptions.checkpointDelaySecs' seconds; or until either - // shutdown is signaled or a checkpoint is triggered. - _condvar.wait_for(lock, - stdx::chrono::seconds(static_cast( - wiredTigerGlobalOptions.checkpointDelaySecs)), - [&] { return _shuttingDown || _triggerCheckpoint; }); - - // If the checkpointDelaySecs is set to 0, that means we should skip checkpointing. - // However, checkpointDelaySecs is adjustable by a runtime server parameter, so we - // need to wake up to check periodically. The wakeup to check period is arbitrary. - while (wiredTigerGlobalOptions.checkpointDelaySecs == 0 && !_shuttingDown && - !_triggerCheckpoint) { - _condvar.wait_for(lock, - stdx::chrono::seconds(static_cast(3)), - [&] { return _shuttingDown || _triggerCheckpoint; }); - } - - if (_shuttingDown) { - LOGV2_DEBUG(22309, 1, "Stopping thread", "threadName"_attr = name()); - return; - } - - // Clear the trigger so we do not immediately checkpoint again after this. - _triggerCheckpoint = false; - } - - pauseCheckpointThread.pauseWhileSet(); - - const Date_t startTime = Date_t::now(); - - const Timestamp stableTimestamp = _wiredTigerKVEngine->getStableTimestamp(); - const Timestamp initialDataTimestamp = _wiredTigerKVEngine->getInitialDataTimestamp(); - - // The amount of oplog to keep is primarily dictated by a user setting. However, in - // unexpected cases, durable, recover to a timestamp storage engines may need to play - // forward from an oplog entry that would otherwise be truncated by the user - // setting. Furthermore, the entries in prepared or large transactions can refer to - // previous entries in the same transaction. - // - // Live (replication) rollback will replay oplogs from exactly the stable timestamp. - // With prepared or large transactions, it may require some additional entries prior to - // the stable timestamp. These requirements are summarized in getOplogNeededForRollback. - // Truncating the oplog at this point is sufficient for in-memory configurations, but - // could cause an unrecoverable scenario if the node crashed and has to play from the - // last stable checkpoint. - // - // By recording the oplog needed for rollback "now", then taking a stable checkpoint, - // we can safely assume that the oplog needed for crash recovery has caught up to the - // recorded value. After the checkpoint, this value will be published such that actors - // which truncate the oplog can read an updated value. - try { - // Three cases: - // - // First, initialDataTimestamp is Timestamp(0, 1) -> Take full checkpoint. This is - // when there is no consistent view of the data (i.e: during initial sync). - // - // Second, stableTimestamp < initialDataTimestamp: Skip checkpoints. The data on - // disk is prone to being rolled back. Hold off on checkpoints. Hope that the - // stable timestamp surpasses the data on disk, allowing storage to persist newer - // copies to disk. - // - // Third, stableTimestamp >= initialDataTimestamp: Take stable checkpoint. Steady - // state case. - if (initialDataTimestamp.asULL() <= 1) { - UniqueWiredTigerSession session = _sessionCache->getSession(); - WT_SESSION* s = session->getSession(); - invariantWTOK(s->checkpoint(s, "use_timestamp=false")); - } else if (stableTimestamp < initialDataTimestamp) { - LOGV2_FOR_RECOVERY( - 23985, - 2, - "Stable timestamp is behind the initial data timestamp, skipping " - "a checkpoint. StableTimestamp: {stableTimestamp} InitialDataTimestamp: " - "{initialDataTimestamp}", - "stableTimestamp"_attr = stableTimestamp.toString(), - "initialDataTimestamp"_attr = initialDataTimestamp.toString()); - } else { - auto oplogNeededForRollback = _wiredTigerKVEngine->getOplogNeededForRollback(); - - LOGV2_FOR_RECOVERY( - 23986, - 2, - "Performing stable checkpoint. StableTimestamp: {stableTimestamp}, " - "OplogNeededForRollback: {oplogNeededForRollback}", - "stableTimestamp"_attr = stableTimestamp, - "oplogNeededForRollback"_attr = toString(oplogNeededForRollback)); - - UniqueWiredTigerSession session = _sessionCache->getSession(); - WT_SESSION* s = session->getSession(); - invariantWTOK(s->checkpoint(s, "use_timestamp=true")); - - if (oplogNeededForRollback.isOK()) { - // Now that the checkpoint is durable, publish the oplog needed to recover - // from it. - stdx::lock_guard lk(_oplogNeededForCrashRecoveryMutex); - _oplogNeededForCrashRecovery.store( - oplogNeededForRollback.getValue().asULL()); - } - } - - const auto secondsElapsed = durationCount(Date_t::now() - startTime); - if (secondsElapsed >= 30) { - LOGV2_DEBUG(22308, - 1, - "Checkpoint took {secondsElapsed} seconds to complete.", - "secondsElapsed"_attr = secondsElapsed); - } - } catch (const WriteConflictException&) { - // Temporary: remove this after WT-3483 - LOGV2_WARNING(22346, "Checkpoint encountered a write conflict exception."); - } catch (const AssertionException& exc) { - invariant(ErrorCodes::isShutdownError(exc.code()), exc.what()); - } - } - } - - /** - * Returns true if we have already triggered taking the first checkpoint. - */ - bool hasTriggeredFirstStableCheckpoint() { - stdx::unique_lock lock(_mutex); - return _hasTriggeredFirstStableCheckpoint; - } - - /** - * Triggers taking the first stable checkpoint, which is when the stable timestamp advances past - * the initial data timestamp. - * - * The checkpoint thread runs automatically every wiredTigerGlobalOptions.checkpointDelaySecs - * seconds. This function avoids potentially waiting that full duration for a stable checkpoint, - * initiating one immediately. - * - * Do not call this function if hasTriggeredFirstStableCheckpoint() returns true. - */ - void triggerFirstStableCheckpoint(Timestamp prevStable, - Timestamp initialData, - Timestamp currStable) { - stdx::unique_lock lock(_mutex); - invariant(!_hasTriggeredFirstStableCheckpoint); - if (prevStable < initialData && currStable >= initialData) { - LOGV2(22310, - "Triggering the first stable checkpoint. Initial Data: {initialData} PrevStable: " - "{prevStable} CurrStable: {currStable}", - "Triggering the first stable checkpoint", - "initialData"_attr = initialData, - "prevStable"_attr = prevStable, - "currStable"_attr = currStable); - _hasTriggeredFirstStableCheckpoint = true; - _triggerCheckpoint = true; - _condvar.notify_one(); - } - } - - std::uint64_t getOplogNeededForCrashRecovery() const { - return _oplogNeededForCrashRecovery.load(); - } - - /* - * Atomically assign _oplogNeededForCrashRecovery to a variable. - * _oplogNeededForCrashRecovery will not change during assignment. - */ - void assignOplogNeededForCrashRecoveryTo(boost::optional* timestamp) { - stdx::lock_guard lk(_oplogNeededForCrashRecoveryMutex); - *timestamp = Timestamp(_oplogNeededForCrashRecovery.load()); - } - - void shutdown() { - { - stdx::unique_lock lock(_mutex); - _shuttingDown = true; - // Wake up the checkpoint thread early, to take a final checkpoint before shutting - // down, if one has not coincidentally just been taken. - _condvar.notify_one(); - } - wait(); - } - -private: - WiredTigerKVEngine* _wiredTigerKVEngine; - WiredTigerSessionCache* _sessionCache; - - Mutex _oplogNeededForCrashRecoveryMutex = - MONGO_MAKE_LATCH("WiredTigerCheckpointThread::_oplogNeededForCrashRecoveryMutex"); - AtomicWord _oplogNeededForCrashRecovery; - - // Protects the state below. - Mutex _mutex = MONGO_MAKE_LATCH("WiredTigerCheckpointThread::_mutex"); - - // The checkpoint thread idles on this condition variable for a particular time duration between - // taking checkpoints. It can be triggered early to expedite either: immediate checkpointing if - // _triggerCheckpoint is set; or shutdown cleanup if _shuttingDown is set. - stdx::condition_variable _condvar; - - bool _shuttingDown = false; - - // This flag ensures the first stable checkpoint is only triggered once. - bool _hasTriggeredFirstStableCheckpoint = false; - - // This flag allows the checkpoint thread to wake up early when _condvar is signaled. - bool _triggerCheckpoint = false; -}; - namespace { TicketHolder openWriteTransaction(128); TicketHolder openReadTransaction(128); @@ -759,16 +532,6 @@ WiredTigerKVEngine::~WiredTigerKVEngine() { _sessionCache.reset(nullptr); } -void WiredTigerKVEngine::startAsyncThreads() { - if (!_ephemeral) { - if (!_readOnly) { - _checkpointThread = - std::make_unique(this, _sessionCache.get()); - _checkpointThread->go(); - } - } -} - void WiredTigerKVEngine::notifyStartupComplete() { WiredTigerUtil::notifyStartupComplete(); } @@ -898,11 +661,6 @@ void WiredTigerKVEngine::cleanShutdown() { _sessionSweeper->shutdown(); LOGV2(22319, "Finished shutting down session sweeper thread"); } - if (_checkpointThread) { - LOGV2(22322, "Shutting down checkpoint thread"); - _checkpointThread->shutdown(); - LOGV2(22323, "Finished shutting down checkpoint thread"); - } LOGV2_FOR_RECOVERY(23988, 2, "Shutdown timestamps.", @@ -1385,7 +1143,7 @@ WiredTigerKVEngine::beginNonBlockingBackup(OperationContext* opCtx, // Oplog truncation thread won't remove oplog since the checkpoint pinned by the backup cursor. stdx::lock_guard lock(_oplogPinnedByBackupMutex); - _checkpointThread->assignOplogNeededForCrashRecoveryTo(&_oplogPinnedByBackup); + _oplogPinnedByBackup = Timestamp(_oplogNeededForCrashRecovery.load()); auto pinOplogGuard = makeGuard([&] { _oplogPinnedByBackup = boost::none; }); // Persist the sizeStorer information to disk before opening the backup cursor. We aren't @@ -1907,6 +1665,74 @@ bool WiredTigerKVEngine::supportsDirectoryPerDB() const { return true; } +void WiredTigerKVEngine::checkpoint() { + const Timestamp stableTimestamp = getStableTimestamp(); + const Timestamp initialDataTimestamp = getInitialDataTimestamp(); + + // The amount of oplog to keep is primarily dictated by a user setting. However, in unexpected + // cases, durable, recover to a timestamp storage engines may need to play forward from an oplog + // entry that would otherwise be truncated by the user setting. Furthermore, the entries in + // prepared or large transactions can refer to previous entries in the same transaction. + // + // Live (replication) rollback will replay the oplog from exactly the stable timestamp. With + // prepared or large transactions, it may require some additional entries prior to the stable + // timestamp. These requirements are summarized in getOplogNeededForRollback. Truncating the + // oplog at this point is sufficient for in-memory configurations, but could cause an + // unrecoverable scenario if the node crashed and has to play from the last stable checkpoint. + // + // By recording the oplog needed for rollback "now", then taking a stable checkpoint, we can + // safely assume that the oplog needed for crash recovery has caught up to the recorded value. + // After the checkpoint, this value will be published such that actors which truncate the oplog + // can read an updated value. + try { + // Three cases: + // + // First, initialDataTimestamp is Timestamp(0, 1) -> Take full checkpoint. This is when + // there is no consistent view of the data (i.e: during initial sync). + // + // Second, stableTimestamp < initialDataTimestamp: Skip checkpoints. The data on disk is + // prone to being rolled back. Hold off on checkpoints. Hope that the stable timestamp + // surpasses the data on disk, allowing storage to persist newer copies to disk. + // + // Third, stableTimestamp >= initialDataTimestamp: Take stable checkpoint. Steady state + // case. + if (initialDataTimestamp.asULL() <= 1) { + UniqueWiredTigerSession session = _sessionCache->getSession(); + WT_SESSION* s = session->getSession(); + invariantWTOK(s->checkpoint(s, "use_timestamp=false")); + } else if (stableTimestamp < initialDataTimestamp) { + LOGV2_FOR_RECOVERY( + 23985, + 2, + "Stable timestamp is behind the initial data timestamp, skipping a checkpoint.", + "stableTimestamp"_attr = stableTimestamp.toString(), + "initialDataTimestamp"_attr = initialDataTimestamp.toString()); + } else { + auto oplogNeededForRollback = getOplogNeededForRollback(); + + LOGV2_FOR_RECOVERY(23986, + 2, + "Performing stable checkpoint.", + "stableTimestamp"_attr = stableTimestamp, + "oplogNeededForRollback"_attr = toString(oplogNeededForRollback)); + + UniqueWiredTigerSession session = _sessionCache->getSession(); + WT_SESSION* s = session->getSession(); + invariantWTOK(s->checkpoint(s, "use_timestamp=true")); + + if (oplogNeededForRollback.isOK()) { + // Now that the checkpoint is durable, publish the oplog needed to recover from it. + _oplogNeededForCrashRecovery.store(oplogNeededForRollback.getValue().asULL()); + } + } + } catch (const WriteConflictException&) { + // TODO SERVER-50824: Check if this can be removed now that WT-3483 is done. + LOGV2_WARNING(22346, "Checkpoint encountered a write conflict exception."); + } catch (const AssertionException& exc) { + invariant(ErrorCodes::isShutdownError(exc.code()), exc.what()); + } +} + bool WiredTigerKVEngine::hasIdent(OperationContext* opCtx, StringData ident) const { return _hasUri(WiredTigerRecoveryUnit::get(opCtx)->getSession()->getSession(), _uri(ident)); } @@ -2045,10 +1871,6 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool forc // After publishing a stable timestamp to WT, we can record the updated stable timestamp value // for the necessary oplog to keep. _stableTimestamp.store(stableTimestamp.asULL()); - if (_checkpointThread && !_checkpointThread->hasTriggeredFirstStableCheckpoint()) { - _checkpointThread->triggerFirstStableCheckpoint( - prevStable, Timestamp(_initialDataTimestamp.load()), stableTimestamp); - } // If 'force' is set, then we have already set the oldest timestamp equal to the stable // timestamp, so there is nothing left to do. @@ -2193,13 +2015,6 @@ StatusWith WiredTigerKVEngine::recoverToStableTimestamp(OperationCont 23989, 2, "WiredTiger::RecoverToStableTimestamp syncing size storer to disk."); syncSizeInfo(true); - if (!_ephemeral) { - LOGV2_FOR_ROLLBACK( - 23990, 2, "WiredTiger::RecoverToStableTimestamp shutting down checkpoint thread."); - // Shutdown WiredTigerKVEngine owned accesses into the storage engine. - _checkpointThread->shutdown(); - } - const Timestamp stableTimestamp(_stableTimestamp.load()); const Timestamp initialDataTimestamp(_initialDataTimestamp.load()); @@ -2216,11 +2031,6 @@ StatusWith WiredTigerKVEngine::recoverToStableTimestamp(OperationCont str::stream() << "Error rolling back to stable. Err: " << wiredtiger_strerror(ret)}; } - if (!_ephemeral) { - _checkpointThread = std::make_unique(this, _sessionCache.get()); - _checkpointThread->go(); - } - _sizeStorer = std::make_unique(_conn, _sizeStorerUri, _readOnly); return {stableTimestamp}; @@ -2345,7 +2155,7 @@ boost::optional WiredTigerKVEngine::getOplogNeededForCrashRecovery() return boost::none; } - return Timestamp(_checkpointThread->getOplogNeededForCrashRecovery()); + return Timestamp(_oplogNeededForCrashRecovery.load()); } Timestamp WiredTigerKVEngine::getPinnedOplog() const { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 9327ae7454f..bfd539e7815 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -103,8 +103,6 @@ public: ~WiredTigerKVEngine(); - void startAsyncThreads() override; - void notifyStartupComplete() override; void setRecordStoreExtraOptions(const std::string& options); @@ -119,6 +117,8 @@ public: return !isEphemeral(); } + void checkpoint() override; + bool isDurable() const override { return _durable; } @@ -369,7 +369,6 @@ public: private: class WiredTigerSessionSweeper; - class WiredTigerCheckpointThread; /** * Opens a connection on the WiredTiger database 'path' with the configuration 'wtOpenConfig'. @@ -458,7 +457,6 @@ private: const bool _keepDataHistory = true; std::unique_ptr _sessionSweeper; - std::unique_ptr _checkpointThread; std::string _rsOptions; std::string _indexOptions; @@ -485,6 +483,8 @@ private: // timestamp. Provided by replication layer because WT does not persist timestamps. AtomicWord _initialDataTimestamp; + AtomicWord _oplogNeededForCrashRecovery; + std::unique_ptr _runTimeConfigParam; mutable Mutex _highestDurableTimestampMutex = diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp index b870c017798..2580960a76c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp @@ -43,7 +43,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_test_fixture.h" -#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" +#include "mongo/db/storage/checkpointer.h" #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" #include "mongo/logv2/log.h" @@ -82,19 +82,16 @@ public: private: std::unique_ptr makeEngine() { - auto engine = std::make_unique(kWiredTigerEngineName, - _dbpath.path(), - _cs.get(), - "", - 1, - 0, - false, - false, - _forRepair, - false); - // There are unit tests expecting checkpoints to occur asynchronously. - engine->startAsyncThreads(); - return engine; + return std::make_unique(kWiredTigerEngineName, + _dbpath.path(), + _cs.get(), + "", + 1, + 0, + false, + false, + _forRepair, + false); } const std::unique_ptr _cs = std::make_unique(); @@ -246,6 +243,9 @@ TEST_F(WiredTigerKVEngineRepairTest, UnrecoverableOrphanedDataFilesAreRebuilt) { } TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) { + std::unique_ptr checkpointer = std::make_unique(_engine); + checkpointer->go(); + auto opCtxPtr = makeOperationContext(); // The initial data timestamp has to be set to take stable checkpoints. The first stable // timestamp greater than this will also trigger a checkpoint. The following loop of the @@ -262,7 +262,7 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) { #endif #endif { - wiredTigerGlobalOptions.checkpointDelaySecs = 1; + storageGlobalParams.checkpointDelaySecs = 1; } (); @@ -341,6 +341,8 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) { _engine->setStableTimestamp(Timestamp(30, 1), false); callbackShouldFail.store(false); assertPinnedMovesSoon(Timestamp(40, 1)); + + checkpointer->shutdown({ErrorCodes::ShutdownInProgress, "Test finished"}); } std::unique_ptr makeHelper() { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp index 740672e7a2c..2dde320ceeb 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp @@ -61,13 +61,6 @@ public: false, // .repair false // .readOnly ) { - // Deliberately not calling _engine->startAsyncThreads() because it starts an asynchronous - // checkpointing thread that can interfere with unit tests manipulating checkpoints - // manually. - // - // Alternatively, we would have to start using wiredTigerGlobalOptions.checkpointDelaySecs - // to set a high enough value such that the async thread never runs during testing. - repl::ReplicationCoordinator::set( getGlobalServiceContext(), std::unique_ptr(new repl::ReplicationCoordinatorMock( -- cgit v1.2.1