summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-07-24 14:30:04 -0400
committerBen Caimano <ben.caimano@10gen.com>2019-07-29 16:37:15 -0400
commitcc04956b4da01e57abd70b4adc3f4f673451ee56 (patch)
tree6085995591c05ab42a4d739c5da09bc1f863e4e9
parent295e292133b40837a57abe72d78cb2127ecef18a (diff)
downloadmongo-cc04956b4da01e57abd70b4adc3f4f673451ee56.tar.gz
SERVER-35114 Make it possible to adjust the period of active jobs in the PeriodicRunner
(cherry picked from commit bf4a3cff4dc5572f2e97cb5279fe63c8227187e0) SERVER-39936 Use PeriodicRunner handles to simplify shutdown ordering (cherry picked from commit 1eff33bd1a8d48eb607675f87faf1836ba325006) This is a partial backport. Parts of the original commits listed do not exist in v4.0
-rw-r--r--src/mongo/db/db.cpp21
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp1
-rw-r--r--src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp35
-rw-r--r--src/mongo/db/periodic_runner_job_abort_expired_transactions.h27
-rw-r--r--src/mongo/db/service_context.h10
-rw-r--r--src/mongo/db/service_context_d_test_fixture.cpp5
-rw-r--r--src/mongo/db/service_liaison_mock.cpp5
-rw-r--r--src/mongo/db/service_liaison_mongod.cpp22
-rw-r--r--src/mongo/db/service_liaison_mongod.h4
-rw-r--r--src/mongo/db/service_liaison_mongos.cpp18
-rw-r--r--src/mongo/db/service_liaison_mongos.h3
-rw-r--r--src/mongo/dbtests/framework.cpp6
-rw-r--r--src/mongo/embedded/embedded.cpp15
-rw-r--r--src/mongo/embedded/periodic_runner_embedded.cpp106
-rw-r--r--src/mongo/embedded/periodic_runner_embedded.h40
-rw-r--r--src/mongo/s/server.cpp13
-rw-r--r--src/mongo/util/SConscript1
-rw-r--r--src/mongo/util/periodic_runner.cpp50
-rw-r--r--src/mongo/util/periodic_runner.h101
-rw-r--r--src/mongo/util/periodic_runner_impl.cpp167
-rw-r--r--src/mongo/util/periodic_runner_impl.h45
-rw-r--r--src/mongo/util/periodic_runner_impl_test.cpp145
22 files changed, 458 insertions, 382 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 2b6f05de989..22b76688f2b 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -336,6 +336,11 @@ ExitCode _initAndListen(int listenPort) {
}
serviceContext->setTransportLayer(std::move(tl));
}
+
+ // Set up the periodic runner for background job execution. This is required to be running
+ // before the storage engine is initialized.
+ auto runner = makePeriodicRunner(serviceContext);
+ serviceContext->setPeriodicRunner(std::move(runner));
initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone);
#ifdef MONGO_CONFIG_WIREDTIGER_ENABLED
@@ -614,11 +619,6 @@ ExitCode _initAndListen(int listenPort) {
PeriodicTask::startRunningPeriodicTasks();
- // Set up the periodic runner for background job execution
- auto runner = makePeriodicRunner(serviceContext);
- runner->startup();
- serviceContext->setPeriodicRunner(std::move(runner));
-
SessionKiller::set(serviceContext,
std::make_shared<SessionKiller>(serviceContext, killSessionsLocal));
@@ -626,7 +626,7 @@ ExitCode _initAndListen(int listenPort) {
// Only do this on storage engines supporting snapshot reads, which hold resources we wish to
// release periodically in order to avoid storage cache pressure build up.
if (storageEngine->supportsReadConcernSnapshot()) {
- startPeriodicThreadToAbortExpiredTransactions(serviceContext);
+ PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->start();
}
// Set up the logical session cache
@@ -923,12 +923,11 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
// Shut down the global dbclient pool so callers stop waiting for connections.
globalConnPool.shutdown();
- // Shut down the background periodic task runner
- if (auto runner = serviceContext->getPeriodicRunner()) {
- runner->shutdown();
- }
+ if (auto storageEngine = serviceContext->getStorageEngine()) {
+ if (storageEngine->supportsReadConcernSnapshot()) {
+ PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->stop();
+ }
- if (serviceContext->getStorageEngine()) {
ServiceContext::UniqueOperationContext uniqueOpCtx;
OperationContext* opCtx = client->getOperationContext();
if (!opCtx) {
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index ce5a013f20f..9d9b1498776 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -43,7 +43,6 @@
#include "mongo/platform/atomic_word.h"
#include "mongo/util/duration.h"
#include "mongo/util/log.h"
-#include "mongo/util/periodic_runner.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
index 16845bb2fd4..e14f93d4d24 100644
--- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
+++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
@@ -43,11 +43,34 @@
namespace mongo {
-void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext) {
- // Enforce calling this function once, and only once.
- static bool firstCall = true;
- invariant(firstCall);
- firstCall = false;
+namespace {
+const auto gServiceDecoration =
+ ServiceContext::declareDecoration<PeriodicThreadToAbortExpiredTransactions>();
+} // anonymous namespace
+
+auto PeriodicThreadToAbortExpiredTransactions::get(ServiceContext* serviceContext)
+ -> PeriodicThreadToAbortExpiredTransactions& {
+ auto& jobContainer = gServiceDecoration(serviceContext);
+ jobContainer._init(serviceContext);
+
+ return jobContainer;
+}
+
+auto PeriodicThreadToAbortExpiredTransactions::operator*() const noexcept -> PeriodicJobAnchor& {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return *_anchor;
+}
+
+auto PeriodicThreadToAbortExpiredTransactions::operator-> () const noexcept -> PeriodicJobAnchor* {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _anchor.get();
+}
+
+void PeriodicThreadToAbortExpiredTransactions::_init(ServiceContext* serviceContext) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_anchor) {
+ return;
+ }
auto periodicRunner = serviceContext->getPeriodicRunner();
invariant(periodicRunner);
@@ -94,7 +117,7 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex
},
Seconds(1));
- periodicRunner->scheduleJob(std::move(job));
+ _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
}
} // namespace mongo
diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
index 44e9640c9a9..777848dc6bc 100644
--- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
+++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
@@ -30,18 +30,31 @@
#pragma once
-namespace mongo {
+#include <memory>
+
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/periodic_runner.h"
-class ServiceContext;
+namespace mongo {
/**
- * Defines and starts a periodic background job to check for and abort expired transactions.
+ * Defines a periodic background job to check for and abort expired transactions.
* The job will run every (transactionLifetimeLimitSeconds/2) seconds, or at most once per second
* and at least once per minute.
- *
- * This function should only ever be called once, during mongod server startup (db.cpp).
- * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary.
*/
-void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext);
+class PeriodicThreadToAbortExpiredTransactions {
+public:
+ static PeriodicThreadToAbortExpiredTransactions& get(ServiceContext* serviceContext);
+
+ PeriodicJobAnchor& operator*() const noexcept;
+ PeriodicJobAnchor* operator->() const noexcept;
+
+private:
+ void _init(ServiceContext* serviceContext);
+
+ mutable stdx::mutex _mutex;
+ std::shared_ptr<PeriodicJobAnchor> _anchor;
+};
} // namespace mongo
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 55b0545171e..013cc87a2fa 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -533,11 +533,6 @@ private:
stdx::mutex _mutex;
/**
- * The storage engine, if any.
- */
- std::unique_ptr<StorageEngine> _storageEngine;
-
- /**
* The periodic runner.
*/
std::unique_ptr<PeriodicRunner> _runner;
@@ -558,6 +553,11 @@ private:
std::unique_ptr<transport::ServiceExecutor> _serviceExecutor;
/**
+ * The storage engine, if any.
+ */
+ std::unique_ptr<StorageEngine> _storageEngine;
+
+ /**
* Vector of registered observers.
*/
std::vector<ClientObserverHolder> _clientObservers;
diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp
index 91692972787..5df95627c9c 100644
--- a/src/mongo/db/service_context_d_test_fixture.cpp
+++ b/src/mongo/db/service_context_d_test_fixture.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/storage/storage_engine_init.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/periodic_runner_factory.h"
#include "mongo/db/catalog/database_holder.h"
@@ -69,6 +70,10 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine, RepairAct
auto logicalClock = std::make_unique<LogicalClock>(serviceContext);
LogicalClock::set(serviceContext, std::move(logicalClock));
+ // Set up the periodic runner to allow background job execution for tests that require it.
+ auto runner = makePeriodicRunner(getServiceContext());
+ getServiceContext()->setPeriodicRunner(std::move(runner));
+
storageGlobalParams.dbpath = _tempDir.path();
initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone);
diff --git a/src/mongo/db/service_liaison_mock.cpp b/src/mongo/db/service_liaison_mock.cpp
index 8193943090e..ba56d789341 100644
--- a/src/mongo/db/service_liaison_mock.cpp
+++ b/src/mongo/db/service_liaison_mock.cpp
@@ -41,7 +41,6 @@ namespace mongo {
MockServiceLiaisonImpl::MockServiceLiaisonImpl() {
_timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
_runner = makePeriodicRunner(getGlobalServiceContext());
- _runner->startup();
}
LogicalSessionIdSet MockServiceLiaisonImpl::getActiveOpSessions() const {
@@ -54,9 +53,7 @@ LogicalSessionIdSet MockServiceLiaisonImpl::getOpenCursorSessions() const {
return _cursorSessions;
}
-void MockServiceLiaisonImpl::join() {
- _runner->shutdown();
-}
+void MockServiceLiaisonImpl::join() {}
Date_t MockServiceLiaisonImpl::now() const {
return _timerFactory->now();
diff --git a/src/mongo/db/service_liaison_mongod.cpp b/src/mongo/db/service_liaison_mongod.cpp
index af214b60227..efbf1c222bc 100644
--- a/src/mongo/db/service_liaison_mongod.cpp
+++ b/src/mongo/db/service_liaison_mongod.cpp
@@ -28,6 +28,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include "mongo/db/service_liaison_mongod.h"
@@ -38,6 +40,7 @@
#include "mongo/db/service_context.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -87,17 +90,20 @@ LogicalSessionIdSet ServiceLiaisonMongod::getOpenCursorSessions() const {
void ServiceLiaisonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) {
invariant(hasGlobalServiceContext());
- auto jobHandle = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
- jobHandle->start();
- _jobs.push_back(std::move(jobHandle));
+ auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
+ jobAnchor.start();
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _jobs.push_back(std::move(jobAnchor));
+ }
}
void ServiceLiaisonMongod::join() {
- invariant(hasGlobalServiceContext());
- for (auto&& jobHandle : _jobs) {
- jobHandle->stop();
- }
- _jobs.clear();
+ auto jobs = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return std::exchange(_jobs, {});
+ }();
}
Date_t ServiceLiaisonMongod::now() const {
diff --git a/src/mongo/db/service_liaison_mongod.h b/src/mongo/db/service_liaison_mongod.h
index 5cd081f8823..541b67447ab 100644
--- a/src/mongo/db/service_liaison_mongod.h
+++ b/src/mongo/db/service_liaison_mongod.h
@@ -69,7 +69,9 @@ protected:
* Returns the service context.
*/
ServiceContext* _context() override;
- std::vector<std::unique_ptr<PeriodicRunner::PeriodicJobHandle>> _jobs;
+
+ stdx::mutex _mutex;
+ std::vector<PeriodicJobAnchor> _jobs;
};
} // namespace mongo
diff --git a/src/mongo/db/service_liaison_mongos.cpp b/src/mongo/db/service_liaison_mongos.cpp
index c52ec45fb6b..7f7c4a9c423 100644
--- a/src/mongo/db/service_liaison_mongos.cpp
+++ b/src/mongo/db/service_liaison_mongos.cpp
@@ -28,6 +28,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include "mongo/db/service_liaison_mongos.h"
@@ -37,7 +39,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/clock_source.h"
-#include "mongo/util/periodic_runner.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -80,12 +82,20 @@ LogicalSessionIdSet ServiceLiaisonMongos::getOpenCursorSessions() const {
void ServiceLiaisonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) {
invariant(hasGlobalServiceContext());
- getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job));
+ auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
+ jobAnchor.start();
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _jobs.push_back(std::move(jobAnchor));
+ }
}
void ServiceLiaisonMongos::join() {
- invariant(hasGlobalServiceContext());
- getGlobalServiceContext()->getPeriodicRunner()->shutdown();
+ auto jobs = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return std::exchange(_jobs, {});
+ }();
}
Date_t ServiceLiaisonMongos::now() const {
diff --git a/src/mongo/db/service_liaison_mongos.h b/src/mongo/db/service_liaison_mongos.h
index ecf08d5d822..57d06727bf0 100644
--- a/src/mongo/db/service_liaison_mongos.h
+++ b/src/mongo/db/service_liaison_mongos.h
@@ -69,6 +69,9 @@ protected:
* Returns the service context.
*/
ServiceContext* _context() override;
+
+ stdx::mutex _mutex;
+ std::vector<PeriodicJobAnchor> _jobs;
};
} // namespace mongo
diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp
index b53b78b380c..95624c6b6aa 100644
--- a/src/mongo/dbtests/framework.cpp
+++ b/src/mongo/dbtests/framework.cpp
@@ -54,6 +54,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
+#include "mongo/util/periodic_runner_factory.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/version.h"
@@ -91,6 +92,11 @@ int runDbTests(int argc, char** argv) {
srand((unsigned)frameworkGlobalParams.seed);
+ // Set up the periodic runner for background job execution, which is required by the storage
+ // engine to be running beforehand.
+ auto runner = makePeriodicRunner(globalServiceContext);
+ globalServiceContext->setPeriodicRunner(std::move(runner));
+
initializeStorageEngine(globalServiceContext, StorageEngineInitFlags::kNone);
auto registry = stdx::make_unique<OpObserverRegistry>();
registry->addObserver(stdx::make_unique<UUIDCatalogObserver>());
diff --git a/src/mongo/embedded/embedded.cpp b/src/mongo/embedded/embedded.cpp
index 8736e53ac79..7f7ee332f0f 100644
--- a/src/mongo/embedded/embedded.cpp
+++ b/src/mongo/embedded/embedded.cpp
@@ -156,11 +156,6 @@ void shutdown(ServiceContext* srvContext) {
LogicalSessionCache::set(serviceContext, nullptr);
- // Shut down the background periodic task runner
- if (auto runner = serviceContext->getPeriodicRunner()) {
- runner->shutdown();
- }
-
// Global storage engine may not be started in all cases before we exit
if (serviceContext->getStorageEngine()) {
shutdownGlobalStorageEngineCleanly(serviceContext);
@@ -219,6 +214,11 @@ ServiceContext* initialize(const char* yaml_config) {
DEV log(LogComponent::kControl) << "DEBUG build (which is slower)" << endl;
+ // The periodic runner is required by the storage engine to be running beforehand.
+ auto periodicRunner = std::make_unique<PeriodicRunnerEmbedded>(
+ serviceContext, serviceContext->getPreciseClockSource());
+ serviceContext->setPeriodicRunner(std::move(periodicRunner));
+
initializeStorageEngine(serviceContext, StorageEngineInitFlags::kAllowNoLockFile);
// Warn if we detect configurations for multiple registered storage engines in the same
@@ -312,11 +312,6 @@ ServiceContext* initialize(const char* yaml_config) {
restartInProgressIndexesFromLastShutdown(startupOpCtx.get());
}
- auto periodicRunner = std::make_unique<PeriodicRunnerEmbedded>(
- serviceContext, serviceContext->getPreciseClockSource());
- periodicRunner->startup();
- serviceContext->setPeriodicRunner(std::move(periodicRunner));
-
// Set up the logical session cache
auto sessionCache = makeLogicalSessionCacheEmbedded();
LogicalSessionCache::set(serviceContext, std::move(sessionCache));
diff --git a/src/mongo/embedded/periodic_runner_embedded.cpp b/src/mongo/embedded/periodic_runner_embedded.cpp
index b20ea9cec2f..d21bdbfcd97 100644
--- a/src/mongo/embedded/periodic_runner_embedded.cpp
+++ b/src/mongo/embedded/periodic_runner_embedded.cpp
@@ -38,21 +38,6 @@
#include "mongo/util/scopeguard.h"
namespace mongo {
-namespace {
-
-template <typename T>
-std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) {
- if (auto p = ptr.lock()) {
- return p;
- } else {
- uasserted(ErrorCodes::InternalError, errMsg);
- }
-}
-
-constexpr auto kPeriodicJobHandleLifetimeErrMsg =
- "The PeriodicRunner job for this handle no longer exists"_sd;
-
-} // namespace
struct PeriodicRunnerEmbedded::PeriodicJobSorter {
bool operator()(std::shared_ptr<PeriodicJobImpl> const& lhs,
@@ -65,59 +50,13 @@ struct PeriodicRunnerEmbedded::PeriodicJobSorter {
PeriodicRunnerEmbedded::PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource)
: _svc(svc), _clockSource(clockSource) {}
-PeriodicRunnerEmbedded::~PeriodicRunnerEmbedded() {
- PeriodicRunnerEmbedded::shutdown();
-}
-
-std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> PeriodicRunnerEmbedded::createAndAddJob(
- PeriodicJob job, bool shouldStart) {
+auto PeriodicRunnerEmbedded::makeJob(PeriodicJob job) -> JobAnchor {
auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this);
stdx::lock_guard<stdx::mutex> lk(_mutex);
_jobs.push_back(impl);
std::push_heap(_jobs.begin(), _jobs.end(), PeriodicJobSorter());
- if (shouldStart && _running)
- impl->start();
- return impl;
-}
-
-std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerEmbedded::makeJob(
- PeriodicJob job) {
- return std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(std::move(job), false));
-}
-
-void PeriodicRunnerEmbedded::scheduleJob(PeriodicJob job) {
- createAndAddJob(std::move(job), true);
-}
-
-void PeriodicRunnerEmbedded::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_running) {
- return;
- }
-
- _running = true;
-
- // schedule any jobs that we have
- for (auto& job : _jobs) {
- job->start();
- }
-}
-
-void PeriodicRunnerEmbedded::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_running) {
- _running = false;
-
- for (auto& job : _jobs) {
- stdx::lock_guard<stdx::mutex> jobLock(job->_mutex);
- if (job->isAlive(jobLock)) {
- job->_stopWithMasterAndJobLock(lk, jobLock);
- }
- }
- _jobs.clear();
- }
+ return JobAnchor(impl);
}
bool PeriodicRunnerEmbedded::tryPump() {
@@ -226,37 +165,36 @@ void PeriodicRunnerEmbedded::PeriodicJobImpl::stop() {
// sure the user can invalidate it after this call.
stdx::lock_guard<stdx::mutex> masterLock(_periodicRunner->_mutex);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _stopWithMasterAndJobLock(masterLock, lk);
+ if (isAlive(lk)) {
+ _stopWithMasterAndJobLock(masterLock, lk);
+ }
}
-void PeriodicRunnerEmbedded::PeriodicJobImpl::_stopWithMasterAndJobLock(WithLock masterLock,
- WithLock jobLock) {
- invariant(isAlive(jobLock));
- _execStatus = PeriodicJobImpl::ExecutionStatus::kCanceled;
+Milliseconds PeriodicRunnerEmbedded::PeriodicJobImpl::getPeriod() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _job.interval;
}
-bool PeriodicRunnerEmbedded::PeriodicJobImpl::isAlive(WithLock lk) {
- return _execStatus == ExecutionStatus::kRunning || _execStatus == ExecutionStatus::kPaused;
-}
+void PeriodicRunnerEmbedded::PeriodicJobImpl::setPeriod(Milliseconds ms) {
+ stdx::lock_guard<stdx::mutex> masterLk(_periodicRunner->_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::start() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->start();
-}
+ _job.interval = ms;
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::stop() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->stop();
+ if (_execStatus == PeriodicJobImpl::ExecutionStatus::kRunning) {
+ std::make_heap(
+ _periodicRunner->_jobs.begin(), _periodicRunner->_jobs.end(), PeriodicJobSorter());
+ }
}
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::pause() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->pause();
+void PeriodicRunnerEmbedded::PeriodicJobImpl::_stopWithMasterAndJobLock(WithLock masterLock,
+ WithLock jobLock) {
+ invariant(isAlive(jobLock));
+ _execStatus = PeriodicJobImpl::ExecutionStatus::kCanceled;
}
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::resume() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->resume();
+bool PeriodicRunnerEmbedded::PeriodicJobImpl::isAlive(WithLock lk) {
+ return _execStatus == ExecutionStatus::kRunning || _execStatus == ExecutionStatus::kPaused;
}
} // namespace mongo
diff --git a/src/mongo/embedded/periodic_runner_embedded.h b/src/mongo/embedded/periodic_runner_embedded.h
index 20dc7a410ba..98632bbce19 100644
--- a/src/mongo/embedded/periodic_runner_embedded.h
+++ b/src/mongo/embedded/periodic_runner_embedded.h
@@ -48,31 +48,28 @@ namespace mongo {
class PeriodicRunnerEmbedded : public PeriodicRunner {
public:
PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource);
- ~PeriodicRunnerEmbedded();
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override;
- void scheduleJob(PeriodicJob job) override;
-
- void startup() override;
-
- void shutdown() override;
+ JobAnchor makeJob(PeriodicJob job) override;
// Safe to call from multiple threads but will only execute on one thread at a time.
// Returns true if it attempted to run any jobs.
bool tryPump();
private:
- class PeriodicJobImpl {
- MONGO_DISALLOW_COPYING(PeriodicJobImpl);
+ class PeriodicJobImpl : public ControllableJob {
+ PeriodicJobImpl(const PeriodicJobImpl&) = delete;
+ PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete;
public:
friend class PeriodicRunnerEmbedded;
PeriodicJobImpl(PeriodicJob job, ClockSource* source, PeriodicRunnerEmbedded* runner);
- void start();
- void pause();
- void resume();
- void stop();
+ void start() override;
+ void pause() override;
+ void resume() override;
+ void stop() override;
+ Milliseconds getPeriod() override;
+ void setPeriod(Milliseconds ms) override;
bool isAlive(WithLock lk);
@@ -99,22 +96,6 @@ private:
};
struct PeriodicJobSorter;
- std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> createAndAddJob(PeriodicJob job,
- bool shouldStart);
-
- class PeriodicJobHandleImpl : public PeriodicJobHandle {
- public:
- explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl)
- : _jobWeak(jobImpl) {}
- void start() override;
- void stop() override;
- void pause() override;
- void resume() override;
-
- private:
- std::weak_ptr<PeriodicJobImpl> _jobWeak;
- };
-
ServiceContext* _svc;
ClockSource* _clockSource;
@@ -123,7 +104,6 @@ private:
std::vector<std::shared_ptr<PeriodicJobImpl>> _Pausedjobs;
stdx::mutex _mutex;
- bool _running = false;
};
} // namespace mongo
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index ad5817c67b9..ce517052716 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -191,12 +191,6 @@ void cleanupTask(ServiceContext* serviceContext) {
if (serviceContext) {
serviceContext->setKillAllOperations();
-
- // Shut down the background periodic task runner.
- auto runner = serviceContext->getPeriodicRunner();
- if (runner) {
- runner->shutdown();
- }
}
// Perform all shutdown operations after setKillAllOperations is called in order to ensure
@@ -432,9 +426,10 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
PeriodicTask::startRunningPeriodicTasks();
// Set up the periodic runner for background job execution
- auto runner = makePeriodicRunner(serviceContext);
- runner->startup();
- serviceContext->setPeriodicRunner(std::move(runner));
+ {
+ auto runner = makePeriodicRunner(serviceContext);
+ serviceContext->setPeriodicRunner(std::move(runner));
+ }
SessionKiller::set(serviceContext,
std::make_shared<SessionKiller>(serviceContext, killSessionsRemote));
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index cd9ac27b4fc..547f379eacc 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -276,6 +276,7 @@ env.Library(
"periodic_runner.cpp",
],
LIBDEPS=[
+ "$BUILD_DIR/mongo/base",
],
)
diff --git a/src/mongo/util/periodic_runner.cpp b/src/mongo/util/periodic_runner.cpp
index 5e8cbc9b344..fecf2b9e9a5 100644
--- a/src/mongo/util/periodic_runner.cpp
+++ b/src/mongo/util/periodic_runner.cpp
@@ -32,8 +32,58 @@
#include "mongo/util/periodic_runner.h"
+#include "mongo/util/assert_util.h"
+
namespace mongo {
PeriodicRunner::~PeriodicRunner() = default;
+PeriodicJobAnchor::PeriodicJobAnchor(std::shared_ptr<Job> handle) : _handle{std::move(handle)} {}
+
+PeriodicJobAnchor::~PeriodicJobAnchor() {
+ if (!_handle) {
+ return;
+ }
+
+ _handle->stop();
+}
+
+void PeriodicJobAnchor::start() {
+ invariant(_handle);
+ _handle->start();
+}
+
+void PeriodicJobAnchor::pause() {
+ invariant(_handle);
+ _handle->pause();
+}
+
+void PeriodicJobAnchor::resume() {
+ invariant(_handle);
+ _handle->resume();
+}
+
+void PeriodicJobAnchor::stop() {
+ invariant(_handle);
+ _handle->stop();
+}
+
+void PeriodicJobAnchor::setPeriod(Milliseconds ms) {
+ invariant(_handle);
+ _handle->setPeriod(ms);
+}
+
+Milliseconds PeriodicJobAnchor::getPeriod() {
+ invariant(_handle);
+ return _handle->getPeriod();
+}
+
+void PeriodicJobAnchor::detach() {
+ _handle.reset();
+}
+
+bool PeriodicJobAnchor::isValid() const noexcept {
+ return static_cast<bool>(_handle);
+}
+
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h
index 48354326e29..5fd16115701 100644
--- a/src/mongo/util/periodic_runner.h
+++ b/src/mongo/util/periodic_runner.h
@@ -30,15 +30,19 @@
#pragma once
+#include <functional>
+#include <memory>
#include <string>
-#include "mongo/base/disallow_copying.h"
-#include "mongo/stdx/functional.h"
+#include <boost/optional.hpp>
+
+#include "mongo/stdx/mutex.h"
#include "mongo/util/time_support.h"
namespace mongo {
class Client;
+class PeriodicJobAnchor;
/**
* An interface for objects that run work items at specified intervals.
@@ -52,7 +56,8 @@ class Client;
*/
class PeriodicRunner {
public:
- using Job = stdx::function<void(Client* client)>;
+ using Job = std::function<void(Client* client)>;
+ using JobAnchor = PeriodicJobAnchor;
struct PeriodicJob {
PeriodicJob(std::string name, Job callable, Milliseconds period)
@@ -74,65 +79,115 @@ public:
Milliseconds interval;
};
- class PeriodicJobHandle {
+ /**
+ * A ControllableJob allows a user to reschedule the execution of a Job
+ */
+ class ControllableJob {
public:
- virtual ~PeriodicJobHandle() = default;
+ virtual ~ControllableJob() = default;
/**
* Starts running the job
*/
virtual void start() = 0;
+
/**
* Pauses the job temporarily so that it does not execute until
* unpaused
*/
virtual void pause() = 0;
+
/**
* Resumes a paused job so that it continues executing each interval
*/
virtual void resume() = 0;
+
/**
* Stops the job, this function blocks until the job is stopped
* Safe to invalidate the job callable after calling this.
*/
virtual void stop() = 0;
+
+ /**
+ * Returns the current period for the job
+ */
+ virtual Milliseconds getPeriod() = 0;
+
+ /**
+ * Updates the period of the job. This takes effect immediately by altering the current
+ * scheduling of the task. I.e. if more than ms have passed since the last execution of the
+ * job, it is run immediately. Otherwise the scheduling is adjusted forward or back by
+ * abs(new - old).
+ */
+ virtual void setPeriod(Milliseconds ms) = 0;
};
virtual ~PeriodicRunner();
-
/**
* Creates a new job and adds it to the runner, but does not schedule it.
* The caller is responsible for calling 'start' on the resulting handle in
* order to begin the job running. This API should be used when the caller
* is interested in observing and controlling the job execution state.
*/
- virtual std::unique_ptr<PeriodicJobHandle> makeJob(PeriodicJob job) = 0;
+ virtual JobAnchor makeJob(PeriodicJob job) = 0;
+};
- /**
- * Schedules a job to be run at periodic intervals.
- *
- * If the runner is not running when a job is scheduled, that job should
- * be saved so that it may run in the future once startup() is called.
- */
- virtual void scheduleJob(PeriodicJob job) = 0;
+/**
+ * A PeriodicJobAnchor allows the holder to control the scheduling of a job for the lifetime of the
+ * anchor. When an anchor is destructed, it stops its underlying job.
+ *
+ * The underlying weak_ptr for this class is not synchronized. In essence, treat use of this class
+ * as if it were a raw pointer to a ControllableJob.
+ *
+ * Each wrapped PeriodicRunner::ControllableJob function on this object throws
+ * if the underlying job is gone (e.g. in shutdown).
+ */
+class PeriodicJobAnchor {
+public:
+ using Job = PeriodicRunner::ControllableJob;
+
+public:
+ // Note that this constructor is only intended for use with PeriodicRunner::makeJob()
+ explicit PeriodicJobAnchor(std::shared_ptr<Job> handle);
+
+ PeriodicJobAnchor() = default;
+ PeriodicJobAnchor(PeriodicJobAnchor&&) = default;
+ PeriodicJobAnchor& operator=(PeriodicJobAnchor&&) = default;
+
+ PeriodicJobAnchor(const PeriodicJobAnchor&) = delete;
+ PeriodicJobAnchor& operator=(const PeriodicJobAnchor&) = delete;
+
+ ~PeriodicJobAnchor();
+
+ void start();
+ void pause();
+ void resume();
+ void stop();
+ void setPeriod(Milliseconds ms);
+ Milliseconds getPeriod();
/**
- * Starts up this periodic runner.
+ * Abandon responsibility for scheduling the execution of this job
*
- * This method may safely be called multiple times, either with or without
- * calls to shutdown() in between.
+ * This effectively invalidates the anchor.
*/
- virtual void startup() = 0;
+ void detach();
/**
- * Shuts down this periodic runner. Stops all jobs from running.
+ * Returns if this PeriodicJobAnchor is associated with a PeriodicRunner::ControllableJob
*
- * This method may safely be called multiple times, either with or without
- * calls to startup() in between. Any jobs that have been scheduled on this
- * runner should no longer execute once shutdown() is called.
+ * This function is useful to see if a PeriodicJobAnchor is initialized. It does not necessarily
+ * inform whether a PeriodicJobAnchor will throw from a control function above.
*/
- virtual void shutdown() = 0;
+ bool isValid() const noexcept;
+
+ explicit operator bool() const noexcept {
+ return isValid();
+ }
+
+private:
+ std::shared_ptr<Job> _handle;
};
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
index f5fb19a8a2b..874fda8900c 100644
--- a/src/mongo/util/periodic_runner_impl.cpp
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -28,13 +28,17 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
#include "mongo/platform/basic.h"
#include "mongo/util/periodic_runner_impl.h"
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -42,95 +46,81 @@ namespace mongo {
PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource)
: _svc(svc), _clockSource(clockSource) {}
-PeriodicRunnerImpl::~PeriodicRunnerImpl() {
- PeriodicRunnerImpl::shutdown();
-}
-
-std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> PeriodicRunnerImpl::createAndAddJob(
- PeriodicJob job) {
+auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor {
auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this->_svc);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _jobs.push_back(impl);
- return impl;
+ JobAnchor anchor(std::move(impl));
+ return anchor;
}
-std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerImpl::makeJob(PeriodicJob job) {
- auto handle = std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(job));
- return std::move(handle);
-}
-
-void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
- auto impl = createAndAddJob(job);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_running) {
- impl->start();
- }
+PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
+ ClockSource* source,
+ ServiceContext* svc)
+ : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {
+ auto stopPF = makePromiseFuture<void>();
+ _stopPromise = stopPF.promise.share();
+ _stopFuture = std::move(stopPF.future);
}
-void PeriodicRunnerImpl::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
+ auto startPF = makePromiseFuture<void>();
- if (_running) {
- return;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_execStatus == ExecutionStatus::NOT_SCHEDULED);
}
- _running = true;
-
- // schedule any jobs that we have
- for (auto& job : _jobs) {
- job->start();
- }
-}
+ _thread = stdx::thread([ this, startPromise = std::move(startPF.promise) ]() mutable {
+ ON_BLOCK_EXIT([this] { _stopPromise.emplaceValue(); });
-void PeriodicRunnerImpl::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_running) {
- _running = false;
+ Client::initThread(_job.name, _serviceContext, nullptr);
- for (auto& job : _jobs) {
- job->stop();
+ // Let start() know we're running
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
}
- _jobs.clear();
- }
-}
-
-PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
- ClockSource* source,
- ServiceContext* svc)
- : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
-
-void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED);
- _thread = stdx::thread([this] {
- Client::initThread(_job.name, _serviceContext, nullptr);
- while (true) {
- auto start = _clockSource->now();
+ startPromise.emplaceValue();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_execStatus != ExecutionStatus::CANCELED) {
// Wait until it's unpaused or canceled
_condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
if (_execStatus == ExecutionStatus::CANCELED) {
- break;
+ return;
}
+ auto start = _clockSource->now();
+
// Unlock while job is running so we can pause/cancel concurrently
lk.unlock();
_job.job(Client::getCurrent());
lk.lock();
- if (_clockSource->waitForConditionUntil(_condvar, lk, start + _job.interval, [&] {
- return _execStatus == ExecutionStatus::CANCELED;
- })) {
- break;
- }
+ auto getDeadlineFromInterval = [&] { return start + _job.interval; };
+
+ do {
+ auto deadline = getDeadlineFromInterval();
+
+ if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] {
+ return _execStatus == ExecutionStatus::CANCELED ||
+ getDeadlineFromInterval() != deadline;
+ })) {
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ return;
+ }
+ }
+ } while (_clockSource->now() < getDeadlineFromInterval());
}
});
- _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
+
+ // Wait for the thread to actually start
+ startPF.future.get();
}
void PeriodicRunnerImpl::PeriodicJobImpl::start() {
+ LOG(2) << "Starting periodic job " << _job.name;
+
_run();
}
@@ -150,53 +140,40 @@ void PeriodicRunnerImpl::PeriodicJobImpl::resume() {
}
void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
- {
+ auto lastExecStatus = [&] {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_execStatus != ExecutionStatus::RUNNING && _execStatus != ExecutionStatus::PAUSED)
- return;
- invariant(_thread.joinable());
+ return std::exchange(_execStatus, ExecutionStatus::CANCELED);
+ }();
- _execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED;
+ // If we never started, then nobody should wait
+ if (lastExecStatus == ExecutionStatus::NOT_SCHEDULED) {
+ return;
}
- _condvar.notify_one();
- _thread.join();
-}
-namespace {
+ // Only join once
+ if (lastExecStatus != ExecutionStatus::CANCELED) {
+ LOG(2) << "Stopping periodic job " << _job.name;
-template <typename T>
-std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) {
- if (auto p = ptr.lock()) {
- return p;
- } else {
- uasserted(ErrorCodes::InternalError, errMsg);
+ _condvar.notify_one();
+ _thread.join();
}
-}
-constexpr auto kPeriodicJobHandleLifetimeErrMsg =
- "The PeriodicRunner job for this handle no longer exists"_sd;
-
-} // namespace
-
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->start();
+ _stopFuture.get();
}
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::stop() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->stop();
+Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _job.interval;
}
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->pause();
-}
+void PeriodicRunnerImpl::PeriodicJobImpl::setPeriod(Milliseconds ms) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _job.interval = ms;
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->resume();
+ if (_execStatus == PeriodicJobImpl::ExecutionStatus::RUNNING) {
+ _condvar.notify_one();
+ }
}
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
index 8c59a4f3e84..6c9850c18f1 100644
--- a/src/mongo/util/periodic_runner_impl.h
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -37,6 +37,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/future.h"
#include "mongo/util/periodic_runner.h"
namespace mongo {
@@ -51,27 +52,24 @@ class ServiceContext;
class PeriodicRunnerImpl : public PeriodicRunner {
public:
PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource);
- ~PeriodicRunnerImpl();
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override;
- void scheduleJob(PeriodicJob job) override;
-
- void startup() override;
-
- void shutdown() override;
+ JobAnchor makeJob(PeriodicJob job) override;
private:
- class PeriodicJobImpl {
- MONGO_DISALLOW_COPYING(PeriodicJobImpl);
+ class PeriodicJobImpl : public ControllableJob {
+ PeriodicJobImpl(const PeriodicJobImpl&) = delete;
+ PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete;
public:
friend class PeriodicRunnerImpl;
PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc);
- void start();
- void pause();
- void resume();
- void stop();
+ void start() override;
+ void pause() override;
+ void resume() override;
+ void stop() override;
+ Milliseconds getPeriod() override;
+ void setPeriod(Milliseconds ms) override;
enum class ExecutionStatus { NOT_SCHEDULED, RUNNING, PAUSED, CANCELED };
@@ -81,7 +79,10 @@ private:
PeriodicJob _job;
ClockSource* _clockSource;
ServiceContext* _serviceContext;
+
stdx::thread _thread;
+ SharedPromise<void> _stopPromise;
+ Future<void> _stopFuture;
stdx::mutex _mutex;
stdx::condition_variable _condvar;
@@ -93,26 +94,8 @@ private:
std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> createAndAddJob(PeriodicJob job);
- class PeriodicJobHandleImpl : public PeriodicJobHandle {
- public:
- explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl)
- : _jobWeak(jobImpl) {}
- void start() override;
- void stop() override;
- void pause() override;
- void resume() override;
-
- private:
- std::weak_ptr<PeriodicJobImpl> _jobWeak;
- };
-
ServiceContext* _svc;
ClockSource* _clockSource;
-
- std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs;
-
- stdx::mutex _mutex;
- bool _running = false;
};
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
index ef3a6ad9782..f455470afa3 100644
--- a/src/mongo/util/periodic_runner_impl_test.cpp
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -52,10 +52,6 @@ public:
_runner = stdx::make_unique<PeriodicRunnerImpl>(getServiceContext(), _clockSource.get());
}
- void tearDown() override {
- _runner->shutdown();
- }
-
ClockSourceMock& clockSource() {
return *_clockSource;
}
@@ -73,7 +69,6 @@ class PeriodicRunnerImplTest : public PeriodicRunnerImplTestNoSetup {
public:
void setUp() override {
PeriodicRunnerImplTestNoSetup::setUp();
- runner().startup();
}
};
@@ -95,7 +90,8 @@ TEST_F(PeriodicRunnerImplTest, OneJobTest) {
},
interval);
- runner().scheduleJob(std::move(job));
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Fast forward ten times, we should run all ten times.
for (int i = 0; i < 10; i++) {
@@ -127,7 +123,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobDoesNotRunWithoutStart) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
+ auto jobAnchor = runner().makeJob(std::move(job));
clockSource().advance(interval);
ASSERT_EQ(count, 0);
@@ -152,8 +148,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobRunsCorrectlyWithStart) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
- handle->start();
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Fast forward ten times, we should run all ten times.
for (int i = 0; i < 10; i++) {
{
@@ -187,8 +183,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
- handle->start();
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Wait for the first execution.
{
stdx::unique_lock<stdx::mutex> lk(mutex);
@@ -198,7 +194,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) {
{
stdx::unique_lock<stdx::mutex> lk(mutex);
isPaused = true;
- handle->pause();
+ jobAnchor.pause();
}
// Fast forward ten times, we shouldn't run anymore. If we do, the assert inside the job will
@@ -229,8 +225,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
- handle->start();
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Wait for the first execution.
{
stdx::unique_lock<stdx::mutex> lk(mutex);
@@ -253,7 +249,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
}
}
- handle->pause();
+ jobAnchor.pause();
// Fast forward ten times, we shouldn't run anymore.
for (int i = 0; i < 10; i++) {
@@ -263,7 +259,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
// Make sure we didn't run anymore while paused.
ASSERT_EQ(count, numIterationsBeforePause);
- handle->resume();
+ jobAnchor.resume();
// Fast forward, we should run at least once.
clockSource().advance(interval);
@@ -276,39 +272,6 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
tearDown();
}
-TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) {
- int count = 0;
- Milliseconds interval{5};
-
- stdx::mutex mutex;
- stdx::condition_variable cv;
-
- // Schedule a job before startup
- PeriodicRunner::PeriodicJob job("job",
- [&count, &mutex, &cv](Client*) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- count++;
- }
- cv.notify_all();
- },
- interval);
-
- runner().scheduleJob(std::move(job));
-
- // Start the runner, job should still run
- runner().startup();
-
- clockSource().advance(interval);
-
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count > 0; });
- }
-
- tearDown();
-}
-
TEST_F(PeriodicRunnerImplTest, TwoJobsTest) {
int countA = 0;
int countB = 0;
@@ -339,8 +302,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsTest) {
},
intervalB);
- runner().scheduleJob(std::move(jobA));
- runner().scheduleJob(std::move(jobB));
+ auto jobAnchorA = runner().makeJob(std::move(jobA));
+ auto jobAnchorB = runner().makeJob(std::move(jobB));
+
+ jobAnchorA.start();
+ jobAnchorB.start();
// Fast forward and wait for both jobs to run the right number of times
for (int i = 0; i <= 10; i++) {
@@ -383,8 +349,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) {
},
Milliseconds(1));
- runner().scheduleJob(std::move(jobA));
- runner().scheduleJob(std::move(jobB));
+ auto jobAnchorA = runner().makeJob(std::move(jobA));
+ auto jobAnchorB = runner().makeJob(std::move(jobB));
+
+ jobAnchorA.start();
+ jobAnchorB.start();
clockSource().advance(Milliseconds(1));
@@ -399,5 +368,75 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) {
tearDown();
}
+TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) {
+ size_t timesCalled = 0;
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Add a job, ensure that it runs once
+ PeriodicRunner::PeriodicJob job("job",
+ [&](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ timesCalled++;
+ }
+ cv.notify_one();
+ },
+ Milliseconds(5));
+
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
+ // Wait for the first execution.
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&] { return timesCalled; });
+ }
+
+ jobAnchor.setPeriod(Milliseconds(10));
+ ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(10));
+
+    // If we change the period to a longer duration, that doesn't trigger a run
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_EQ(timesCalled, 1ul);
+ }
+
+ clockSource().advance(Milliseconds(5));
+
+ // We actually changed the period
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_EQ(timesCalled, 1ul);
+ }
+
+ clockSource().advance(Milliseconds(5));
+
+ // Now we hit the new cutoff
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&] { return timesCalled == 2ul; });
+ }
+
+ clockSource().advance(Milliseconds(5));
+
+    // We haven't reached the new deadline yet
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_EQ(timesCalled, 2ul);
+ }
+
+ jobAnchor.setPeriod(Milliseconds(4));
+ ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(4));
+
+    // Shortening the period past the elapsed time triggers a run
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&] { return timesCalled == 3ul; });
+ }
+
+ tearDown();
+}
+
} // namespace
} // namespace mongo