From 0e3a54f0342277098a09783c456747db23a3ba72 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 27 Jul 2020 18:53:26 -0400 Subject: SERVER-49239 Start running PrimaryOnlyService instances when they are created. --- src/mongo/db/repl/primary_only_service.cpp | 19 ++++++++++ src/mongo/db/repl/primary_only_service.h | 48 ++++++++----------------- src/mongo/db/repl/primary_only_service_test.cpp | 27 ++++++++++---- src/mongo/executor/task_executor.h | 8 +++++ 4 files changed, 62 insertions(+), 40 deletions(-) diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index eaeb1b32e02..2a2efcf06c3 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -227,6 +227,10 @@ std::shared_ptr PrimaryOnlyService::getOrCreateIns auto [it2, inserted] = _instances.emplace(instanceID.getOwned(), constructInstance(std::move(initialState))); invariant(inserted); + + // Kick off async work to run the instance + it2->second->scheduleRun(_scopedExecutor); + return it2->second; } @@ -242,5 +246,20 @@ boost::optional> PrimaryOnlyServic return it->second; } +void PrimaryOnlyService::Instance::scheduleRun( + std::shared_ptr executor) { + invariant(!_running); + _running = true; + + (*executor)->schedule([this, executor = std::move(executor)](auto status) { + if (ErrorCodes::isCancelationError(status) || ErrorCodes::NotMaster == status) { + return; + } + invariant(status); + + run(std::move(executor)); + }); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h index 3cd16b8897a..a8f7cd8c518 100644 --- a/src/mongo/db/repl/primary_only_service.h +++ b/src/mongo/db/repl/primary_only_service.h @@ -41,9 +41,7 @@ #include "mongo/executor/scoped_task_executor.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" -#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" -#include "mongo/util/future.h" #include "mongo/util/string_map.h" namespace mongo { @@ -86,42 +84,26 @@ public: */ class Instance { public: - struct RunOnceResult { - enum State { - kKeepRunning, - kFinished, - }; - // If set to kFinished signals that the task this Instance represents is complete, and - // both the in-memory Instance object and the on-disk state document can be cleaned up. - State state = kKeepRunning; - - // If set, signals that the next call to 'runOnce' on this instance shouldn't occur - // until this optime is majority committed. - boost::optional optime; - - static RunOnceResult kComplete() { - return RunOnceResult{kFinished, boost::none}; - } - - static RunOnceResult kKeepGoing(OpTime ot) { - return RunOnceResult{kKeepRunning, std::move(ot)}; - } - - private: - RunOnceResult(State st, boost::optional ot) - : state(std::move(st)), optime(std::move(ot)) {} - }; - virtual ~Instance() = default; + /** + * Schedules work to call this instance's 'run' method against the provided + * ScopedTaskExecutor. Also includes checking to ensure that run is only ever scheduled + * once. + */ + void scheduleRun(std::shared_ptr executor); + + protected: /** * This is the main function that PrimaryOnlyService implementations will need to implement, - * and is where the bulk of the work those services perform is executed. The - * PrimaryOnlyService machinery will call this function repeatedly until the RunOnceResult - * returned has 'complete' set to true, at which point the state document will be deleted - * and the Instance destroyed. + * and is where the bulk of the work those services perform is scheduled. All work run for + * this Instance should be scheduled on 'executor'. Instances are responsible for inserting, + * updating, and deleting their state documents as needed. */ - virtual SemiFuture runOnce(OperationContext* opCtx) = 0; + virtual void run(std::shared_ptr executor) noexcept = 0; + + private: + bool _running = false; }; /** diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp index 4926cb56256..c2c15ec4ffb 100644 --- a/src/mongo/db/repl/primary_only_service_test.cpp +++ b/src/mongo/db/repl/primary_only_service_test.cpp @@ -29,7 +29,6 @@ #include - #include "mongo/db/client.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer_impl.h" @@ -41,6 +40,7 @@ #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/future.h" using namespace mongo; using namespace mongo::repl; @@ -70,8 +70,12 @@ public: Instance(const BSONObj& state) : PrimaryOnlyService::TypedInstance(), _state(state) {} - SemiFuture runOnce(OperationContext* opCtx) override { - return SemiFuture::makeReady(RunOnceResult::kComplete()); + void run(std::shared_ptr executor) noexcept override { + _completionPromise.emplaceValue(); + } + + void waitForCompletion() { + _completionPromise.getFuture().wait(); } const BSONObj& getState() const { @@ -80,6 +84,7 @@ public: private: BSONObj _state; + SharedPromise _completionPromise; }; }; @@ -91,9 +96,8 @@ public: WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + auto opCtx = cc().makeOperationContext(); { - auto opCtx = cc().makeOperationContext(); - auto replCoord = std::make_unique(serviceContext); ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY)); ASSERT_OK(replCoord->updateTerm(opCtx.get(), 1)); @@ -114,6 +118,7 @@ public: std::make_unique(getServiceContext()); _registry->registerService(std::move(service)); + _registry->onStartup(opCtx.get()); _registry->onStepUpComplete(nullptr, 1); } @@ -148,8 +153,17 @@ DEATH_TEST_F(PrimaryOnlyServiceTest, registry.registerService(std::move(service2)); } -TEST_F(PrimaryOnlyServiceTest, LookupInstance) { +TEST_F(PrimaryOnlyServiceTest, BasicCreateInstance) { + auto instance = TestService::Instance::getOrCreate(_service, + BSON("_id" << 0 << "state" + << "foo")); + ASSERT(instance.get()); + ASSERT_EQ(0, instance->getState()["_id"].Int()); + ASSERT_EQ("foo", instance->getState()["state"].String()); + instance->waitForCompletion(); +} +TEST_F(PrimaryOnlyServiceTest, LookupInstance) { auto instance = TestService::Instance::getOrCreate(_service, BSON("_id" << 0 << "state" << "foo")); @@ -166,7 +180,6 @@ TEST_F(PrimaryOnlyServiceTest, LookupInstance) { } TEST_F(PrimaryOnlyServiceTest, DoubleCreateInstance) { - auto instance = TestService::Instance::getOrCreate(_service, BSON("_id" << 0 << "state" << "foo")); diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 4f2dad79abe..c3a46671672 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -213,6 +213,10 @@ public: Date_t deadline = Date_t::max()) = 0; + /** + * Schedules the given Task to run in this executor. + * Note that 'func' is implicitly noexcept and should not ever leak exceptions. + */ void schedule(OutOfLineExecutor::Task func) final override; /** @@ -225,6 +229,8 @@ public: * converts to CallbackFn, since such a value would be moved from and invalidated during * conversion with no way to recover it. * + * "work" should be considered implicitly 'noexcept' and thus should not throw any exceptions. + * * May be called by client threads or callbacks running in the executor. * * Contract: Implementations should guarantee that callback should be called *after* doing any @@ -244,6 +250,8 @@ public: * converts to CallbackFn, since such a value would be moved from and invalidated during * conversion with no way to recover it. * + * "work" should be considered implicitly 'noexcept' and thus should not throw any exceptions. + * * May be called by client threads or callbacks running in the executor. * * Contract: Implementations should guarantee that callback should be called *after* doing any -- cgit v1.2.1