diff options
-rw-r--r-- | src/mongo/db/repl/primary_only_service.cpp | 66 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service.h | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service_test.cpp | 112 |
3 files changed, 83 insertions, 134 deletions
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index cc09d7fc8cd..9351ad28fdf 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -126,6 +126,7 @@ void PrimaryOnlyService::onStepUp(long long term) { invariant(term > _term, str::stream() << "term " << term << " is not greater than " << _term); _term = term; + _state = State::kRunning; // Install a new executor, while moving the old one into 'executor2' so it can be accessed // outside of _mutex. @@ -169,59 +170,34 @@ void PrimaryOnlyService::shutdown() { shutdownImpl(); } -SemiFuture<PrimaryOnlyService::InstanceID> PrimaryOnlyService::startNewInstance( - OperationContext* opCtx, BSONObj initialState) { - +std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateInstance( + BSONObj initialState) { const auto idElem = initialState["_id"]; uassert(4908702, str::stream() << "Missing _id element when adding new instance of PrimaryOnlyService \"" << getServiceName() << "\"", !idElem.eoo()); - InstanceID instanceID = idElem.wrap().getOwned(); - - // Write initial state document to service's state document collection - insertDocument(opCtx, getStateDocumentsNS(), initialState); - const OpTime writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - - // Wait for the new instance's state document insert to be replicated to a majority and then - // create, register, and return corresponding Instance object. 
- return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(writeOpTime) - .thenRunOn(**_executor) - .then([this, - instanceID = std::move(instanceID), - initialState = std::move(initialState), - writeTerm = writeOpTime.getTerm()] { - if (MONGO_unlikely(PrimaryOnlyServiceHangBeforeCreatingInstance.shouldFail())) { - PrimaryOnlyServiceHangBeforeCreatingInstance.pauseWhileSet(); - } - - auto instance = constructInstance(initialState); - - stdx::lock_guard lk(_mutex); + InstanceID instanceID = idElem.wrap(); - if (_state == State::kPaused || _term > writeTerm) { - return instanceID; - } - invariant(_state == State::kRunning); - invariant(_term == writeTerm); - - auto [_, inserted] = _instances.emplace(instanceID, instance); - invariant( - inserted, - str::stream() - << "Starting new PrimaryOnlyService of type " << getServiceName() - << " failed; a service instance of that type already exists with instance ID: " - << instanceID.toString()); - - // TODO(SERVER-49239): schedule first call to runOnce(). 
- return instanceID; - }) - .semi(); + stdx::lock_guard lk(_mutex); + uassert( + ErrorCodes::NotMaster, + str::stream() << "Not Primary when trying to create a new instance of PrimaryOnlyService " + << getServiceName(), + _state == State::kRunning); + + auto it = _instances.find(instanceID); + if (it != _instances.end()) { + return it->second; + } + auto [it2, inserted] = + _instances.emplace(instanceID.getOwned(), constructInstance(std::move(initialState))); + invariant(inserted); + return it2->second; } -boost::optional<std::shared_ptr<PrimaryOnlyService::Instance>> -PrimaryOnlyService::lookupInstanceBase(const InstanceID& id) { +boost::optional<std::shared_ptr<PrimaryOnlyService::Instance>> PrimaryOnlyService::lookupInstance( + const InstanceID& id) { stdx::lock_guard lk(_mutex); auto it = _instances.find(id); diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h index 7145cadebd8..d07da959def 100644 --- a/src/mongo/db/repl/primary_only_service.h +++ b/src/mongo/db/repl/primary_only_service.h @@ -136,18 +136,28 @@ public: virtual ~TypedInstance() = default; /** - * Same functionality as PrimaryOnlyService::lookupInstanceBase, but returns a pointer of + * Same functionality as PrimaryOnlyService::lookupInstance, but returns a pointer of * the proper derived class for the Instance. */ static boost::optional<std::shared_ptr<InstanceType>> lookup(PrimaryOnlyService* service, const InstanceID& id) { - auto instance = service->lookupInstanceBase(id); + auto instance = service->lookupInstance(id); if (!instance) { return boost::none; } return checked_pointer_cast<InstanceType>(instance.get()); } + + /** + * Same functionality as PrimaryOnlyService::getOrCreateInstance, but returns a pointer of + * the proper derived class for the Instance. 
+ */ + static std::shared_ptr<InstanceType> getOrCreate(PrimaryOnlyService* service, + BSONObj initialState) { + auto instance = service->getOrCreateInstance(std::move(initialState)); + return checked_pointer_cast<InstanceType>(instance); + } }; explicit PrimaryOnlyService(ServiceContext* serviceContext); @@ -191,14 +201,6 @@ public: */ void onStepDown(); - /** - * Writes the given 'initialState' object to the service's state document collection and then - * schedules work to construct an in-memory instance object and start it running, as soon as the - * write of the state object is majority committed. This is the main way that consumers can - * start up new PrimaryOnlyService tasks. - */ - SemiFuture<InstanceID> startNewInstance(OperationContext* opCtx, BSONObj initialState); - protected: /** * Returns a ScopedTaskExecutor used to run instances of this service. @@ -212,10 +214,19 @@ protected: /** * Given an InstanceId returns the corresponding running Instance object, or boost::none if - * there is none. Note that 'lookupInstanceBase' will not find a newly added Instance until the - * Future returned by the call to 'startNewInstance' that added it resolves. + * there is none. + */ + boost::optional<std::shared_ptr<Instance>> lookupInstance(const InstanceID& id); + + /** + * Extracts an InstanceID from the _id field of the given 'initialState' object. If an Instance + * with the extracted InstanceID already exists in _instances, returns it. If not, constructs a + * new Instance (by calling constructInstance()), registers it in _instances, and returns it. + * It is illegal to call this more than once with 'initialState' documents that have the same + * _id but are otherwise not completely identical. + * Throws NotMaster if the node is not currently primary. 
*/ - boost::optional<std::shared_ptr<Instance>> lookupInstanceBase(const InstanceID& id); + std::shared_ptr<Instance> getOrCreateInstance(BSONObj initialState); private: ServiceContext* const _serviceContext; @@ -239,7 +250,7 @@ private: kShutdown, }; - State _state = State::kRunning; + State _state = State::kPaused; // The term that this service is running under. long long _term = OpTime::kUninitializedTerm; diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp index 51089c7c5d5..2989f6e6980 100644 --- a/src/mongo/db/repl/primary_only_service_test.cpp +++ b/src/mongo/db/repl/primary_only_service_test.cpp @@ -187,86 +187,48 @@ DEATH_TEST_F(PrimaryOnlyServiceTest, registry.registerService(std::move(service2)); } -TEST_F(PrimaryOnlyServiceTest, BasicStartNewInstance) { - BSONObj state = BSON("_id" << 0 << "state" - << "foo"); - - // Check that state document collection is empty - auto opCtx = makeOperationContext(); - DBDirectClient client(opCtx.get()); - auto obj = client.findOne(_service->getStateDocumentsNS().toString(), Query()); - ASSERT(obj.isEmpty()); - - // Successfully start a new instance - auto fut = _service->startNewInstance(opCtx.get(), state); - auto instanceID = fut.get(); - ASSERT_EQ(0, instanceID["_id"].Int()); - - // Check that the state document for the instance was created properly. - obj = client.findOne(_service->getStateDocumentsNS().toString(), Query()); - ASSERT_EQ(0, obj["_id"].Int()); - ASSERT_EQ("foo", obj["state"].String()); - - // Check that we can look up the Instance by ID. 
- auto instance = TestService::Instance::lookup(_service, BSON("_id" << 0)); - ASSERT_EQ(0, instance.get()->getState()["_id"].Int()); - ASSERT_EQ("foo", instance.get()->getState()["state"].String()); -} - -TEST_F(PrimaryOnlyServiceTest, DoubleRegisterInstance) { - BSONObj state1 = BSON("_id" << 0 << "state" - << "foo"); - BSONObj state2 = BSON("_id" << 0 << "state" - << "bar"); - - auto opCtx = makeOperationContext(); +TEST_F(PrimaryOnlyServiceTest, LookupInstance) { - // Register instance with _id 0 - auto fut = _service->startNewInstance(opCtx.get(), std::move(state1)); - - // Assert that registering a second instance with the same _id throws DuplicateKey - ASSERT_THROWS_CODE(_service->startNewInstance(opCtx.get(), std::move(state2)), - DBException, - ErrorCodes::DuplicateKey); + auto instance = TestService::Instance::getOrCreate(_service, + BSON("_id" << 0 << "state" + << "foo")); + ASSERT(instance.get()); + ASSERT_EQ(0, instance->getState()["_id"].Int()); + ASSERT_EQ("foo", instance->getState()["state"].String()); - // Verify first instance was registered successfully. - auto instanceID = fut.get(); - ASSERT_EQ(0, instanceID["_id"].Int()); + auto instance2 = TestService::Instance::lookup(_service, BSON("_id" << 0)); - // Check that we can look up the Instance by ID. - auto instance = TestService::Instance::lookup(_service, BSON("_id" << 0)); - ASSERT_EQ(0, instance.get()->getState()["_id"].Int()); - ASSERT_EQ("foo", instance.get()->getState()["state"].String()); + ASSERT(instance2.get()); + ASSERT_EQ(instance.get(), instance2.get().get()); + ASSERT_EQ(0, instance2.get()->getState()["_id"].Int()); + ASSERT_EQ("foo", instance2.get()->getState()["state"].String()); } -TEST_F(PrimaryOnlyServiceTest, StepDownDuringRegisterInstance) { - BSONObj state = BSON("_id" << 0 << "state" - << "foo"); - - // Begin starting a new instance, but pause after writing the state document to disk. 
- auto& fp = PrimaryOnlyServiceHangBeforeCreatingInstance; - fp.setMode(FailPoint::alwaysOn); - auto opCtx = makeOperationContext(); - auto fut = _service->startNewInstance(opCtx.get(), state); - fp.waitForTimesEntered(1); +TEST_F(PrimaryOnlyServiceTest, DoubleCreateInstance) { + + auto instance = TestService::Instance::getOrCreate(_service, + BSON("_id" << 0 << "state" + << "foo")); + ASSERT(instance.get()); + ASSERT_EQ(0, instance->getState()["_id"].Int()); + ASSERT_EQ("foo", instance->getState()["state"].String()); + + // Trying to create a new instance with the same _id but different state otherwise just returns + // the already existing instance based on the _id only. + auto instance2 = TestService::Instance::getOrCreate(_service, + BSON("_id" << 0 << "state" + << "bar")); + ASSERT_EQ(instance.get(), instance2.get()); + ASSERT_EQ(0, instance2->getState()["_id"].Int()); + ASSERT_EQ("foo", instance2->getState()["state"].String()); +} - // Now step down +TEST_F(PrimaryOnlyServiceTest, CreateWhenNotPrimary) { _registry->onStepDown(); - auto replCoord = ReplicationCoordinator::get(opCtx.get()); - ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - - // Now allow startNewInstance to complete. - fp.setMode(FailPoint::off); - auto instanceID = fut.get(); - ASSERT_EQ(0, instanceID["_id"].Int()); - - // Check that the Instance was not created since we were are no longer primary. - auto instance = TestService::Instance::lookup(_service, BSON("_id" << 0)); - ASSERT_FALSE(instance.is_initialized()); - - // Check that the state document for the instance was created properly. - DBDirectClient client(opCtx.get()); - BSONObj obj = client.findOne(_service->getStateDocumentsNS().toString(), Query()); - ASSERT_EQ(0, obj["_id"].Int()); - ASSERT_EQ("foo", obj["state"].String()); + + ASSERT_THROWS_CODE(TestService::Instance::getOrCreate(_service, + BSON("_id" << 0 << "state" + << "foo")), + DBException, + ErrorCodes::NotMaster); } |