summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2020-08-25 13:37:37 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-27 04:08:44 +0000
commitab874e203ae565060de279237bd6d0a41f6ab2c6 (patch)
treedbc4a6ae186e0ef92c7d7b421a863e28dcdaf0c3 /src
parentcc4f24459ee9504fe4952d52820e82978be475fe (diff)
downloadmongo-ab874e203ae565060de279237bd6d0a41f6ab2c6.tar.gz
SERVER-49238 Ensure that OpCtxs on PrimaryOnlyService threads are interrupted if the node is no longer primary
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/operation_context.cpp4
-rw-r--r--src/mongo/db/repl/primary_only_service.cpp100
-rw-r--r--src/mongo/db/repl/primary_only_service.h9
-rw-r--r--src/mongo/db/repl/primary_only_service_test.cpp22
4 files changed, 128 insertions, 7 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 7d81ee4ffb6..74e19e9e798 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -358,7 +358,9 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) {
}
if (auto status = ErrorCodes::OK; _killCode.compareAndSwap(&status, killCode)) {
- _baton->notify();
+ if (_baton) {
+ _baton->notify();
+ }
}
}
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index 437ce7b15fb..fd32c7ab36c 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -69,6 +69,82 @@ const auto _registryRegisterer =
const Status kExecutorShutdownStatus(ErrorCodes::InterruptedDueToReplStateChange,
"PrimaryOnlyService executor shut down due to stepDown");
+
+/**
+ * Client decoration used by Clients that are a part of a PrimaryOnlyService.
+ */
+struct PrimaryOnlyServiceClientState {
+ PrimaryOnlyService* primaryOnlyService = nullptr;
+ bool allowOpCtxWhenServiceNotRunning = false;
+};
+
+const auto primaryOnlyServiceStateForClient =
+ Client::declareDecoration<PrimaryOnlyServiceClientState>();
+
+/**
+ * A ClientObserver that adds a hook for every time an OpCtx is created on a thread that is part of
+ * a PrimaryOnlyService and ensures that the OpCtx is immediately interrupted if the associated
+ * service is not running at the time that the OpCtx is created. This protects against the case
+ * where work for a service is scheduled and then the node steps down and back up before the work
+ * creates an OpCtx. This works because even though the node has stepped back up already, the
+ * service isn't "running" until it's finished its recovery which involves waiting for all work
+ * from the previous term as primary to complete.
+ */
+class PrimaryOnlyServiceClientObserver final : public ServiceContext::ClientObserver {
+public:
+ void onCreateClient(Client* client) override {}
+ void onDestroyClient(Client* client) override {}
+
+ void onCreateOperationContext(OperationContext* opCtx) override {
+ auto client = opCtx->getClient();
+ auto clientState = primaryOnlyServiceStateForClient(client);
+ if (!clientState.primaryOnlyService) {
+ // This OpCtx/Client is not a part of a PrimaryOnlyService
+ return;
+ }
+
+ // Ensure this OpCtx will get interrupted at stepDown.
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+ // If the PrimaryOnlyService this OpCtx is a part of isn't running when it's created, then
+ // ensure the OpCtx starts off immediately interrupted.
+ if (!clientState.allowOpCtxWhenServiceNotRunning &&
+ !clientState.primaryOnlyService->isRunning()) {
+ opCtx->markKilled(ErrorCodes::NotMaster);
+ }
+ }
+ void onDestroyOperationContext(OperationContext* opCtx) override {}
+};
+
+ServiceContext::ConstructorActionRegisterer primaryOnlyServiceClientObserverRegisterer{
+ "PrimaryOnlyServiceClientObserver", [](ServiceContext* service) {
+ service->registerClientObserver(std::make_unique<PrimaryOnlyServiceClientObserver>());
+ }};
+
+/**
+ * Allows OpCtxs created on PrimaryOnlyService threads to remain uninterrupted, even if the service
+ * they are associated with isn't running. Used during the stepUp process to allow the database
+ * read required to rebuild a service and get it running in the first place.
+ * Does not prevent other forms of OpCtx interruption, such as from stepDown or calls to killOp.
+ */
+class AllowOpCtxWhenServiceNotRunningBlock {
+public:
+ explicit AllowOpCtxWhenServiceNotRunningBlock(Client* client)
+ : _client(client), _clientState(&primaryOnlyServiceStateForClient(_client)) {
+ invariant(_clientState->primaryOnlyService);
+ invariant(_clientState->allowOpCtxWhenServiceNotRunning == false);
+ _clientState->allowOpCtxWhenServiceNotRunning = true;
+ }
+ ~AllowOpCtxWhenServiceNotRunningBlock() {
+ invariant(_clientState->allowOpCtxWhenServiceNotRunning == true);
+ _clientState->allowOpCtxWhenServiceNotRunning = false;
+ }
+
+private:
+ Client* _client;
+ PrimaryOnlyServiceClientState* _clientState;
+};
+
} // namespace
PrimaryOnlyServiceRegistry* PrimaryOnlyServiceRegistry::get(ServiceContext* serviceContext) {
@@ -152,6 +228,11 @@ void PrimaryOnlyServiceRegistry::shutdown() {
PrimaryOnlyService::PrimaryOnlyService(ServiceContext* serviceContext)
: _serviceContext(serviceContext) {}
+bool PrimaryOnlyService::isRunning() const {
+ stdx::lock_guard lk(_mutex);
+ return _state == State::kRunning;
+}
+
void PrimaryOnlyService::startup(OperationContext* opCtx) {
// Initialize the thread pool options with the service-specific limits on pool size.
ThreadPool::Options threadPoolOptions(getThreadPoolLimits());
@@ -159,9 +240,16 @@ void PrimaryOnlyService::startup(OperationContext* opCtx) {
// Now add the options that are fixed for all PrimaryOnlyServices.
threadPoolOptions.threadNamePrefix = getServiceName() + "-";
threadPoolOptions.poolName = getServiceName() + "ThreadPool";
- threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+ threadPoolOptions.onCreateThread = [this](const std::string& threadName) {
Client::initThread(threadName.c_str());
- AuthorizationSession::get(cc())->grantInternalAuthorization(&cc());
+ auto client = Client::getCurrent();
+ AuthorizationSession::get(*client)->grantInternalAuthorization(&cc());
+
+ stdx::lock_guard<Client> lk(*client);
+ client->setSystemOperationKillableByStepdown(lk);
+
+ // Associate this Client with this PrimaryOnlyService
+ primaryOnlyServiceStateForClient(client).primaryOnlyService = this;
};
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
@@ -206,7 +294,7 @@ void PrimaryOnlyService::onStepUp(const OpTime& stepUpOpTime) {
// Ensure that all tasks from the previous term have completed before allowing tasks to be
// scheduled on the new executor.
if (newThenOldScopedExecutor) {
- // Shutdown happens in onStepDown of previous term, so we only need to join() here.
+ // shutdown() happens in onStepDown of previous term, so we only need to join() here.
(*newThenOldScopedExecutor)->join();
}
@@ -332,6 +420,12 @@ void PrimaryOnlyService::releaseAllInstances() {
void PrimaryOnlyService::_rebuildInstances() noexcept {
std::vector<BSONObj> stateDocuments;
{
+ // The PrimaryOnlyServiceClientObserver will make any OpCtx created as part of a
+ // PrimaryOnlyService immediately get interrupted if the service is not in state kRunning.
+ // Since we are in State::kRebuilding here, we need to install an
+ // AllowOpCtxWhenServiceNotRunningBlock so that the database read we need to do can complete
+ // successfully.
+ AllowOpCtxWhenServiceNotRunningBlock allowOpCtxBlock(Client::getCurrent());
auto opCtx = cc().makeOperationContext();
DBDirectClient client(opCtx.get());
try {
diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h
index 81cfba734b2..6dc53082d05 100644
--- a/src/mongo/db/repl/primary_only_service.h
+++ b/src/mongo/db/repl/primary_only_service.h
@@ -210,6 +210,13 @@ public:
*/
void releaseAllInstances();
+ /**
+ * Returns whether this service is currently running. This is true only when the node is in
+ * state PRIMARY *and* this service has finished all asynchronous work associated with resuming
+ * after stepUp.
+ */
+ bool isRunning() const;
+
protected:
/**
* Constructs a new Instance object with the given initial state.
@@ -248,7 +255,7 @@ private:
ServiceContext* const _serviceContext;
- Mutex _mutex = MONGO_MAKE_LATCH("PrimaryOnlyService::_mutex");
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("PrimaryOnlyService::_mutex");
// Condvar to receive notifications when _rebuildInstances has completed after stepUp.
stdx::condition_variable _rebuildCV;
diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp
index 11ca617a82b..1ac45373d2d 100644
--- a/src/mongo/db/repl/primary_only_service_test.cpp
+++ b/src/mongo/db/repl/primary_only_service_test.cpp
@@ -59,6 +59,7 @@ MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringInitialization);
MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringStateOne);
MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringStateTwo);
MONGO_FAIL_POINT_DEFINE(TestServiceHangDuringCompletion);
+MONGO_FAIL_POINT_DEFINE(TestServiceHangBeforeWritingStateDoc);
} // namespace
class TestService final : public PrimaryOnlyService {
@@ -170,6 +171,12 @@ public:
lk.unlock();
+ // Hang before creating OpCtx so that we can test that OpCtxs created after stepping
+ // down still get interrupted.
+ if (MONGO_unlikely(TestServiceHangBeforeWritingStateDoc.shouldFail())) {
+ TestServiceHangBeforeWritingStateDoc.pauseWhileSet();
+ }
+
auto opCtx = cc().makeOperationContext();
DBDirectClient client(opCtx.get());
if (targetState == State::kDone) {
@@ -248,8 +255,6 @@ public:
}
void stepDown() {
- ASSERT_OK(ReplicationCoordinator::get(getServiceContext())
- ->setFollowerMode(MemberState::RS_SECONDARY));
_registry->onStepDown();
}
@@ -543,3 +548,16 @@ TEST_F(PrimaryOnlyServiceTest, RecreateInstancesFails) {
instance->onCompletion().get();
ASSERT_EQ(TestService::State::kDone, instance->getState());
}
+
+TEST_F(PrimaryOnlyServiceTest, OpCtxInterruptedByStepdown) {
+ // Ensure that if work has already been scheduled on the executor, but hasn't yet created an
+ // OpCtx, and then we stepDown, that the OpCtx that gets created still gets interrupted.
+ auto timesEntered = TestServiceHangBeforeWritingStateDoc.setMode(FailPoint::alwaysOn);
+
+ auto instance = TestService::Instance::getOrCreate(_service, BSON("_id" << 0 << "state" << 0));
+ TestServiceHangBeforeWritingStateDoc.waitForTimesEntered(++timesEntered);
+ stepDown();
+ TestServiceHangBeforeWritingStateDoc.setMode(FailPoint::off);
+
+ ASSERT_EQ(ErrorCodes::NotMaster, instance->onCompletion().getNoThrow());
+}