diff options
Diffstat (limited to 'src/mongo/db')
22 files changed, 184 insertions, 95 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index a35ab54a2f1..bd9e2a9b75f 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -326,7 +326,6 @@ ExitCode _initAndListen(int listenPort) { // Set up the periodic runner for background job execution. This is required to be running // before the storage engine is initialized. auto runner = makePeriodicRunner(serviceContext); - runner->startup(); serviceContext->setPeriodicRunner(std::move(runner)); FlowControl::set(serviceContext, std::make_unique<FlowControl>( @@ -603,8 +602,8 @@ ExitCode _initAndListen(int listenPort) { // Only do this on storage engines supporting snapshot reads, which hold resources we wish to // release periodically in order to avoid storage cache pressure build up. if (storageEngine->supportsReadConcernSnapshot()) { - startPeriodicThreadToAbortExpiredTransactions(serviceContext); - startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(serviceContext); + PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->start(); + PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->start(); } // Set up the logical session cache @@ -919,13 +918,12 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { flowControlTicketholder->setInShutdown(); } - // Shut down the background periodic task runner. This must be done before shutting down the - // storage engine. - if (auto runner = serviceContext->getPeriodicRunner()) { - runner->shutdown(); - } + if (auto storageEngine = serviceContext->getStorageEngine()) { + if (storageEngine->supportsReadConcernSnapshot()) { + PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->stop(); + PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->stop(); + } - if (serviceContext->getStorageEngine()) { ServiceContext::UniqueOperationContext uniqueOpCtx; OperationContext* opCtx = client->getOperationContext(); if (!opCtx) { diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index 2617d9060c4..8afd9f6889f 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -41,7 +41,6 @@ #include "mongo/platform/atomic_word.h" #include "mongo/util/duration.h" #include "mongo/util/log.h" -#include "mongo/util/periodic_runner.h" #include "mongo/util/scopeguard.h" namespace mongo { diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp index f3fa239daac..0f55d053fb3 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp @@ -58,13 +58,32 @@ Milliseconds getPeriod(const Argument& transactionLifetimeLimitSeconds) { return period; } + } // namespace -void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext) { - // Enforce calling this function once, and only once. - static bool firstCall = true; - invariant(firstCall); - firstCall = false; +auto PeriodicThreadToAbortExpiredTransactions::get(ServiceContext* serviceContext) + -> PeriodicThreadToAbortExpiredTransactions& { + auto& jobContainer = _serviceDecoration(serviceContext); + jobContainer._init(serviceContext); + + return jobContainer; +} + +auto PeriodicThreadToAbortExpiredTransactions::operator*() const noexcept -> PeriodicJobAnchor& { + stdx::lock_guard lk(_mutex); + return *_anchor; +} + +auto PeriodicThreadToAbortExpiredTransactions::operator-> () const noexcept -> PeriodicJobAnchor* { + stdx::lock_guard lk(_mutex); + return _anchor.get(); +} + +void PeriodicThreadToAbortExpiredTransactions::_init(ServiceContext* serviceContext) { + stdx::lock_guard lk(_mutex); + if (_anchor) { + return; + } auto periodicRunner = serviceContext->getPeriodicRunner(); invariant(periodicRunner); @@ -87,18 +106,17 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex }, getPeriod(gTransactionLifetimeLimitSeconds.load())); - auto handle = periodicRunner->makeJob(std::move(job)); - handle->start(); - - TransactionParticipant::observeTransactionLifetimeLimitSeconds - .addObserver([handle = std::move(handle)](const Argument& secs) { - try { - handle->setPeriod(getPeriod(secs)); - } catch (const DBException& ex) { - log() << "Failed to update period of thread which aborts expired transactions " - << ex.toStatus(); - } - }); + _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); + + TransactionParticipant::observeTransactionLifetimeLimitSeconds.addObserver([anchor = _anchor]( + const Argument& secs) { + try { + anchor->setPeriod(getPeriod(secs)); + } catch (const DBException& ex) { + log() << "Failed to update period of thread which aborts expired transactions " + << ex.toStatus(); + } + }); } } // namespace mongo diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h index 6f07e37c247..88bf08d7ee5 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h @@ -29,18 +29,34 @@ #pragma once -namespace mongo { +#include <memory> + +#include "mongo/db/service_context.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/periodic_runner.h" -class ServiceContext; +namespace mongo { /** - * Defines and starts a periodic background job to check for and abort expired transactions. + * Defines a periodic background job to check for and abort expired transactions. * The job will run every (transactionLifetimeLimitSeconds/2) seconds, or at most once per second * and at least once per minute. - * - * This function should only ever be called once, during mongod server startup (db.cpp). - * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary. */ -void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext); +class PeriodicThreadToAbortExpiredTransactions { +public: + static PeriodicThreadToAbortExpiredTransactions& get(ServiceContext* serviceContext); + + PeriodicJobAnchor& operator*() const noexcept; + PeriodicJobAnchor* operator->() const noexcept; + +private: + void _init(ServiceContext* serviceContext); + + inline static const auto _serviceDecoration = + ServiceContext::declareDecoration<PeriodicThreadToAbortExpiredTransactions>(); + + mutable stdx::mutex _mutex; + std::shared_ptr<PeriodicJobAnchor> _anchor; +}; } // namespace mongo diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp index 8e4f6594bab..c27418d948f 100644 --- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp +++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp @@ -43,11 +43,30 @@ namespace mongo { -void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* serviceContext) { - // Enforce calling this function once, and only once. - static bool firstCall = true; - invariant(firstCall); - firstCall = false; +auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(ServiceContext* serviceContext) + -> PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& { + auto& jobContainer = _serviceDecoration(serviceContext); + jobContainer._init(serviceContext); + return jobContainer; +} + +auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator-> () const noexcept + -> PeriodicJobAnchor* { + stdx::lock_guard lk(_mutex); + return _anchor.get(); +} + +auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator*() const noexcept + -> PeriodicJobAnchor& { + stdx::lock_guard lk(_mutex); + return *_anchor; +} + +void PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::_init(ServiceContext* serviceContext) { + stdx::lock_guard lk(_mutex); + if (_anchor) { + return; + } auto periodicRunner = serviceContext->getPeriodicRunner(); invariant(periodicRunner); @@ -71,19 +90,19 @@ void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* ser }, Seconds(snapshotWindowParams.decreaseHistoryIfNotNeededPeriodSeconds.load())); - auto handle = periodicRunner->makeJob(std::move(job)); - handle->start(); + _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); - SnapshotWindowParams::observeDecreaseHistoryIfNotNeededPeriodSeconds - .addObserver([handle = std::move(handle)](const auto& secs) { - try { - handle->setPeriod(Seconds(secs)); - } catch (const DBException& ex) { - log() << "Failed to update the period of the thread which decreases data history " - "target window size if there have been no new SnapshotTooOld errors." - << ex.toStatus(); - } - }); + SnapshotWindowParams::observeDecreaseHistoryIfNotNeededPeriodSeconds.addObserver([anchor = + _anchor]( + const auto& secs) { + try { + anchor->setPeriod(Seconds(secs)); + } catch (const DBException& ex) { + log() << "Failed to update the period of the thread which decreases data history " + "target window size if there have been no new SnapshotTooOld errors." + << ex.toStatus(); + } + }); } } // namespace mongo diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h index 1b6c2e65b18..f6e23949a24 100644 --- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h +++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h @@ -29,9 +29,13 @@ #pragma once -namespace mongo { +#include <memory> + +#include "mongo/db/service_context.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/periodic_runner.h" -class ServiceContext; +namespace mongo { /** * Periodically checks whether there has been any storage engine cache pressure and SnapshotTooOld @@ -43,6 +47,21 @@ class ServiceContext; * This function should only ever be called once, during mongod server startup (db.cpp). * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary. */ -void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* serviceContext); +class PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded { +public: + static PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& get(ServiceContext* serviceContext); + + PeriodicJobAnchor* operator->() const noexcept; + PeriodicJobAnchor& operator*() const noexcept; + +private: + void _init(ServiceContext* serviceContext); + + inline static const auto _serviceDecoration = + ServiceContext::declareDecoration<PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded>(); + + mutable stdx::mutex _mutex; + std::shared_ptr<PeriodicJobAnchor> _anchor; +}; } // namespace mongo diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.cpp b/src/mongo/db/s/periodic_balancer_config_refresher.cpp index 87b8ad55a4d..f9cab569d89 100644 --- a/src/mongo/db/s/periodic_balancer_config_refresher.cpp +++ b/src/mongo/db/s/periodic_balancer_config_refresher.cpp @@ -46,8 +46,7 @@ namespace { const auto getPeriodicBalancerConfigRefresher = ServiceContext::declareDecoration<PeriodicBalancerConfigRefresher>(); -std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher( - ServiceContext* serviceContext) { +PeriodicJobAnchor launchBalancerConfigRefresher(ServiceContext* serviceContext) { auto periodicRunner = serviceContext->getPeriodicRunner(); invariant(periodicRunner); @@ -66,7 +65,7 @@ std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher }, Seconds(30)); auto balancerConfigRefresher = periodicRunner->makeJob(std::move(job)); - balancerConfigRefresher->start(); + balancerConfigRefresher.start(); return balancerConfigRefresher; } @@ -86,7 +85,7 @@ void PeriodicBalancerConfigRefresher::onShardingInitialization(ServiceContext* s _isPrimary = isPrimary; // This function is called on sharding state initialization, so go ahead // and start up the balancer config refresher task if we're a primary. - if (isPrimary && !_balancerConfigRefresher) { + if (isPrimary && !_balancerConfigRefresher.isValid()) { _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext); } } @@ -95,12 +94,12 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) { _isPrimary = true; // If this is the first time we're stepping up, start a thread to periodically refresh the // balancer configuration. - if (!_balancerConfigRefresher) { + if (!_balancerConfigRefresher.isValid()) { _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext); } else { // If we're stepping up again after having stepped down, just resume // the existing task. - _balancerConfigRefresher->resume(); + _balancerConfigRefresher.resume(); } } } @@ -108,9 +107,9 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) { void PeriodicBalancerConfigRefresher::onStepDown() { if (_isPrimary) { _isPrimary = false; - invariant(_balancerConfigRefresher); + invariant(_balancerConfigRefresher.isValid()); // We don't need to be refreshing the balancer configuration unless we're primary. - _balancerConfigRefresher->pause(); + _balancerConfigRefresher.pause(); } } diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.h b/src/mongo/db/s/periodic_balancer_config_refresher.h index 96a36202025..a331f9cfab5 100644 --- a/src/mongo/db/s/periodic_balancer_config_refresher.h +++ b/src/mongo/db/s/periodic_balancer_config_refresher.h @@ -80,6 +80,6 @@ private: bool _isPrimary{false}; // Periodic job for refreshing the balancer configuration - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _balancerConfigRefresher; + PeriodicJobAnchor _balancerConfigRefresher; }; } // namespace mongo diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index d122a32e5e3..0aa04389245 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -533,11 +533,6 @@ private: stdx::mutex _mutex; /** - * The storage engine, if any. - */ - std::unique_ptr<StorageEngine> _storageEngine; - - /** * The periodic runner. */ std::unique_ptr<PeriodicRunner> _runner; @@ -558,6 +553,11 @@ private: std::unique_ptr<transport::ServiceExecutor> _serviceExecutor; /** + * The storage engine, if any. + */ + std::unique_ptr<StorageEngine> _storageEngine; + + /** * Vector of registered observers. */ std::vector<ClientObserverHolder> _clientObservers; diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp index 873e558082e..992e8a86967 100644 --- a/src/mongo/db/service_context_d_test_fixture.cpp +++ b/src/mongo/db/service_context_d_test_fixture.cpp @@ -73,7 +73,6 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine, RepairAct // Set up the periodic runner to allow background job execution for tests that require it. auto runner = makePeriodicRunner(getServiceContext()); - runner->startup(); getServiceContext()->setPeriodicRunner(std::move(runner)); storageGlobalParams.dbpath = _tempDir.path(); diff --git a/src/mongo/db/service_liaison_mock.cpp b/src/mongo/db/service_liaison_mock.cpp index fd95a7ebc6d..ab4397f1980 100644 --- a/src/mongo/db/service_liaison_mock.cpp +++ b/src/mongo/db/service_liaison_mock.cpp @@ -40,7 +40,6 @@ namespace mongo { MockServiceLiaisonImpl::MockServiceLiaisonImpl() { _timerFactory = std::make_unique<executor::AsyncTimerFactoryMock>(); _runner = makePeriodicRunner(getGlobalServiceContext()); - _runner->startup(); } LogicalSessionIdSet MockServiceLiaisonImpl::getActiveOpSessions() const { @@ -53,9 +52,7 @@ LogicalSessionIdSet MockServiceLiaisonImpl::getOpenCursorSessions(OperationConte return _cursorSessions; } -void MockServiceLiaisonImpl::join() { - _runner->shutdown(); -} +void MockServiceLiaisonImpl::join() {} Date_t MockServiceLiaisonImpl::now() const { return _timerFactory->now(); diff --git a/src/mongo/db/service_liaison_mongod.cpp b/src/mongo/db/service_liaison_mongod.cpp index 1008267bf19..94e1fbd9217 100644 --- a/src/mongo/db/service_liaison_mongod.cpp +++ b/src/mongo/db/service_liaison_mongod.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include "mongo/db/service_liaison_mongod.h" @@ -37,6 +39,7 @@ #include "mongo/db/service_context.h" #include "mongo/stdx/mutex.h" #include "mongo/util/clock_source.h" +#include "mongo/util/log.h" namespace mongo { @@ -73,17 +76,20 @@ LogicalSessionIdSet ServiceLiaisonMongod::getOpenCursorSessions(OperationContext void ServiceLiaisonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) { invariant(hasGlobalServiceContext()); - auto jobHandle = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); - jobHandle->start(); - _jobs.push_back(std::move(jobHandle)); + auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); + jobAnchor.start(); + + { + stdx::lock_guard lk(_mutex); + _jobs.push_back(std::move(jobAnchor)); + } } void ServiceLiaisonMongod::join() { - invariant(hasGlobalServiceContext()); - for (auto&& jobHandle : _jobs) { - jobHandle->stop(); - } - _jobs.clear(); + auto jobs = [&] { + stdx::lock_guard lk(_mutex); + return std::exchange(_jobs, {}); + }(); } Date_t ServiceLiaisonMongod::now() const { diff --git a/src/mongo/db/service_liaison_mongod.h b/src/mongo/db/service_liaison_mongod.h index c334eb5f36c..b1060425f6f 100644 --- a/src/mongo/db/service_liaison_mongod.h +++ b/src/mongo/db/service_liaison_mongod.h @@ -68,7 +68,9 @@ protected: * Returns the service context. */ ServiceContext* _context() override; - std::vector<std::unique_ptr<PeriodicRunner::PeriodicJobHandle>> _jobs; + + stdx::mutex _mutex; + std::vector<PeriodicJobAnchor> _jobs; }; } // namespace mongo diff --git a/src/mongo/db/service_liaison_mongos.cpp b/src/mongo/db/service_liaison_mongos.cpp index 7da5e508109..666ca06ea68 100644 --- a/src/mongo/db/service_liaison_mongos.cpp +++ b/src/mongo/db/service_liaison_mongos.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include "mongo/db/service_liaison_mongos.h" @@ -36,7 +38,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/stdx/mutex.h" #include "mongo/util/clock_source.h" -#include "mongo/util/periodic_runner.h" +#include "mongo/util/log.h" namespace mongo { @@ -79,12 +81,20 @@ LogicalSessionIdSet ServiceLiaisonMongos::getOpenCursorSessions(OperationContext void ServiceLiaisonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) { invariant(hasGlobalServiceContext()); - getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job)); + auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); + jobAnchor.start(); + + { + stdx::lock_guard lk(_mutex); + _jobs.push_back(std::move(jobAnchor)); + } } void ServiceLiaisonMongos::join() { - invariant(hasGlobalServiceContext()); - getGlobalServiceContext()->getPeriodicRunner()->shutdown(); + auto jobs = [&] { + stdx::lock_guard lk(_mutex); + return std::exchange(_jobs, {}); + }(); } Date_t ServiceLiaisonMongos::now() const { diff --git a/src/mongo/db/service_liaison_mongos.h b/src/mongo/db/service_liaison_mongos.h index 580dc3cf00e..ab40801557d 100644 --- a/src/mongo/db/service_liaison_mongos.h +++ b/src/mongo/db/service_liaison_mongos.h @@ -68,6 +68,9 @@ protected: * Returns the service context. */ ServiceContext* _context() override; + + stdx::mutex _mutex; + std::vector<PeriodicJobAnchor> _jobs; }; } // namespace mongo diff --git a/src/mongo/db/storage/flow_control.cpp b/src/mongo/db/storage/flow_control.cpp index 3f1d2e0fec3..df076e2d82c 100644 --- a/src/mongo/db/storage/flow_control.cpp +++ b/src/mongo/db/storage/flow_control.cpp @@ -134,12 +134,13 @@ FlowControl::FlowControl(ServiceContext* service, repl::ReplicationCoordinator* // cause a slow start on start up. FlowControlTicketholder::set(service, std::make_unique<FlowControlTicketholder>(_kMaxTickets)); - service->getPeriodicRunner()->scheduleJob( + _jobAnchor = service->getPeriodicRunner()->makeJob( {"FlowControlRefresher", [this](Client* client) { FlowControlTicketholder::get(client->getServiceContext())->refreshTo(getNumTickets()); }, Seconds(1)}); + _jobAnchor.start(); } FlowControl* FlowControl::get(ServiceContext* service) { diff --git a/src/mongo/db/storage/flow_control.h b/src/mongo/db/storage/flow_control.h index e3905a3728b..64f0d0b1d00 100644 --- a/src/mongo/db/storage/flow_control.h +++ b/src/mongo/db/storage/flow_control.h @@ -141,6 +141,8 @@ private: // This value is used for calculating server status metrics. std::uint64_t _startWaitTime = 0; + + PeriodicJobAnchor _jobAnchor; }; } // namespace mongo diff --git a/src/mongo/db/storage/kv/storage_engine_impl.cpp b/src/mongo/db/storage/kv/storage_engine_impl.cpp index 3aa39b1c123..f2734ced75f 100644 --- a/src/mongo/db/storage/kv/storage_engine_impl.cpp +++ b/src/mongo/db/storage/kv/storage_engine_impl.cpp @@ -877,7 +877,7 @@ void StorageEngineImpl::TimestampMonitor::startup() { // Take a global lock in MODE_IS while fetching timestamps to guarantee that // rollback-to-stable isn't running concurrently. - { + try { auto opCtx = client->getOperationContext(); mongo::ServiceContext::UniqueOperationContext uOpCtx; if (!opCtx) { @@ -894,6 +894,9 @@ void StorageEngineImpl::TimestampMonitor::startup() { checkpoint = _engine->getCheckpointTimestamp(); oldest = _engine->getOldestTimestamp(); stable = _engine->getStableTimestamp(); + } catch (const ExceptionFor<ErrorCodes::InterruptedAtShutdown>&) { + // If we're interrupted at shutdown, it's fine to give up on future notifications + return; } Timestamp minOfCheckpointAndOldest = @@ -922,7 +925,8 @@ void StorageEngineImpl::TimestampMonitor::startup() { }, Seconds(1)); - _periodicRunner->scheduleJob(std::move(job)); + _job = _periodicRunner->makeJob(std::move(job)); + _job.start(); _running = true; } diff --git a/src/mongo/db/storage/kv/storage_engine_impl.h b/src/mongo/db/storage/kv/storage_engine_impl.h index 4d060cdcd4b..6c6dd3fdf2c 100644 --- a/src/mongo/db/storage/kv/storage_engine_impl.h +++ b/src/mongo/db/storage/kv/storage_engine_impl.h @@ -280,6 +280,7 @@ public: KVEngine* _engine; bool _running; + PeriodicJobAnchor _job; // The set of timestamps that were last reported to the listeners by the monitor. MonitoredTimestamps _currentTimestamps; diff --git a/src/mongo/db/storage/kv/storage_engine_test.cpp b/src/mongo/db/storage/kv/storage_engine_test.cpp index d424f64f3c1..40eddbfcef3 100644 --- a/src/mongo/db/storage/kv/storage_engine_test.cpp +++ b/src/mongo/db/storage/kv/storage_engine_test.cpp @@ -391,10 +391,6 @@ public: } ~TimestampKVEngineTest() { - // Shut down the background periodic task runner, before the storage engine. - auto runner = getServiceContext()->getPeriodicRunner(); - runner->shutdown(); - _storageEngine->cleanShutdown(); _storageEngine.reset(); } diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp index 4c8b2401683..c6ff81b12f7 100644 --- a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp +++ b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp @@ -100,7 +100,7 @@ MobileKVEngine::MobileKVEngine(const std::string& path, maybeVacuum(client, Date_t::max()); }, Minutes(options.vacuumCheckIntervalMinutes))); - _vacuumJob->start(); + _vacuumJob.start(); } } diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.h b/src/mongo/db/storage/mobile/mobile_kv_engine.h index 10da2d2e1a0..094bf0674d9 100644 --- a/src/mongo/db/storage/mobile/mobile_kv_engine.h +++ b/src/mongo/db/storage/mobile/mobile_kv_engine.h @@ -154,7 +154,7 @@ private: std::string _path; embedded::MobileOptions _options; - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _vacuumJob; + PeriodicJobAnchor _vacuumJob; }; } // namespace mongo |