summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/primary_only_service.cpp66
-rw-r--r--src/mongo/db/repl/primary_only_service.h39
-rw-r--r--src/mongo/db/repl/primary_only_service_test.cpp112
3 files changed, 83 insertions, 134 deletions
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index cc09d7fc8cd..9351ad28fdf 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -126,6 +126,7 @@ void PrimaryOnlyService::onStepUp(long long term) {
invariant(term > _term,
str::stream() << "term " << term << " is not greater than " << _term);
_term = term;
+ _state = State::kRunning;
// Install a new executor, while moving the old one into 'executor2' so it can be accessed
// outside of _mutex.
@@ -169,59 +170,34 @@ void PrimaryOnlyService::shutdown() {
shutdownImpl();
}
-SemiFuture<PrimaryOnlyService::InstanceID> PrimaryOnlyService::startNewInstance(
- OperationContext* opCtx, BSONObj initialState) {
-
+std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateInstance(
+ BSONObj initialState) {
const auto idElem = initialState["_id"];
uassert(4908702,
str::stream() << "Missing _id element when adding new instance of PrimaryOnlyService \""
<< getServiceName() << "\"",
!idElem.eoo());
- InstanceID instanceID = idElem.wrap().getOwned();
-
- // Write initial state document to service's state document collection
- insertDocument(opCtx, getStateDocumentsNS(), initialState);
- const OpTime writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
-
- // Wait for the new instance's state document insert to be replicated to a majority and then
- // create, register, and return corresponding Instance object.
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(writeOpTime)
- .thenRunOn(**_executor)
- .then([this,
- instanceID = std::move(instanceID),
- initialState = std::move(initialState),
- writeTerm = writeOpTime.getTerm()] {
- if (MONGO_unlikely(PrimaryOnlyServiceHangBeforeCreatingInstance.shouldFail())) {
- PrimaryOnlyServiceHangBeforeCreatingInstance.pauseWhileSet();
- }
-
- auto instance = constructInstance(initialState);
-
- stdx::lock_guard lk(_mutex);
+ InstanceID instanceID = idElem.wrap();
- if (_state == State::kPaused || _term > writeTerm) {
- return instanceID;
- }
- invariant(_state == State::kRunning);
- invariant(_term == writeTerm);
-
- auto [_, inserted] = _instances.emplace(instanceID, instance);
- invariant(
- inserted,
- str::stream()
- << "Starting new PrimaryOnlyService of type " << getServiceName()
- << " failed; a service instance of that type already exists with instance ID: "
- << instanceID.toString());
-
- // TODO(SERVER-49239): schedule first call to runOnce().
- return instanceID;
- })
- .semi();
+ stdx::lock_guard lk(_mutex);
+ uassert(
+ ErrorCodes::NotMaster,
+ str::stream() << "Not Primary when trying to create a new instance of PrimaryOnlyService "
+ << getServiceName(),
+ _state == State::kRunning);
+
+ auto it = _instances.find(instanceID);
+ if (it != _instances.end()) {
+ return it->second;
+ }
+ auto [it2, inserted] =
+ _instances.emplace(instanceID.getOwned(), constructInstance(std::move(initialState)));
+ invariant(inserted);
+ return it2->second;
}
-boost::optional<std::shared_ptr<PrimaryOnlyService::Instance>>
-PrimaryOnlyService::lookupInstanceBase(const InstanceID& id) {
+boost::optional<std::shared_ptr<PrimaryOnlyService::Instance>> PrimaryOnlyService::lookupInstance(
+ const InstanceID& id) {
stdx::lock_guard lk(_mutex);
auto it = _instances.find(id);
diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h
index 7145cadebd8..d07da959def 100644
--- a/src/mongo/db/repl/primary_only_service.h
+++ b/src/mongo/db/repl/primary_only_service.h
@@ -136,18 +136,28 @@ public:
virtual ~TypedInstance() = default;
/**
- * Same functionality as PrimaryOnlyService::lookupInstanceBase, but returns a pointer of
+ * Same functionality as PrimaryOnlyService::lookupInstance, but returns a pointer of
* the proper derived class for the Instance.
*/
static boost::optional<std::shared_ptr<InstanceType>> lookup(PrimaryOnlyService* service,
const InstanceID& id) {
- auto instance = service->lookupInstanceBase(id);
+ auto instance = service->lookupInstance(id);
if (!instance) {
return boost::none;
}
return checked_pointer_cast<InstanceType>(instance.get());
}
+
+ /**
+ * Same functionality as PrimaryOnlyService::getOrCreateInstance, but returns a pointer of
+ * the proper derived class for the Instance.
+ */
+ static std::shared_ptr<InstanceType> getOrCreate(PrimaryOnlyService* service,
+ BSONObj initialState) {
+ auto instance = service->getOrCreateInstance(std::move(initialState));
+ return checked_pointer_cast<InstanceType>(instance);
+ }
};
explicit PrimaryOnlyService(ServiceContext* serviceContext);
@@ -191,14 +201,6 @@ public:
*/
void onStepDown();
- /**
- * Writes the given 'initialState' object to the service's state document collection and then
- * schedules work to construct an in-memory instance object and start it running, as soon as the
- * write of the state object is majority committed. This is the main way that consumers can
- * start up new PrimaryOnlyService tasks.
- */
- SemiFuture<InstanceID> startNewInstance(OperationContext* opCtx, BSONObj initialState);
-
protected:
/**
* Returns a ScopedTaskExecutor used to run instances of this service.
@@ -212,10 +214,19 @@ protected:
/**
* Given an InstanceId returns the corresponding running Instance object, or boost::none if
- * there is none. Note that 'lookupInstanceBase' will not find a newly added Instance until the
- * Future returned by the call to 'startNewInstance' that added it resolves.
+ * there is none.
+ */
+ boost::optional<std::shared_ptr<Instance>> lookupInstance(const InstanceID& id);
+
+ /**
+ * Extracts an InstanceID from the _id field of the given 'initialState' object. If an Instance
+ * with the extracted InstanceID already exists in _intances, returns it. If not, constructs a
+ * new Instance (by calling constructInstance()), registers it in _instances, and returns it.
+ * It is illegal to call this more than once with 'initialState' documents that have the same
+ * _id but are otherwise not completely identical.
+ * Throws NotMaster if the node is not currently primary.
*/
- boost::optional<std::shared_ptr<Instance>> lookupInstanceBase(const InstanceID& id);
+ std::shared_ptr<Instance> getOrCreateInstance(BSONObj initialState);
private:
ServiceContext* const _serviceContext;
@@ -239,7 +250,7 @@ private:
kShutdown,
};
- State _state = State::kRunning;
+ State _state = State::kPaused;
// The term that this service is running under.
long long _term = OpTime::kUninitializedTerm;
diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp
index 51089c7c5d5..2989f6e6980 100644
--- a/src/mongo/db/repl/primary_only_service_test.cpp
+++ b/src/mongo/db/repl/primary_only_service_test.cpp
@@ -187,86 +187,48 @@ DEATH_TEST_F(PrimaryOnlyServiceTest,
registry.registerService(std::move(service2));
}
-TEST_F(PrimaryOnlyServiceTest, BasicStartNewInstance) {
- BSONObj state = BSON("_id" << 0 << "state"
- << "foo");
-
- // Check that state document collection is empty
- auto opCtx = makeOperationContext();
- DBDirectClient client(opCtx.get());
- auto obj = client.findOne(_service->getStateDocumentsNS().toString(), Query());
- ASSERT(obj.isEmpty());
-
- // Successfully start a new instance
- auto fut = _service->startNewInstance(opCtx.get(), state);
- auto instanceID = fut.get();
- ASSERT_EQ(0, instanceID["_id"].Int());
-
- // Check that the state document for the instance was created properly.
- obj = client.findOne(_service->getStateDocumentsNS().toString(), Query());
- ASSERT_EQ(0, obj["_id"].Int());
- ASSERT_EQ("foo", obj["state"].String());
-
- // Check that we can look up the Instance by ID.
- auto instance = TestService::Instance::lookup(_service, BSON("_id" << 0));
- ASSERT_EQ(0, instance.get()->getState()["_id"].Int());
- ASSERT_EQ("foo", instance.get()->getState()["state"].String());
-}
-
-TEST_F(PrimaryOnlyServiceTest, DoubleRegisterInstance) {
- BSONObj state1 = BSON("_id" << 0 << "state"
- << "foo");
- BSONObj state2 = BSON("_id" << 0 << "state"
- << "bar");
-
- auto opCtx = makeOperationContext();
+TEST_F(PrimaryOnlyServiceTest, LookupInstance) {
- // Register instance with _id 0
- auto fut = _service->startNewInstance(opCtx.get(), std::move(state1));
-
- // Assert that registering a second instance with the same _id throws DuplicateKey
- ASSERT_THROWS_CODE(_service->startNewInstance(opCtx.get(), std::move(state2)),
- DBException,
- ErrorCodes::DuplicateKey);
+ auto instance = TestService::Instance::getOrCreate(_service,
+ BSON("_id" << 0 << "state"
+ << "foo"));
+ ASSERT(instance.get());
+ ASSERT_EQ(0, instance->getState()["_id"].Int());
+ ASSERT_EQ("foo", instance->getState()["state"].String());
- // Verify first instance was registered successfully.
- auto instanceID = fut.get();
- ASSERT_EQ(0, instanceID["_id"].Int());
+ auto instance2 = TestService::Instance::lookup(_service, BSON("_id" << 0));
- // Check that we can look up the Instance by ID.
- auto instance = TestService::Instance::lookup(_service, BSON("_id" << 0));
- ASSERT_EQ(0, instance.get()->getState()["_id"].Int());
- ASSERT_EQ("foo", instance.get()->getState()["state"].String());
+ ASSERT(instance2.get());
+ ASSERT_EQ(instance.get(), instance2.get().get());
+ ASSERT_EQ(0, instance2.get()->getState()["_id"].Int());
+ ASSERT_EQ("foo", instance2.get()->getState()["state"].String());
}
-TEST_F(PrimaryOnlyServiceTest, StepDownDuringRegisterInstance) {
- BSONObj state = BSON("_id" << 0 << "state"
- << "foo");
-
- // Begin starting a new instance, but pause after writing the state document to disk.
- auto& fp = PrimaryOnlyServiceHangBeforeCreatingInstance;
- fp.setMode(FailPoint::alwaysOn);
- auto opCtx = makeOperationContext();
- auto fut = _service->startNewInstance(opCtx.get(), state);
- fp.waitForTimesEntered(1);
+TEST_F(PrimaryOnlyServiceTest, DoubleCreateInstance) {
+
+ auto instance = TestService::Instance::getOrCreate(_service,
+ BSON("_id" << 0 << "state"
+ << "foo"));
+ ASSERT(instance.get());
+ ASSERT_EQ(0, instance->getState()["_id"].Int());
+ ASSERT_EQ("foo", instance->getState()["state"].String());
+
+ // Trying to create a new instance with the same _id but different state otherwise just returns
+ // the already existing instance based on the _id only.
+ auto instance2 = TestService::Instance::getOrCreate(_service,
+ BSON("_id" << 0 << "state"
+ << "bar"));
+ ASSERT_EQ(instance.get(), instance2.get());
+ ASSERT_EQ(0, instance2->getState()["_id"].Int());
+ ASSERT_EQ("foo", instance2->getState()["state"].String());
+}
- // Now step down
+TEST_F(PrimaryOnlyServiceTest, CreateWhenNotPrimary) {
_registry->onStepDown();
- auto replCoord = ReplicationCoordinator::get(opCtx.get());
- ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
-
- // Now allow startNewInstance to complete.
- fp.setMode(FailPoint::off);
- auto instanceID = fut.get();
- ASSERT_EQ(0, instanceID["_id"].Int());
-
- // Check that the Instance was not created since we were are no longer primary.
- auto instance = TestService::Instance::lookup(_service, BSON("_id" << 0));
- ASSERT_FALSE(instance.is_initialized());
-
- // Check that the state document for the instance was created properly.
- DBDirectClient client(opCtx.get());
- BSONObj obj = client.findOne(_service->getStateDocumentsNS().toString(), Query());
- ASSERT_EQ(0, obj["_id"].Int());
- ASSERT_EQ("foo", obj["state"].String());
+
+ ASSERT_THROWS_CODE(TestService::Instance::getOrCreate(_service,
+ BSON("_id" << 0 << "state"
+ << "foo")),
+ DBException,
+ ErrorCodes::NotMaster);
}