summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Wlodarek <gregory.wlodarek@mongodb.com>2020-09-09 03:08:07 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-10 23:41:14 +0000
commitf10e0ad7caf897b6444580d618e4a1e1577793d3 (patch)
tree9685f732c852dc7e264776632aa1356ecfcd928f
parentb68df4e87d60fcaeb5cb5ea34762695910c8f2c6 (diff)
downloadmongo-f10e0ad7caf897b6444580d618e4a1e1577793d3.tar.gz
SERVER-29418 Create a storage-engine agnostic checkpointing thread
-rw-r--r--src/mongo/db/mongod_options.cpp3
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp14
-rw-r--r--src/mongo/db/storage/SConscript15
-rw-r--r--src/mongo/db/storage/checkpointer.cpp168
-rw-r--r--src/mongo/db/storage/checkpointer.h114
-rw-r--r--src/mongo/db/storage/control/storage_control.cpp15
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h4
-rw-r--r--src/mongo/db/storage/kv/kv_engine.h14
-rw-r--r--src/mongo/db/storage/storage_engine.h11
-rw-r--r--src/mongo/db/storage/storage_engine_impl.cpp10
-rw-r--r--src/mongo/db/storage/storage_engine_impl.h4
-rw-r--r--src/mongo/db/storage/storage_engine_mock.h4
-rw-r--r--src/mongo/db/storage/storage_options.cpp1
-rw-r--r--src/mongo/db/storage/storage_options.h4
-rw-r--r--src/mongo/db/storage/wiredtiger/SConscript1
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp330
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h8
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp32
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp7
21 files changed, 458 insertions, 308 deletions
diff --git a/src/mongo/db/mongod_options.cpp b/src/mongo/db/mongod_options.cpp
index f0722782157..e499d04881a 100644
--- a/src/mongo/db/mongod_options.cpp
+++ b/src/mongo/db/mongod_options.cpp
@@ -404,6 +404,9 @@ Status storeMongodOptions(const moe::Environment& params) {
if (params.count("storage.syncPeriodSecs")) {
storageGlobalParams.syncdelay = params["storage.syncPeriodSecs"].as<double>();
+ storageGlobalParams.checkpointDelaySecs =
+ static_cast<size_t>(params["storage.syncPeriodSecs"].as<double>());
+
if (storageGlobalParams.syncdelay < 0 ||
storageGlobalParams.syncdelay > StorageGlobalParams::kMaxSyncdelaySecs) {
return Status(ErrorCodes::BadValue,
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 159179530a9..371a2c6af5f 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -74,6 +74,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/checkpointer.h"
#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/durable_catalog.h"
@@ -1271,7 +1272,18 @@ void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timest
"holdStableTimestamp"_attr = holdStableTimestamp);
}
});
- serviceCtx->getStorageEngine()->setStableTimestamp(newStableTimestamp);
+
+ StorageEngine* storageEngine = serviceCtx->getStorageEngine();
+ Timestamp prevStableTimestamp = storageEngine->getStableTimestamp();
+
+ storageEngine->setStableTimestamp(newStableTimestamp);
+
+ Checkpointer* checkpointer = Checkpointer::get(serviceCtx);
+ if (checkpointer && !checkpointer->hasTriggeredFirstStableCheckpoint()) {
+ checkpointer->triggerFirstStableCheckpoint(prevStableTimestamp,
+ storageEngine->getInitialDataTimestamp(),
+ storageEngine->getStableTimestamp());
+ }
}
void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx,
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 53ac37b0e30..f60d463a976 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -121,11 +121,13 @@ env.Library(
'control/storage_control.cpp',
],
LIBDEPS=[
+ 'checkpointer',
'journal_flusher',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/service_context',
+ 'storage_options',
],
)
@@ -513,6 +515,19 @@ env.Library(
)
env.Library(
+ target='checkpointer',
+ source=[
+ 'checkpointer.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/util/background_job',
+ 'storage_options',
+ ],
+)
+
+env.Library(
target='two_phase_index_build_knobs_idl',
source=[
env.Idlc('two_phase_index_build_knobs.idl')[0],
diff --git a/src/mongo/db/storage/checkpointer.cpp b/src/mongo/db/storage/checkpointer.cpp
new file mode 100644
index 00000000000..825e914d062
--- /dev/null
+++ b/src/mongo/db/storage/checkpointer.cpp
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/checkpointer.h"
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/kv/kv_engine.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo {
+
+namespace {
+
+const auto getCheckpointer = ServiceContext::declareDecoration<std::unique_ptr<Checkpointer>>();
+
+MONGO_FAIL_POINT_DEFINE(pauseCheckpointThread);
+
+} // namespace
+
+Checkpointer* Checkpointer::get(ServiceContext* serviceCtx) {
+ return getCheckpointer(serviceCtx).get();
+}
+
+Checkpointer* Checkpointer::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+void Checkpointer::set(ServiceContext* serviceCtx, std::unique_ptr<Checkpointer> newCheckpointer) {
+ auto& checkpointer = getCheckpointer(serviceCtx);
+ if (checkpointer) {
+ invariant(!checkpointer->running(),
+ "Tried to reset the Checkpointer without shutting down the original instance.");
+ }
+ checkpointer = std::move(newCheckpointer);
+}
+
+void Checkpointer::run() {
+ ThreadClient tc(name(), getGlobalServiceContext());
+ LOGV2_DEBUG(22307, 1, "Starting thread", "threadName"_attr = name());
+
+ while (true) {
+ auto opCtx = tc->makeOperationContext();
+
+ {
+ stdx::unique_lock<Latch> lock(_mutex);
+ MONGO_IDLE_THREAD_BLOCK;
+
+ // Wait for 'storageGlobalParams.checkpointDelaySecs' seconds; or until either shutdown
+ // is signaled or a checkpoint is triggered.
+ _sleepCV.wait_for(lock,
+ stdx::chrono::seconds(static_cast<std::int64_t>(
+ storageGlobalParams.checkpointDelaySecs)),
+ [&] { return _shuttingDown || _triggerCheckpoint; });
+
+ // If the checkpointDelaySecs is set to 0, that means we should skip checkpointing.
+ // However, checkpointDelaySecs is adjustable by a runtime server parameter, so we
+ // need to wake up to check periodically. The wakeup to check period is arbitrary.
+ while (storageGlobalParams.checkpointDelaySecs == 0 && !_shuttingDown &&
+ !_triggerCheckpoint) {
+ _sleepCV.wait_for(lock, stdx::chrono::seconds(static_cast<std::int64_t>(3)), [&] {
+ return _shuttingDown || _triggerCheckpoint;
+ });
+ }
+
+ if (_shuttingDown) {
+ invariant(!_shutdownReason.isOK());
+ LOGV2_DEBUG(22309,
+ 1,
+ "Stopping thread",
+ "threadName"_attr = name(),
+ "reason"_attr = _shutdownReason);
+ return;
+ }
+
+ // Clear the trigger so we do not immediately checkpoint again after this.
+ _triggerCheckpoint = false;
+ }
+
+ pauseCheckpointThread.pauseWhileSet();
+
+ const Date_t startTime = Date_t::now();
+
+ // TODO SERVER-50861: Access the storage engine via the ServiceContext.
+ _kvEngine->checkpoint();
+
+ const auto secondsElapsed = durationCount<Seconds>(Date_t::now() - startTime);
+ if (secondsElapsed >= 30) {
+ LOGV2_DEBUG(22308,
+ 1,
+ "Checkpoint was slow to complete",
+ "secondsElapsed"_attr = secondsElapsed);
+ }
+ }
+}
+
+void Checkpointer::triggerFirstStableCheckpoint(Timestamp prevStable,
+ Timestamp initialData,
+ Timestamp currStable) {
+ stdx::unique_lock<Latch> lock(_mutex);
+ invariant(!_hasTriggeredFirstStableCheckpoint);
+ if (prevStable < initialData && currStable >= initialData) {
+ LOGV2(22310,
+ "Triggering the first stable checkpoint",
+ "initialDataTimestamp"_attr = initialData,
+ "prevStableTimestamp"_attr = prevStable,
+ "currStableTimestamp"_attr = currStable);
+ _hasTriggeredFirstStableCheckpoint = true;
+ _triggerCheckpoint = true;
+ _sleepCV.notify_one();
+ }
+}
+
+bool Checkpointer::hasTriggeredFirstStableCheckpoint() {
+ stdx::unique_lock<Latch> lock(_mutex);
+ return _hasTriggeredFirstStableCheckpoint;
+}
+
+void Checkpointer::shutdown(const Status& reason) {
+ LOGV2(22322, "Shutting down checkpoint thread");
+
+ {
+ stdx::unique_lock<Latch> lock(_mutex);
+ _shuttingDown = true;
+ _shutdownReason = reason;
+
+ // Wake up the checkpoint thread early, to take a final checkpoint before shutting down, if
+ // one has not coincidentally just been taken.
+ _sleepCV.notify_one();
+ }
+
+ wait();
+ LOGV2(22323, "Finished shutting down checkpoint thread");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/checkpointer.h b/src/mongo/db/storage/checkpointer.h
new file mode 100644
index 00000000000..6c50974c2ba
--- /dev/null
+++ b/src/mongo/db/storage/checkpointer.h
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/background.h"
+
+namespace mongo {
+
+class KVEngine;
+class OperationContext;
+class ServiceContext;
+class Timestamp;
+
+class Checkpointer : public BackgroundJob {
+public:
+ Checkpointer(KVEngine* kvEngine)
+ : BackgroundJob(false /* deleteSelf */),
+ _kvEngine(kvEngine),
+ _shuttingDown(false),
+ _shutdownReason(Status::OK()),
+ _hasTriggeredFirstStableCheckpoint(false),
+ _triggerCheckpoint(false) {}
+
+ static Checkpointer* get(ServiceContext* serviceCtx);
+ static Checkpointer* get(OperationContext* opCtx);
+ static void set(ServiceContext* serviceCtx, std::unique_ptr<Checkpointer> newCheckpointer);
+
+ std::string name() const override {
+ return "Checkpointer";
+ }
+
+ /**
+ * Starts the checkpoint thread that runs every storageGlobalParams.checkpointDelaySecs seconds.
+ */
+ void run() override;
+
+ /**
+ * Triggers taking the first stable checkpoint if the stable timestamp has advanced past the
+ * initial data timestamp.
+ *
+ * The checkpoint thread runs automatically every storageGlobalParams.checkpointDelaySecs
+ * seconds. This function avoids potentially waiting that full duration for a stable checkpoint,
+ * initiating one immediately.
+ *
+ * Do not call this function if hasTriggeredFirstStableCheckpoint() returns true.
+ */
+ void triggerFirstStableCheckpoint(Timestamp prevStable,
+ Timestamp initialData,
+ Timestamp currStable);
+
+ /**
+ * Returns whether the first stable checkpoint has already been triggered.
+ */
+ bool hasTriggeredFirstStableCheckpoint();
+
+ /**
+ * Blocks until the checkpoint thread has been fully shutdown.
+ */
+ void shutdown(const Status& reason);
+
+private:
+ // A pointer to the KVEngine is maintained only due to unit testing limitations that don't fully
+ // setup the ServiceContext.
+ // TODO SERVER-50861: Remove this pointer.
+ KVEngine* const _kvEngine;
+
+ // Protects the state below.
+ Mutex _mutex = MONGO_MAKE_LATCH("Checkpointer::_mutex");
+
+ // The checkpoint thread idles on this condition variable for a particular time duration between
+ // taking checkpoints. It can be triggered early to expedite either: immediate checkpointing if
+ // _triggerCheckpoint is set; or shutdown cleanup if _shuttingDown is set.
+ stdx::condition_variable _sleepCV;
+
+ bool _shuttingDown;
+ Status _shutdownReason;
+
+ // This flag ensures the first stable checkpoint is only triggered once.
+ bool _hasTriggeredFirstStableCheckpoint;
+
+ // This flag allows the checkpoint thread to wake up early when _sleepCV is signaled.
+ bool _triggerCheckpoint;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/control/storage_control.cpp b/src/mongo/db/storage/control/storage_control.cpp
index f0b7e7d825f..50213d44dfc 100644
--- a/src/mongo/db/storage/control/storage_control.cpp
+++ b/src/mongo/db/storage/control/storage_control.cpp
@@ -35,7 +35,9 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/checkpointer.h"
#include "mongo/db/storage/control/journal_flusher.h"
+#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -73,12 +75,25 @@ void startStorageControls(ServiceContext* serviceContext, bool forTestOnly) {
journalFlusher->go();
JournalFlusher::set(serviceContext, std::move(journalFlusher));
+ if (storageEngine->supportsCheckpoints() && !storageEngine->isEphemeral() &&
+ !storageGlobalParams.readOnly) {
+ std::unique_ptr<Checkpointer> checkpointer =
+ std::make_unique<Checkpointer>(storageEngine->getEngine());
+ checkpointer->go();
+ Checkpointer::set(serviceContext, std::move(checkpointer));
+ }
+
areControlsStarted = true;
}
void stopStorageControls(ServiceContext* serviceContext, const Status& reason) {
if (areControlsStarted) {
JournalFlusher::get(serviceContext)->shutdown(reason);
+
+ auto checkpointer = Checkpointer::get(serviceContext);
+ if (checkpointer) {
+ checkpointer->shutdown(reason);
+ }
}
}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h
index b3da8bb0085..fd243b0c8c1 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_kv_engine.h
@@ -173,6 +173,10 @@ public:
Timestamp getOldestTimestamp() const override;
+ Timestamp getStableTimestamp() const override {
+ return Timestamp();
+ }
+
void setOldestTimestamp(Timestamp newOldestTimestamp, bool force) override;
std::map<Timestamp, std::shared_ptr<StringStore>> getHistory_forTest();
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 46dad070544..6c8c67df3c4 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -53,18 +53,6 @@ class SnapshotManager;
class KVEngine {
public:
/**
- * This function should only be called after the StorageEngine is set on the ServiceContext.
- *
- * Starts asycnhronous threads for a storage engine's integration layer. Any such thread
- * generating an OperationContext should be initialized here.
- *
- * In order for OperationContexts to be generated with real Locker objects, the generation must
- * occur after the StorageEngine is instantiated and set on the ServiceContext. Otherwise,
- * OperationContexts are created with LockerNoops.
- */
- virtual void startAsyncThreads() {}
-
- /**
* During the startup process, the storage engine is one of the first components to be started
* up and fully initialized. But that fully initialized storage engine may not be recognized as
* the end for the remaining storage startup tasks that still need to be performed.
@@ -275,6 +263,8 @@ public:
return false;
}
+ virtual void checkpoint() {}
+
virtual bool isDurable() const = 0;
/**
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 72c09e125b6..edf31b874fe 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -473,6 +473,12 @@ public:
std::shared_ptr<Ident> ident) = 0;
/**
+ * Called when the checkpoint thread instructs the storage engine to take a checkpoint. The
+ * underlying storage engine must take a checkpoint at this point.
+ */
+ virtual void checkpoint() = 0;
+
+ /**
* Recovers the storage engine state to the last stable timestamp. "Stable" in this case
* refers to a timestamp that is guaranteed to never be rolled back. The stable timestamp
* used should be one provided by StorageEngine::setStableTimestamp().
@@ -517,6 +523,11 @@ public:
virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) = 0;
/**
+ * Returns the stable timestamp.
+ */
+ virtual Timestamp getStableTimestamp() const = 0;
+
+ /**
* Tells the storage engine the timestamp of the data at startup. This is necessary because
* timestamps are not persisted in the storage layer.
*/
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index e8efa8ce88d..22c82a09eba 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -670,8 +670,6 @@ void StorageEngineImpl::finishInit() {
// A storage engine may need to start threads that require OperationsContexts with real Lockers,
// as opposed to LockerNoops. Placing the start logic here, after the StorageEngine has been
// instantiated, causes makeOperationContext() to create LockerImpls instead of LockerNoops.
- _engine->startAsyncThreads();
-
if (_engine->supportsRecoveryTimestamp()) {
_timestampMonitor = std::make_unique<TimestampMonitor>(
_engine.get(), getGlobalServiceContext()->getPeriodicRunner());
@@ -893,6 +891,10 @@ void StorageEngineImpl::setStableTimestamp(Timestamp stableTimestamp, bool force
_engine->setStableTimestamp(stableTimestamp, force);
}
+Timestamp StorageEngineImpl::getStableTimestamp() const {
+ return _engine->getStableTimestamp();
+}
+
void StorageEngineImpl::setInitialDataTimestamp(Timestamp initialDataTimestamp) {
_engine->setInitialDataTimestamp(initialDataTimestamp);
}
@@ -1033,6 +1035,10 @@ void StorageEngineImpl::addDropPendingIdent(const Timestamp& dropTimestamp,
_dropPendingIdentReaper.addDropPendingIdent(dropTimestamp, nss, ident);
}
+void StorageEngineImpl::checkpoint() {
+ _engine->checkpoint();
+}
+
void StorageEngineImpl::_onMinOfCheckpointAndOldestTimestampChanged(const Timestamp& timestamp) {
if (timestamp.isNull()) {
return;
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 7d71e5de128..fed128f9b59 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -123,6 +123,8 @@ public:
virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) override;
+ virtual Timestamp getStableTimestamp() const override;
+
virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override;
virtual Timestamp getInitialDataTimestamp() const override;
@@ -315,6 +317,8 @@ public:
const NamespaceString& nss,
std::shared_ptr<Ident> ident) override;
+ void checkpoint() override;
+
DurableCatalog* getCatalog() override {
return _catalog.get();
}
diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h
index 5c7078e2a2d..96eb8020b1d 100644
--- a/src/mongo/db/storage/storage_engine_mock.h
+++ b/src/mongo/db/storage/storage_engine_mock.h
@@ -138,6 +138,9 @@ public:
MONGO_UNREACHABLE;
}
void setStableTimestamp(Timestamp stableTimestamp, bool force = false) final {}
+ Timestamp getStableTimestamp() const override {
+ return Timestamp();
+ }
void setInitialDataTimestamp(Timestamp timestamp) final {}
Timestamp getInitialDataTimestamp() const override {
return Timestamp();
@@ -172,6 +175,7 @@ public:
void addDropPendingIdent(const Timestamp& dropTimestamp,
const NamespaceString& nss,
std::shared_ptr<Ident> ident) final {}
+ void checkpoint() final {}
Status currentFilesCompatible(OperationContext* opCtx) const final {
return Status::OK();
}
diff --git a/src/mongo/db/storage/storage_options.cpp b/src/mongo/db/storage/storage_options.cpp
index 7ba94afde29..431698a807d 100644
--- a/src/mongo/db/storage/storage_options.cpp
+++ b/src/mongo/db/storage/storage_options.cpp
@@ -58,6 +58,7 @@ void StorageGlobalParams::reset() {
oplogMinRetentionHours.store(0.0);
allowOplogTruncation = true;
disableLockFreeReads = true;
+ checkpointDelaySecs = 0;
}
StorageGlobalParams storageGlobalParams;
diff --git a/src/mongo/db/storage/storage_options.h b/src/mongo/db/storage/storage_options.h
index f6284a06244..e7fe5331f96 100644
--- a/src/mongo/db/storage/storage_options.h
+++ b/src/mongo/db/storage/storage_options.h
@@ -123,6 +123,10 @@ struct StorageGlobalParams {
// settings with which lock-free reads are incompatible: standalone mode; and
// enableMajorityReadConcern=false.
bool disableLockFreeReads;
+
+ // Delay in seconds between triggering the next checkpoint after the completion of the previous
+ // one. A value of 0 indicates that checkpointing will be skipped.
+ size_t checkpointDelaySecs;
};
extern StorageGlobalParams storageGlobalParams;
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index 0cf7d92ce08..5d24feec685 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -139,6 +139,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/service_context_d',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
+ '$BUILD_DIR/mongo/db/storage/checkpointer',
'$BUILD_DIR/mongo/db/storage/durable_catalog_impl',
'$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness',
'$BUILD_DIR/mongo/db/storage/recovery_unit_test_harness',
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp
index d7bba3ee94d..8149bab8757 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.cpp
@@ -43,11 +43,6 @@ WiredTigerGlobalOptions wiredTigerGlobalOptions;
Status WiredTigerGlobalOptions::store(const moe::Environment& params) {
// WiredTiger storage engine options
- if (params.count("storage.syncPeriodSecs")) {
- wiredTigerGlobalOptions.checkpointDelaySecs =
- static_cast<size_t>(params["storage.syncPeriodSecs"].as<double>());
- }
-
if (!wiredTigerGlobalOptions.engineConfig.empty()) {
LOGV2(22293,
"Engine custom option: {wiredTigerGlobalOptions_engineConfig}",
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h
index 21d4c522f3b..51546164c39 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_global_options.h
@@ -40,7 +40,6 @@ class WiredTigerGlobalOptions {
public:
WiredTigerGlobalOptions()
: cacheSizeGB(0),
- checkpointDelaySecs(0),
statisticsLogDelaySecs(0),
directoryForIndexes(false),
maxCacheOverflowFileSizeGBDeprecated(0),
@@ -50,7 +49,6 @@ public:
Status store(const optionenvironment::Environment& params);
double cacheSizeGB;
- size_t checkpointDelaySecs;
size_t statisticsLogDelaySecs;
std::string journalCompressor;
bool directoryForIndexes;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 1553c1740fe..f169f952e05 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -119,8 +119,6 @@ namespace {
MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely);
MONGO_FAIL_POINT_DEFINE(WTSetOldestTSToStableTS);
-MONGO_FAIL_POINT_DEFINE(pauseCheckpointThread);
-
} // namespace
bool WiredTigerFileVersion::shouldDowngrade(bool readOnly,
@@ -255,231 +253,6 @@ std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult
}
}
-class WiredTigerKVEngine::WiredTigerCheckpointThread : public BackgroundJob {
-public:
- explicit WiredTigerCheckpointThread(WiredTigerKVEngine* wiredTigerKVEngine,
- WiredTigerSessionCache* sessionCache)
- : BackgroundJob(false /* deleteSelf */),
- _wiredTigerKVEngine(wiredTigerKVEngine),
- _sessionCache(sessionCache) {}
-
- virtual string name() const {
- return "WTCheckpointThread";
- }
-
- virtual void run() {
- ThreadClient tc(name(), getGlobalServiceContext());
- LOGV2_DEBUG(22307, 1, "Starting thread", "threadName"_attr = name());
-
- while (true) {
- auto opCtx = tc->makeOperationContext();
-
- {
- stdx::unique_lock<Latch> lock(_mutex);
- MONGO_IDLE_THREAD_BLOCK;
-
- // Wait for 'wiredTigerGlobalOptions.checkpointDelaySecs' seconds; or until either
- // shutdown is signaled or a checkpoint is triggered.
- _condvar.wait_for(lock,
- stdx::chrono::seconds(static_cast<std::int64_t>(
- wiredTigerGlobalOptions.checkpointDelaySecs)),
- [&] { return _shuttingDown || _triggerCheckpoint; });
-
- // If the checkpointDelaySecs is set to 0, that means we should skip checkpointing.
- // However, checkpointDelaySecs is adjustable by a runtime server parameter, so we
- // need to wake up to check periodically. The wakeup to check period is arbitrary.
- while (wiredTigerGlobalOptions.checkpointDelaySecs == 0 && !_shuttingDown &&
- !_triggerCheckpoint) {
- _condvar.wait_for(lock,
- stdx::chrono::seconds(static_cast<std::int64_t>(3)),
- [&] { return _shuttingDown || _triggerCheckpoint; });
- }
-
- if (_shuttingDown) {
- LOGV2_DEBUG(22309, 1, "Stopping thread", "threadName"_attr = name());
- return;
- }
-
- // Clear the trigger so we do not immediately checkpoint again after this.
- _triggerCheckpoint = false;
- }
-
- pauseCheckpointThread.pauseWhileSet();
-
- const Date_t startTime = Date_t::now();
-
- const Timestamp stableTimestamp = _wiredTigerKVEngine->getStableTimestamp();
- const Timestamp initialDataTimestamp = _wiredTigerKVEngine->getInitialDataTimestamp();
-
- // The amount of oplog to keep is primarily dictated by a user setting. However, in
- // unexpected cases, durable, recover to a timestamp storage engines may need to play
- // forward from an oplog entry that would otherwise be truncated by the user
- // setting. Furthermore, the entries in prepared or large transactions can refer to
- // previous entries in the same transaction.
- //
- // Live (replication) rollback will replay oplogs from exactly the stable timestamp.
- // With prepared or large transactions, it may require some additional entries prior to
- // the stable timestamp. These requirements are summarized in getOplogNeededForRollback.
- // Truncating the oplog at this point is sufficient for in-memory configurations, but
- // could cause an unrecoverable scenario if the node crashed and has to play from the
- // last stable checkpoint.
- //
- // By recording the oplog needed for rollback "now", then taking a stable checkpoint,
- // we can safely assume that the oplog needed for crash recovery has caught up to the
- // recorded value. After the checkpoint, this value will be published such that actors
- // which truncate the oplog can read an updated value.
- try {
- // Three cases:
- //
- // First, initialDataTimestamp is Timestamp(0, 1) -> Take full checkpoint. This is
- // when there is no consistent view of the data (i.e: during initial sync).
- //
- // Second, stableTimestamp < initialDataTimestamp: Skip checkpoints. The data on
- // disk is prone to being rolled back. Hold off on checkpoints. Hope that the
- // stable timestamp surpasses the data on disk, allowing storage to persist newer
- // copies to disk.
- //
- // Third, stableTimestamp >= initialDataTimestamp: Take stable checkpoint. Steady
- // state case.
- if (initialDataTimestamp.asULL() <= 1) {
- UniqueWiredTigerSession session = _sessionCache->getSession();
- WT_SESSION* s = session->getSession();
- invariantWTOK(s->checkpoint(s, "use_timestamp=false"));
- } else if (stableTimestamp < initialDataTimestamp) {
- LOGV2_FOR_RECOVERY(
- 23985,
- 2,
- "Stable timestamp is behind the initial data timestamp, skipping "
- "a checkpoint. StableTimestamp: {stableTimestamp} InitialDataTimestamp: "
- "{initialDataTimestamp}",
- "stableTimestamp"_attr = stableTimestamp.toString(),
- "initialDataTimestamp"_attr = initialDataTimestamp.toString());
- } else {
- auto oplogNeededForRollback = _wiredTigerKVEngine->getOplogNeededForRollback();
-
- LOGV2_FOR_RECOVERY(
- 23986,
- 2,
- "Performing stable checkpoint. StableTimestamp: {stableTimestamp}, "
- "OplogNeededForRollback: {oplogNeededForRollback}",
- "stableTimestamp"_attr = stableTimestamp,
- "oplogNeededForRollback"_attr = toString(oplogNeededForRollback));
-
- UniqueWiredTigerSession session = _sessionCache->getSession();
- WT_SESSION* s = session->getSession();
- invariantWTOK(s->checkpoint(s, "use_timestamp=true"));
-
- if (oplogNeededForRollback.isOK()) {
- // Now that the checkpoint is durable, publish the oplog needed to recover
- // from it.
- stdx::lock_guard<Latch> lk(_oplogNeededForCrashRecoveryMutex);
- _oplogNeededForCrashRecovery.store(
- oplogNeededForRollback.getValue().asULL());
- }
- }
-
- const auto secondsElapsed = durationCount<Seconds>(Date_t::now() - startTime);
- if (secondsElapsed >= 30) {
- LOGV2_DEBUG(22308,
- 1,
- "Checkpoint took {secondsElapsed} seconds to complete.",
- "secondsElapsed"_attr = secondsElapsed);
- }
- } catch (const WriteConflictException&) {
- // Temporary: remove this after WT-3483
- LOGV2_WARNING(22346, "Checkpoint encountered a write conflict exception.");
- } catch (const AssertionException& exc) {
- invariant(ErrorCodes::isShutdownError(exc.code()), exc.what());
- }
- }
- }
-
- /**
- * Returns true if we have already triggered taking the first checkpoint.
- */
- bool hasTriggeredFirstStableCheckpoint() {
- stdx::unique_lock<Latch> lock(_mutex);
- return _hasTriggeredFirstStableCheckpoint;
- }
-
- /**
- * Triggers taking the first stable checkpoint, which is when the stable timestamp advances past
- * the initial data timestamp.
- *
- * The checkpoint thread runs automatically every wiredTigerGlobalOptions.checkpointDelaySecs
- * seconds. This function avoids potentially waiting that full duration for a stable checkpoint,
- * initiating one immediately.
- *
- * Do not call this function if hasTriggeredFirstStableCheckpoint() returns true.
- */
- void triggerFirstStableCheckpoint(Timestamp prevStable,
- Timestamp initialData,
- Timestamp currStable) {
- stdx::unique_lock<Latch> lock(_mutex);
- invariant(!_hasTriggeredFirstStableCheckpoint);
- if (prevStable < initialData && currStable >= initialData) {
- LOGV2(22310,
- "Triggering the first stable checkpoint. Initial Data: {initialData} PrevStable: "
- "{prevStable} CurrStable: {currStable}",
- "Triggering the first stable checkpoint",
- "initialData"_attr = initialData,
- "prevStable"_attr = prevStable,
- "currStable"_attr = currStable);
- _hasTriggeredFirstStableCheckpoint = true;
- _triggerCheckpoint = true;
- _condvar.notify_one();
- }
- }
-
- std::uint64_t getOplogNeededForCrashRecovery() const {
- return _oplogNeededForCrashRecovery.load();
- }
-
- /*
- * Atomically assign _oplogNeededForCrashRecovery to a variable.
- * _oplogNeededForCrashRecovery will not change during assignment.
- */
- void assignOplogNeededForCrashRecoveryTo(boost::optional<Timestamp>* timestamp) {
- stdx::lock_guard<Latch> lk(_oplogNeededForCrashRecoveryMutex);
- *timestamp = Timestamp(_oplogNeededForCrashRecovery.load());
- }
-
- void shutdown() {
- {
- stdx::unique_lock<Latch> lock(_mutex);
- _shuttingDown = true;
- // Wake up the checkpoint thread early, to take a final checkpoint before shutting
- // down, if one has not coincidentally just been taken.
- _condvar.notify_one();
- }
- wait();
- }
-
-private:
- WiredTigerKVEngine* _wiredTigerKVEngine;
- WiredTigerSessionCache* _sessionCache;
-
- Mutex _oplogNeededForCrashRecoveryMutex =
- MONGO_MAKE_LATCH("WiredTigerCheckpointThread::_oplogNeededForCrashRecoveryMutex");
- AtomicWord<std::uint64_t> _oplogNeededForCrashRecovery;
-
- // Protects the state below.
- Mutex _mutex = MONGO_MAKE_LATCH("WiredTigerCheckpointThread::_mutex");
-
- // The checkpoint thread idles on this condition variable for a particular time duration between
- // taking checkpoints. It can be triggered early to expedite either: immediate checkpointing if
- // _triggerCheckpoint is set; or shutdown cleanup if _shuttingDown is set.
- stdx::condition_variable _condvar;
-
- bool _shuttingDown = false;
-
- // This flag ensures the first stable checkpoint is only triggered once.
- bool _hasTriggeredFirstStableCheckpoint = false;
-
- // This flag allows the checkpoint thread to wake up early when _condvar is signaled.
- bool _triggerCheckpoint = false;
-};
-
namespace {
TicketHolder openWriteTransaction(128);
TicketHolder openReadTransaction(128);
@@ -759,16 +532,6 @@ WiredTigerKVEngine::~WiredTigerKVEngine() {
_sessionCache.reset(nullptr);
}
-void WiredTigerKVEngine::startAsyncThreads() {
- if (!_ephemeral) {
- if (!_readOnly) {
- _checkpointThread =
- std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
- _checkpointThread->go();
- }
- }
-}
-
void WiredTigerKVEngine::notifyStartupComplete() {
WiredTigerUtil::notifyStartupComplete();
}
@@ -898,11 +661,6 @@ void WiredTigerKVEngine::cleanShutdown() {
_sessionSweeper->shutdown();
LOGV2(22319, "Finished shutting down session sweeper thread");
}
- if (_checkpointThread) {
- LOGV2(22322, "Shutting down checkpoint thread");
- _checkpointThread->shutdown();
- LOGV2(22323, "Finished shutting down checkpoint thread");
- }
LOGV2_FOR_RECOVERY(23988,
2,
"Shutdown timestamps.",
@@ -1385,7 +1143,7 @@ WiredTigerKVEngine::beginNonBlockingBackup(OperationContext* opCtx,
// Oplog truncation thread won't remove oplog since the checkpoint pinned by the backup cursor.
stdx::lock_guard<Latch> lock(_oplogPinnedByBackupMutex);
- _checkpointThread->assignOplogNeededForCrashRecoveryTo(&_oplogPinnedByBackup);
+ _oplogPinnedByBackup = Timestamp(_oplogNeededForCrashRecovery.load());
auto pinOplogGuard = makeGuard([&] { _oplogPinnedByBackup = boost::none; });
// Persist the sizeStorer information to disk before opening the backup cursor. We aren't
@@ -1907,6 +1665,74 @@ bool WiredTigerKVEngine::supportsDirectoryPerDB() const {
return true;
}
+void WiredTigerKVEngine::checkpoint() {
+ const Timestamp stableTimestamp = getStableTimestamp();
+ const Timestamp initialDataTimestamp = getInitialDataTimestamp();
+
+ // The amount of oplog to keep is primarily dictated by a user setting. However, in unexpected
+ // cases, durable, recover to a timestamp storage engines may need to play forward from an oplog
+ // entry that would otherwise be truncated by the user setting. Furthermore, the entries in
+ // prepared or large transactions can refer to previous entries in the same transaction.
+ //
+ // Live (replication) rollback will replay the oplog from exactly the stable timestamp. With
+ // prepared or large transactions, it may require some additional entries prior to the stable
+ // timestamp. These requirements are summarized in getOplogNeededForRollback. Truncating the
+ // oplog at this point is sufficient for in-memory configurations, but could cause an
+ // unrecoverable scenario if the node crashed and has to play from the last stable checkpoint.
+ //
+ // By recording the oplog needed for rollback "now", then taking a stable checkpoint, we can
+ // safely assume that the oplog needed for crash recovery has caught up to the recorded value.
+ // After the checkpoint, this value will be published such that actors which truncate the oplog
+ // can read an updated value.
+ try {
+ // Three cases:
+ //
+ // First, initialDataTimestamp is Timestamp(0, 1) -> Take full checkpoint. This is when
+ // there is no consistent view of the data (i.e: during initial sync).
+ //
+ // Second, stableTimestamp < initialDataTimestamp: Skip checkpoints. The data on disk is
+ // prone to being rolled back. Hold off on checkpoints. Hope that the stable timestamp
+ // surpasses the data on disk, allowing storage to persist newer copies to disk.
+ //
+ // Third, stableTimestamp >= initialDataTimestamp: Take stable checkpoint. Steady state
+ // case.
+ if (initialDataTimestamp.asULL() <= 1) {
+ UniqueWiredTigerSession session = _sessionCache->getSession();
+ WT_SESSION* s = session->getSession();
+ invariantWTOK(s->checkpoint(s, "use_timestamp=false"));
+ } else if (stableTimestamp < initialDataTimestamp) {
+ LOGV2_FOR_RECOVERY(
+ 23985,
+ 2,
+ "Stable timestamp is behind the initial data timestamp, skipping a checkpoint.",
+ "stableTimestamp"_attr = stableTimestamp.toString(),
+ "initialDataTimestamp"_attr = initialDataTimestamp.toString());
+ } else {
+ auto oplogNeededForRollback = getOplogNeededForRollback();
+
+ LOGV2_FOR_RECOVERY(23986,
+ 2,
+ "Performing stable checkpoint.",
+ "stableTimestamp"_attr = stableTimestamp,
+ "oplogNeededForRollback"_attr = toString(oplogNeededForRollback));
+
+ UniqueWiredTigerSession session = _sessionCache->getSession();
+ WT_SESSION* s = session->getSession();
+ invariantWTOK(s->checkpoint(s, "use_timestamp=true"));
+
+ if (oplogNeededForRollback.isOK()) {
+ // Now that the checkpoint is durable, publish the oplog needed to recover from it.
+ _oplogNeededForCrashRecovery.store(oplogNeededForRollback.getValue().asULL());
+ }
+ }
+ } catch (const WriteConflictException&) {
+ // TODO SERVER-50824: Check if this can be removed now that WT-3483 is done.
+ LOGV2_WARNING(22346, "Checkpoint encountered a write conflict exception.");
+ } catch (const AssertionException& exc) {
+ invariant(ErrorCodes::isShutdownError(exc.code()), exc.what());
+ }
+}
+
bool WiredTigerKVEngine::hasIdent(OperationContext* opCtx, StringData ident) const {
return _hasUri(WiredTigerRecoveryUnit::get(opCtx)->getSession()->getSession(), _uri(ident));
}
@@ -2045,10 +1871,6 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool forc
// After publishing a stable timestamp to WT, we can record the updated stable timestamp value
// for the necessary oplog to keep.
_stableTimestamp.store(stableTimestamp.asULL());
- if (_checkpointThread && !_checkpointThread->hasTriggeredFirstStableCheckpoint()) {
- _checkpointThread->triggerFirstStableCheckpoint(
- prevStable, Timestamp(_initialDataTimestamp.load()), stableTimestamp);
- }
// If 'force' is set, then we have already set the oldest timestamp equal to the stable
// timestamp, so there is nothing left to do.
@@ -2193,13 +2015,6 @@ StatusWith<Timestamp> WiredTigerKVEngine::recoverToStableTimestamp(OperationCont
23989, 2, "WiredTiger::RecoverToStableTimestamp syncing size storer to disk.");
syncSizeInfo(true);
- if (!_ephemeral) {
- LOGV2_FOR_ROLLBACK(
- 23990, 2, "WiredTiger::RecoverToStableTimestamp shutting down checkpoint thread.");
- // Shutdown WiredTigerKVEngine owned accesses into the storage engine.
- _checkpointThread->shutdown();
- }
-
const Timestamp stableTimestamp(_stableTimestamp.load());
const Timestamp initialDataTimestamp(_initialDataTimestamp.load());
@@ -2216,11 +2031,6 @@ StatusWith<Timestamp> WiredTigerKVEngine::recoverToStableTimestamp(OperationCont
str::stream() << "Error rolling back to stable. Err: " << wiredtiger_strerror(ret)};
}
- if (!_ephemeral) {
- _checkpointThread = std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
- _checkpointThread->go();
- }
-
_sizeStorer = std::make_unique<WiredTigerSizeStorer>(_conn, _sizeStorerUri, _readOnly);
return {stableTimestamp};
@@ -2345,7 +2155,7 @@ boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery()
return boost::none;
}
- return Timestamp(_checkpointThread->getOplogNeededForCrashRecovery());
+ return Timestamp(_oplogNeededForCrashRecovery.load());
}
Timestamp WiredTigerKVEngine::getPinnedOplog() const {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 9327ae7454f..bfd539e7815 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -103,8 +103,6 @@ public:
~WiredTigerKVEngine();
- void startAsyncThreads() override;
-
void notifyStartupComplete() override;
void setRecordStoreExtraOptions(const std::string& options);
@@ -119,6 +117,8 @@ public:
return !isEphemeral();
}
+ void checkpoint() override;
+
bool isDurable() const override {
return _durable;
}
@@ -369,7 +369,6 @@ public:
private:
class WiredTigerSessionSweeper;
- class WiredTigerCheckpointThread;
/**
* Opens a connection on the WiredTiger database 'path' with the configuration 'wtOpenConfig'.
@@ -458,7 +457,6 @@ private:
const bool _keepDataHistory = true;
std::unique_ptr<WiredTigerSessionSweeper> _sessionSweeper;
- std::unique_ptr<WiredTigerCheckpointThread> _checkpointThread;
std::string _rsOptions;
std::string _indexOptions;
@@ -485,6 +483,8 @@ private:
// timestamp. Provided by replication layer because WT does not persist timestamps.
AtomicWord<std::uint64_t> _initialDataTimestamp;
+ AtomicWord<std::uint64_t> _oplogNeededForCrashRecovery;
+
std::unique_ptr<WiredTigerEngineRuntimeConfigParameter> _runTimeConfigParam;
mutable Mutex _highestDurableTimestampMutex =
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
index b870c017798..2580960a76c 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
@@ -43,7 +43,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_test_fixture.h"
-#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h"
+#include "mongo/db/storage/checkpointer.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/logv2/log.h"
@@ -82,19 +82,16 @@ public:
private:
std::unique_ptr<WiredTigerKVEngine> makeEngine() {
- auto engine = std::make_unique<WiredTigerKVEngine>(kWiredTigerEngineName,
- _dbpath.path(),
- _cs.get(),
- "",
- 1,
- 0,
- false,
- false,
- _forRepair,
- false);
- // There are unit tests expecting checkpoints to occur asynchronously.
- engine->startAsyncThreads();
- return engine;
+ return std::make_unique<WiredTigerKVEngine>(kWiredTigerEngineName,
+ _dbpath.path(),
+ _cs.get(),
+ "",
+ 1,
+ 0,
+ false,
+ false,
+ _forRepair,
+ false);
}
const std::unique_ptr<ClockSource> _cs = std::make_unique<ClockSourceMock>();
@@ -246,6 +243,9 @@ TEST_F(WiredTigerKVEngineRepairTest, UnrecoverableOrphanedDataFilesAreRebuilt) {
}
TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) {
+ std::unique_ptr<Checkpointer> checkpointer = std::make_unique<Checkpointer>(_engine);
+ checkpointer->go();
+
auto opCtxPtr = makeOperationContext();
// The initial data timestamp has to be set to take stable checkpoints. The first stable
// timestamp greater than this will also trigger a checkpoint. The following loop of the
@@ -262,7 +262,7 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) {
#endif
#endif
{
- wiredTigerGlobalOptions.checkpointDelaySecs = 1;
+ storageGlobalParams.checkpointDelaySecs = 1;
}
();
@@ -341,6 +341,8 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) {
_engine->setStableTimestamp(Timestamp(30, 1), false);
callbackShouldFail.store(false);
assertPinnedMovesSoon(Timestamp(40, 1));
+
+ checkpointer->shutdown({ErrorCodes::ShutdownInProgress, "Test finished"});
}
std::unique_ptr<KVHarnessHelper> makeHelper() {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
index 740672e7a2c..2dde320ceeb 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
@@ -61,13 +61,6 @@ public:
false, // .repair
false // .readOnly
) {
- // Deliberately not calling _engine->startAsyncThreads() because it starts an asynchronous
- // checkpointing thread that can interfere with unit tests manipulating checkpoints
- // manually.
- //
- // Alternatively, we would have to start using wiredTigerGlobalOptions.checkpointDelaySecs
- // to set a high enough value such that the async thread never runs during testing.
-
repl::ReplicationCoordinator::set(
getGlobalServiceContext(),
std::unique_ptr<repl::ReplicationCoordinator>(new repl::ReplicationCoordinatorMock(