diff options
author | Mihai Andrei <mihai.andrei@mongodb.com> | 2020-02-06 15:58:11 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-13 19:18:40 +0000 |
commit | 00477b903ef7b06b4e561f8cca06f2ad580815a5 (patch) | |
tree | 455f900444ea9bb11576e44e0de5398e0c7b34d9 /src/mongo/db/db.cpp | |
parent | cd915cc25b0b27f8527089ac0b7c645b9c76cc42 (diff) | |
download | mongo-00477b903ef7b06b4e561f8cca06f2ad580815a5.tar.gz |
SERVER-45963 Introduce ReplicaSetNodeProcessInterface and a new TaskExecutor for it to use
Diffstat (limited to 'src/mongo/db/db.cpp')
-rw-r--r-- | src/mongo/db/db.cpp | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 59da6fe85a1..6444d5ecda4 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -102,6 +102,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/periodic_runner_job_abort_expired_transactions.h" #include "mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h" +#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/read_write_concern_defaults_cache_lookup_mongod.h" #include "mongo/db/repair_database_and_check_version.h" @@ -878,6 +879,21 @@ void setUpCatalog(ServiceContext* serviceContext) { Collection::Factory::set(serviceContext, std::make_unique<CollectionImpl::FactoryImpl>()); } +auto makeReplicaSetNodeExecutor(ServiceContext* serviceContext) { + ThreadPool::Options tpOptions; + tpOptions.threadNamePrefix = "ReplNodeDbWorker-"; + tpOptions.poolName = "ReplNodeDbWorkerThreadPool"; + tpOptions.maxThreads = ThreadPool::Options::kUnlimited; + tpOptions.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + // TODO SERVER-45966 Add necessary hooks. + auto hookList = nullptr; + return std::make_unique<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(tpOptions), + executor::makeNetworkInterface("ReplNodeDbWorkerNetwork", nullptr, std::move(hookList))); +} + auto makeReplicationExecutor(ServiceContext* serviceContext) { ThreadPool::Options tpOptions; tpOptions.threadNamePrefix = "ReplCoord-"; @@ -928,6 +944,12 @@ void setUpReplication(ServiceContext* serviceContext) { replicationProcess, storageInterface, SecureRandom().nextInt64()); + // Only create a ReplicaSetNodeExecutor if sharding is disabled and replication is enabled. + // Note that sharding sets up its own executors for scheduling work to remote nodes. + if (!ShardingState::get(serviceContext)->enabled() && replCoord->isReplEnabled()) + ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor( + serviceContext, makeReplicaSetNodeExecutor(serviceContext)); + repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); repl::setOplogCollectionName(serviceContext); |