diff options
author | Spencer T Brody <spencer@mongodb.com> | 2020-07-27 16:10:30 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-31 21:23:21 +0000 |
commit | cf05a11f71768a4ae692d1ff83a5e016d29cec1c (patch) | |
tree | e14f5a1ef8faa43a7a8d0e88bddc99916dab208a | |
parent | 0e0614dfac5e0ba89e355a5822abfee9bd6766a7 (diff) | |
download | mongo-cf05a11f71768a4ae692d1ff83a5e016d29cec1c.tar.gz |
SERVER-49239 Move ownership of TaskExecutor up to PrimaryOnlyService, out of the Instances
-rw-r--r-- | src/mongo/db/repl/SConscript | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service.cpp | 78 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service.h | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/primary_only_service_test.cpp | 43 |
4 files changed, 82 insertions, 84 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 6dcdb4801ee..e06386f1469 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1590,7 +1590,14 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/logical_time_metadata_hook', + '$BUILD_DIR/mongo/executor/connection_pool_executor', + '$BUILD_DIR/mongo/executor/network_interface', + '$BUILD_DIR/mongo/executor/network_interface_factory', + '$BUILD_DIR/mongo/executor/network_interface_thread_pool', + '$BUILD_DIR/mongo/executor/network_interface_tl', '$BUILD_DIR/mongo/executor/scoped_task_executor', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', '$BUILD_DIR/mongo/s/write_ops/batch_write_types', 'wait_for_majority_service', ], diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index 9351ad28fdf..eaeb1b32e02 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -33,15 +33,23 @@ #include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/service_context.h" +#include "mongo/executor/network_connection_hook.h" +#include "mongo/executor/network_interface.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/util/concurrency/thread_pool.h" namespace mongo { namespace repl { @@ -55,6 +63,9 @@ const auto _registryRegisterer = ReplicaSetAwareServiceRegistry::Registerer<PrimaryOnlyServiceRegistry>( "PrimaryOnlyServiceRegistry"); +const Status kExecutorShutdownStatus(ErrorCodes::InterruptedDueToReplStateChange, + "PrimaryOnlyService executor shut down due to stepDown"); + // Throws on error. void insertDocument(OperationContext* opCtx, const NamespaceString& collectionName, @@ -96,6 +107,12 @@ PrimaryOnlyService* PrimaryOnlyServiceRegistry::lookupService(StringData service return servicePtr; } +void PrimaryOnlyServiceRegistry::onStartup(OperationContext* opCtx) { + for (auto& service : _services) { + service.second->startup(opCtx); + } +} + void PrimaryOnlyServiceRegistry::onStepUpComplete(OperationContext*, long long term) { for (auto& service : _services) { service.second->onStepUp(term); @@ -117,8 +134,26 @@ void PrimaryOnlyServiceRegistry::shutdown() { PrimaryOnlyService::PrimaryOnlyService(ServiceContext* serviceContext) : _serviceContext(serviceContext) {} +void PrimaryOnlyService::startup(OperationContext* opCtx) { + ThreadPool::Options threadPoolOptions; + threadPoolOptions.threadNamePrefix = getServiceName() + "-"; + threadPoolOptions.poolName = getServiceName() + "ThreadPool"; + threadPoolOptions.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); + }; + + auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); + hookList->addHook(std::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext())); + _executor = std::make_shared<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(threadPoolOptions), + executor::makeNetworkInterface(getServiceName() + "Network", nullptr, std::move(hookList))); + _executor->startup(); +} + void PrimaryOnlyService::onStepUp(long long term) { - auto executor2 = getTaskExecutor(); + auto newThenOldScopedExecutor = + std::make_shared<executor::ScopedTaskExecutor>(_executor, kExecutorShutdownStatus); { stdx::lock_guard lk(_mutex); @@ -128,46 +163,45 @@ void PrimaryOnlyService::onStepUp(long long term) { _term = term; _state = State::kRunning; - // Install a new executor, while moving the old one into 'executor2' so it can be accessed - // outside of _mutex. - _executor.swap(executor2); + // Install a new executor, while moving the old one into 'newThenOldScopedExecutor' so it + // can be accessed outside of _mutex. + _scopedExecutor.swap(newThenOldScopedExecutor); } - // Ensure that all tasks from the previous term have completed. - if (executor2) { - (*executor2)->join(); + // Ensure that all tasks from the previous term have completed before allowing tasks to be + // scheduled on the new executor. + if (newThenOldScopedExecutor) { + (*newThenOldScopedExecutor)->join(); } } void PrimaryOnlyService::onStepDown() { stdx::lock_guard lk(_mutex); - if (_executor) { - (*_executor)->shutdown(); + if (_scopedExecutor) { + (*_scopedExecutor)->shutdown(); } _state = State::kPaused; _instances.clear(); } void PrimaryOnlyService::shutdown() { - { - std::unique_ptr<executor::ScopedTaskExecutor> savedExecutor; - { - stdx::lock_guard lk(_mutex); + std::shared_ptr<executor::TaskExecutor> savedExecutor; - _executor.swap(savedExecutor); - _state = State::kShutdown; - _instances.clear(); - } + { + stdx::lock_guard lk(_mutex); - if (savedExecutor) { - (*savedExecutor)->shutdown(); - (*savedExecutor)->join(); - } + _executor.swap(savedExecutor); + _scopedExecutor.reset(); + _state = State::kShutdown; + _instances.clear(); } - shutdownImpl(); + if (savedExecutor) { + savedExecutor->shutdown(); + savedExecutor->join(); + } } std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::getOrCreateInstance( diff --git a/src/mongo/db/repl/primary_only_service.h b/src/mongo/db/repl/primary_only_service.h index 7c96c61a604..3cd16b8897a 100644 --- a/src/mongo/db/repl/primary_only_service.h +++ b/src/mongo/db/repl/primary_only_service.h @@ -175,14 +175,13 @@ public: virtual NamespaceString getStateDocumentsNS() const = 0; /** - * Virtual shutdown method where derived services can put their specific shutdown logic. + * Constructs and starts up _executor. */ - virtual void shutdownImpl() = 0; + void startup(OperationContext* opCtx); /** - * Releases all running Instances, then shuts down and joins _executor, ensuring that there are - * no remaining tasks running. - * Ends by calling shutdownImpl so that derived services can clean up their local state. + * Releases all running Instances, then shuts down and joins _executor, ensuring that + * there are no remaining tasks running. */ void shutdown(); @@ -203,11 +202,6 @@ public: protected: /** - * Returns a ScopedTaskExecutor used to run instances of this service. - */ - virtual std::unique_ptr<executor::ScopedTaskExecutor> getTaskExecutor() = 0; - - /** * Constructs a new Instance object with the given initial state. */ virtual std::shared_ptr<Instance> constructInstance(const BSONObj& initialState) const = 0; @@ -233,16 +227,18 @@ private: Mutex _mutex = MONGO_MAKE_LATCH("PrimaryOnlyService::_mutex"); - // A ScopedTaskExecutor that is used to schedule calls to runOnce against Instance objects. - // PrimaryOnlyService implementations are responsible for creating a TaskExecutor configured - // with the desired options. The size of the thread pool within the TaskExecutor limits the - // number of Instances of this PrimaryOnlyService that can be actively running on a thread - // simultaneously (though it does not limit the number of Instance objects that can - // simultaneously exist). - // This ScopedTaskExecutor wraps the TaskExecutor owned by the PrimaryOnlyService - // implementation, and is created at stepUp and destroyed at stepDown so that all outstanding - // tasks get interrupted. - std::unique_ptr<executor::ScopedTaskExecutor> _executor; + // A ScopedTaskExecutor that is used to perform all work run on behalf of an Instance. + // This ScopedTaskExecutor wraps _executor and is created at stepUp and destroyed at + // stepDown so that all outstanding tasks get interrupted. + std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; + + // The concrete TaskExecutor backing _scopedExecutor. While _scopedExecutor is created and + // destroyed with each stepUp/stepDown, _executor persists for the lifetime of the process. We + // want _executor to survive failover to prevent us from having to reallocate lots of + // thread and connection resources on every stepUp. Service instances should never have access + // to _executor directly, they should only ever use _scopedExecutor so that they get the + // guarantee that all outstanding tasks are interrupted at stepDown. + std::shared_ptr<executor::TaskExecutor> _executor; enum class State { kRunning, @@ -291,7 +287,7 @@ public: */ void shutdown(); - void onStartup(OperationContext*) final {} + void onStartup(OperationContext*) final; void onStepUpBegin(OperationContext*, long long term) final {} void onBecomeArbiter() final {} void onStepUpComplete(OperationContext*, long long term) final; diff --git a/src/mongo/db/repl/primary_only_service_test.cpp b/src/mongo/db/repl/primary_only_service_test.cpp index 2989f6e6980..4926cb56256 100644 --- a/src/mongo/db/repl/primary_only_service_test.cpp +++ b/src/mongo/db/repl/primary_only_service_test.cpp @@ -29,10 +29,9 @@ #include <memory> -#include "mongo/db/auth/authorization_session.h" + #include "mongo/db/client.h" #include "mongo/db/dbdirectclient.h" -#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/oplog.h" @@ -40,14 +39,8 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/executor/network_connection_hook.h" -#include "mongo/executor/network_interface.h" -#include "mongo/executor/network_interface_factory.h" -#include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" -#include "mongo/util/concurrency/thread_pool.h" using namespace mongo; using namespace mongo::repl; @@ -56,28 +49,9 @@ constexpr StringData kTestServiceName = "TestService"_sd; class TestService final : public PrimaryOnlyService { public: - explicit TestService(ServiceContext* serviceContext) - : PrimaryOnlyService(serviceContext), _executor(makeTaskExecutor(serviceContext)) { - _executor->startup(); - } + explicit TestService(ServiceContext* serviceContext) : PrimaryOnlyService(serviceContext) {} ~TestService() = default; - std::shared_ptr<executor::TaskExecutor> makeTaskExecutor(ServiceContext* serviceContext) { - ThreadPool::Options threadPoolOptions; - threadPoolOptions.threadNamePrefix = getServiceName() + "-"; - threadPoolOptions.poolName = getServiceName() + "ThreadPool"; - threadPoolOptions.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); - }; - - auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); - hookList->addHook(std::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext)); - return std::make_shared<executor::ThreadPoolTaskExecutor>( - std::make_unique<ThreadPool>(threadPoolOptions), - executor::makeNetworkInterface("TestServiceNetwork", nullptr, std::move(hookList))); - } - StringData getServiceName() const override { return kTestServiceName; } @@ -91,16 +65,6 @@ public: return std::make_shared<TestService::Instance>(initialState); } - std::unique_ptr<executor::ScopedTaskExecutor> getTaskExecutor() override { - return std::make_unique<executor::ScopedTaskExecutor>(_executor); - } - - void shutdownImpl() override { - _executor->shutdown(); - _executor->join(); - _executor.reset(); - } - class Instance final : public PrimaryOnlyService::TypedInstance<Instance> { public: Instance(const BSONObj& state) @@ -117,9 +81,6 @@ public: private: BSONObj _state; }; - -private: - std::shared_ptr<executor::TaskExecutor> _executor; }; class PrimaryOnlyServiceTest : public ServiceContextMongoDTest { |