diff options
author | Spencer T Brody <spencer@mongodb.com> | 2020-08-25 13:37:37 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-27 04:08:44 +0000 |
commit | ab874e203ae565060de279237bd6d0a41f6ab2c6 (patch) | |
tree | dbc4a6ae186e0ef92c7d7b421a863e28dcdaf0c3 /src | |
parent | cc4f24459ee9504fe4952d52820e82978be475fe (diff) | |
download | mongo-ab874e203ae565060de279237bd6d0a41f6ab2c6.tar.gz |
SERVER-49238 Ensure that OpCtxs on PrimaryOnlyService threads are interrupted if the node is no longer primary
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/operation_context.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service.cpp | 100 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service_test.cpp | 22 |
4 files changed, 128 insertions, 7 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 7d81ee4ffb6..74e19e9e798 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -358,7 +358,9 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) { } if (auto status = ErrorCodes::OK; _killCode.compareAndSwap(&status, killCode)) { - _baton->notify(); + if (_baton) { + _baton->notify(); + } } } diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index 437ce7b15fb..fd32c7ab36c 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -69,6 +69,82 @@ const auto _registryRegisterer = const Status kExecutorShutdownStatus(ErrorCodes::InterruptedDueToReplStateChange, "PrimaryOnlyService executor shut down due to stepDown"); + +/** + * Client decoration used by Clients that are a part of a PrimaryOnlyService. + */ +struct PrimaryOnlyServiceClientState { + PrimaryOnlyService* primaryOnlyService = nullptr; + bool allowOpCtxWhenServiceNotRunning = false; +}; + +const auto primaryOnlyServiceStateForClient = + Client::declareDecoration<PrimaryOnlyServiceClientState>(); + +/** + * A ClientObserver that adds a hook for every time an OpCtx is created on a thread that is part of + * a PrimaryOnlyService and ensures that the OpCtx is immediately interrupted if the associated + * service is not running at the time that the OpCtx is created. This protects against the case + * where work for a service is scheduled and then the node steps down and back up before the work + * creates an OpCtx. This works because even though the node has stepped back up already, the + * service isn't "running" until it's finished its recovery which involves waiting for all work + * from the previous term as primary to complete. + */ +class PrimaryOnlyServiceClientObserver final : public ServiceContext::ClientObserver { +public: + void onCreateClient(Client* client) override {} + void onDestroyClient(Client* client) override {} + + void onCreateOperationContext(OperationContext* opCtx) override { + auto client = opCtx->getClient(); + auto clientState = primaryOnlyServiceStateForClient(client); + if (!clientState.primaryOnlyService) { + // This OpCtx/Client is not a part of a PrimaryOnlyService + return; + } + + // Ensure this OpCtx will get interrupted at stepDown. + opCtx->setAlwaysInterruptAtStepDownOrUp(); + + // If the PrimaryOnlyService this OpCtx is a part of isn't running when it's created, then + // ensure the OpCtx starts off immediately interrupted. + if (!clientState.allowOpCtxWhenServiceNotRunning && + !clientState.primaryOnlyService->isRunning()) { + opCtx->markKilled(ErrorCodes::NotMaster); + } + } + void onDestroyOperationContext(OperationContext* opCtx) override {} +}; + +ServiceContext::ConstructorActionRegisterer primaryOnlyServiceClientObserverRegisterer{ + "PrimaryOnlyServiceClientObserver", [](ServiceContext* service) { + service->registerClientObserver(std::make_unique<PrimaryOnlyServiceClientObserver>()); + }}; + +/** + * Allows OpCtxs created on PrimaryOnlyService threads to remain uninterrupted, even if the service + * they are associated with isn't running. Used during the stepUp process to allow the database + * read required to rebuild a service and get it running in the first place. + * Does not prevent other forms of OpCtx interruption, such as from stepDown or calls to killOp. + */ +class AllowOpCtxWhenServiceNotRunningBlock { +public: + explicit AllowOpCtxWhenServiceNotRunningBlock(Client* client) + : _client(client), _clientState(&primaryOnlyServiceStateForClient(_client)) { + invariant(_clientState->primaryOnlyService); + invariant(_clientState->allowOpCtxWhenServiceNotRunning == false); + _clientState->allowOpCtxWhenServiceNotRunning = true; + } + ~AllowOpCtxWhenServiceNotRunningBlock() { + invariant(_clientState->allowOpCtxWhenServiceNotRunning == true); + _clientState->allowOpCtxWhenServiceNotRunning = false; + } + +private: + Client* _client; + PrimaryOnlyServiceClientState* _clientState; +}; + } // namespace PrimaryOnlyServiceRegistry* PrimaryOnlyServiceRegistry::get(ServiceContext* serviceContext) { @@ -152,6 +228,11 @@ void PrimaryOnlyServiceRegistry::shutdown() { PrimaryOnlyService::PrimaryOnlyService(ServiceContext* serviceContext) : _serviceContext(serviceContext) {} +bool PrimaryOnlyService::isRunning() const { + stdx::lock_guard lk(_mutex); + return _state == State::kRunning; +} + void PrimaryOnlyService::startup(OperationContext* opCtx) { // Initialize the thread pool options with the service-specific limits on pool size. ThreadPool::Options threadPoolOptions(getThreadPoolLimits()); @@ -159,9 +240,16 @@ void PrimaryOnlyService::startup(OperationContext* opCtx) { // Now add the options that are fixed for all PrimaryOnlyServices. threadPoolOptions.threadNamePrefix = getServiceName() + "-"; threadPoolOptions.poolName = getServiceName() + "ThreadPool"; - threadPoolOptions.onCreateThread = [](const std::string& threadName) { + threadPoolOptions.onCreateThread = [this](const std::string& threadName) { Client::initThread(threadName.c_str()); - AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); + auto client = Client::getCurrent(); + AuthorizationSession::get(*client)->grantInternalAuthorization(&cc()); + + stdx::lock_guard<Client> lk(*client); + client->setSystemOperationKillableByStepdown(lk); + + // Associate this Client with this PrimaryOnlyService + primaryOnlyServiceStateForClient(client).primaryOnlyService = this; }; auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); @@ -206,7 +294,7 @@ void PrimaryOnlyService::onStepUp(const OpTime& stepUpOpTime) { // Ensure that all tasks from the previous term have completed before allowing tasks to be // scheduled on the new executor. if (newThenOldScopedExecutor) { - // Shutdown happens in onStepDown of previous term, so we only need to join() here. + // shutdown() happens in onStepDown of previous term, so we only need to join() here. (*newThenOldScopedExecutor)->join(); } @@ -332,6 +420,12 @@ void PrimaryOnlyService::releaseAllInstances() { void PrimaryOnlyService::_rebuildInstances() noexcept { std::vector<BSONObj> stateDocuments; { + // The PrimaryOnlyServiceClientObserver will make any OpCtx created as part of a + // PrimaryOnlyService immediately get interrupted if the service is not in state kRunning. + // Since we are in State::kRebuilding here, we need to install a + // AllowOpCtxWhenServiceNotRunningBlock so that the database read we need to do can complete + // successfully. + AllowOpCtxWhenServiceNotRunningBlock allowOpCtxBlock(Client::getCurrent()); auto opCtx = cc().makeOperationContext(); DBDirectClient client(opCtx.get()); try { diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h index 81cfba734b2..6dc53082d05 100644 --- a/src/mongo/db/repl/primary_only_service.h +++ b/src/mongo/db/repl/primary_only_service.h @@ -210,6 +210,13 @@ public: */ void releaseAllInstances(); + /** + * Returns whether this service is currently running. This is true only when the node is in + * state PRIMARY *and* this service has finished all asynchronous work associated with resuming + * after stepUp. + */ + bool isRunning() const; + protected: /** * Constructs a new Instance object with the given initial state. @@ -248,7 +255,7 @@ private: ServiceContext* const _serviceContext; - Mutex _mutex = MONGO_MAKE_LATCH("PrimaryOnlyService::_mutex"); + mutable Mutex _mutex = MONGO_MAKE_LATCH("PrimaryOnlyService::_mutex"); // Condvar to receive notifications when _rebuildInstances has completed after stepUp. stdx::condition_variable _rebuildCV; diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp index 11ca617a82b..1ac45373d2d 100644 --- a/src/mongo/db/repl/primary_only_service_test.cpp +++ b/src/mongo/db/repl/primary_only_service_test.cpp @@ -59,6 +59,7 @@ MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringInitialization); MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringStateOne); MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringStateTwo); MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringCompletion); +MONGO_FAIL_POINT_DEFINE(TestServiceHangBeforeWritingStateDoc); } // namespace class TestService final : public PrimaryOnlyService { @@ -170,6 +171,12 @@ public: lk.unlock(); + // Hang before creating OpCtx so that we can test that OpCtxs created after stepping + // down still get interrupted. + if (MONGO_unlikely(TestServiceHangBeforeWritingStateDoc.shouldFail())) { + TestServiceHangBeforeWritingStateDoc.pauseWhileSet(); + } + auto opCtx = cc().makeOperationContext(); DBDirectClient client(opCtx.get()); if (targetState == State::kDone) { @@ -248,8 +255,6 @@ public: } void stepDown() { - ASSERT_OK(ReplicationCoordinator::get(getServiceContext()) - ->setFollowerMode(MemberState::RS_SECONDARY)); _registry->onStepDown(); } @@ -543,3 +548,16 @@ TEST_F(PrimaryOnlyServiceTest, RecreateInstancesFails) { instance->onCompletion().get(); ASSERT_EQ(TestService::State::kDone, instance->getState()); } + +TEST_F(PrimaryOnlyServiceTest, OpCtxInterruptedByStepdown) { + // Ensure that if work has already been scheduled on the executor, but hasn't yet created an + // OpCtx, and then we stepDown, that the OpCtx that gets created still gets interrupted. + auto timesEntered = TestServiceHangBeforeWritingStateDoc.setMode(FailPoint::alwaysOn); + + auto instance = TestService::Instance::getOrCreate(_service, BSON("_id" << 0 << "state" << 0)); + TestServiceHangBeforeWritingStateDoc.waitForTimesEntered(++timesEntered); + stepDown(); + TestServiceHangBeforeWritingStateDoc.setMode(FailPoint::off); + + ASSERT_EQ(ErrorCodes::NotMaster, instance->onCompletion().getNoThrow()); +} |