summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2020-07-27 18:53:26 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-31 21:23:21 +0000
commit0e3a54f0342277098a09783c456747db23a3ba72 (patch)
tree0b0bc820e8b4fd80ba318b411e0130c9dd8a1f2c
parentcf05a11f71768a4ae692d1ff83a5e016d29cec1c (diff)
downloadmongo-0e3a54f0342277098a09783c456747db23a3ba72.tar.gz
SERVER-49239 Start running PrimaryOnlyService instances when they are created.
-rw-r--r--src/mongo/db/repl/primary_only_service.cpp19
-rw-r--r--src/mongo/db/repl/primary_only_service.h48
-rw-r--r--src/mongo/db/repl/primary_only_service_test.cpp27
-rw-r--r--src/mongo/executor/task_executor.h8
4 files changed, 62 insertions, 40 deletions
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index eaeb1b32e02..2a2efcf06c3 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -227,6 +227,10 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateIns
auto [it2, inserted] =
_instances.emplace(instanceID.getOwned(), constructInstance(std::move(initialState)));
invariant(inserted);
+
+ // Kick off async work to run the instance
+ it2->second->scheduleRun(_scopedExecutor);
+
return it2->second;
}
@@ -242,5 +246,20 @@ boost::optional<std::shared_ptr<PrimaryOnlyService::Instance>> PrimaryOnlyServic
return it->second;
}
+void PrimaryOnlyService::Instance::scheduleRun(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+ invariant(!_running);
+ _running = true;
+
+ (*executor)->schedule([this, executor = std::move(executor)](auto status) {
+ if (ErrorCodes::isCancelationError(status) || ErrorCodes::NotMaster == status) {
+ return;
+ }
+ invariant(status);
+
+ run(std::move(executor));
+ });
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h
index 3cd16b8897a..a8f7cd8c518 100644
--- a/src/mongo/db/repl/primary_only_service.h
+++ b/src/mongo/db/repl/primary_only_service.h
@@ -41,9 +41,7 @@
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
-#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
-#include "mongo/util/future.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -86,42 +84,26 @@ public:
*/
class Instance {
public:
- struct RunOnceResult {
- enum State {
- kKeepRunning,
- kFinished,
- };
- // If set to kFinished signals that the task this Instance represents is complete, and
- // both the in-memory Instance object and the on-disk state document can be cleaned up.
- State state = kKeepRunning;
-
- // If set, signals that the next call to 'runOnce' on this instance shouldn't occur
- // until this optime is majority committed.
- boost::optional<OpTime> optime;
-
- static RunOnceResult kComplete() {
- return RunOnceResult{kFinished, boost::none};
- }
-
- static RunOnceResult kKeepGoing(OpTime ot) {
- return RunOnceResult{kKeepRunning, std::move(ot)};
- }
-
- private:
- RunOnceResult(State st, boost::optional<OpTime> ot)
- : state(std::move(st)), optime(std::move(ot)) {}
- };
-
virtual ~Instance() = default;
/**
+ * Schedules work to call this instance's 'run' method against the provided
+ * ScopedTaskExecutor. Also includes checking to ensure that run is only ever scheduled
+ * once.
+ */
+ void scheduleRun(std::shared_ptr<executor::ScopedTaskExecutor> executor);
+
+ protected:
+ /**
* This is the main function that PrimaryOnlyService implementations will need to implement,
- * and is where the bulk of the work those services perform is executed. The
- * PrimaryOnlyService machinery will call this function repeatedly until the RunOnceResult
- * returned has 'complete' set to true, at which point the state document will be deleted
- * and the Instance destroyed.
+ * and is where the bulk of the work those services perform is scheduled. All work run for
+ * this Instance should be scheduled on 'executor'. Instances are responsible for inserting,
+ * updating, and deleting their state documents as needed.
*/
- virtual SemiFuture<RunOnceResult> runOnce(OperationContext* opCtx) = 0;
+ virtual void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept = 0;
+
+ private:
+ bool _running = false;
};
/**
diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp
index 4926cb56256..c2c15ec4ffb 100644
--- a/src/mongo/db/repl/primary_only_service_test.cpp
+++ b/src/mongo/db/repl/primary_only_service_test.cpp
@@ -29,7 +29,6 @@
#include <memory>
-
#include "mongo/db/client.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer_impl.h"
@@ -41,6 +40,7 @@
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/future.h"
using namespace mongo;
using namespace mongo::repl;
@@ -70,8 +70,12 @@ public:
Instance(const BSONObj& state)
: PrimaryOnlyService::TypedInstance<Instance>(), _state(state) {}
- SemiFuture<RunOnceResult> runOnce(OperationContext* opCtx) override {
- return SemiFuture<RunOnceResult>::makeReady(RunOnceResult::kComplete());
+ void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override {
+ _completionPromise.emplaceValue();
+ }
+
+ void waitForCompletion() {
+ _completionPromise.getFuture().wait();
}
const BSONObj& getState() const {
@@ -80,6 +84,7 @@ public:
private:
BSONObj _state;
+ SharedPromise<void> _completionPromise;
};
};
@@ -91,9 +96,8 @@ public:
WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+ auto opCtx = cc().makeOperationContext();
{
- auto opCtx = cc().makeOperationContext();
-
auto replCoord = std::make_unique<ReplicationCoordinatorMock>(serviceContext);
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
ASSERT_OK(replCoord->updateTerm(opCtx.get(), 1));
@@ -114,6 +118,7 @@ public:
std::make_unique<TestService>(getServiceContext());
_registry->registerService(std::move(service));
+ _registry->onStartup(opCtx.get());
_registry->onStepUpComplete(nullptr, 1);
}
@@ -148,8 +153,17 @@ DEATH_TEST_F(PrimaryOnlyServiceTest,
registry.registerService(std::move(service2));
}
-TEST_F(PrimaryOnlyServiceTest, LookupInstance) {
+TEST_F(PrimaryOnlyServiceTest, BasicCreateInstance) {
+ auto instance = TestService::Instance::getOrCreate(_service,
+ BSON("_id" << 0 << "state"
+ << "foo"));
+ ASSERT(instance.get());
+ ASSERT_EQ(0, instance->getState()["_id"].Int());
+ ASSERT_EQ("foo", instance->getState()["state"].String());
+ instance->waitForCompletion();
+}
+TEST_F(PrimaryOnlyServiceTest, LookupInstance) {
auto instance = TestService::Instance::getOrCreate(_service,
BSON("_id" << 0 << "state"
<< "foo"));
@@ -166,7 +180,6 @@ TEST_F(PrimaryOnlyServiceTest, LookupInstance) {
}
TEST_F(PrimaryOnlyServiceTest, DoubleCreateInstance) {
-
auto instance = TestService::Instance::getOrCreate(_service,
BSON("_id" << 0 << "state"
<< "foo"));
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 4f2dad79abe..c3a46671672 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -213,6 +213,10 @@ public:
Date_t deadline = Date_t::max()) = 0;
+ /**
+ * Schedules the given Task to run in this executor.
+ * Note that 'func' is implicitly noexcept and should not ever leak exceptions.
+ */
void schedule(OutOfLineExecutor::Task func) final override;
/**
@@ -225,6 +229,8 @@ public:
* converts to CallbackFn, since such a value would be moved from and invalidated during
* conversion with no way to recover it.
*
+ * "work" should be considered implicitly 'noexcept' and thus should not throw any exceptions.
+ *
* May be called by client threads or callbacks running in the executor.
*
* Contract: Implementations should guarantee that callback should be called *after* doing any
@@ -244,6 +250,8 @@ public:
* converts to CallbackFn, since such a value would be moved from and invalidated during
* conversion with no way to recover it.
*
+ * "work" should be considered implicitly 'noexcept' and thus should not throw any exceptions.
+ *
* May be called by client threads or callbacks running in the executor.
*
* Contract: Implementations should guarantee that callback should be called *after* doing any