diff options
author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2021-05-28 07:29:43 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-28 14:49:45 +0000 |
commit | 267abecb4ec46030a9b85194b9367a1410f1968a (patch) | |
tree | 335b0f9990cb3b1a893751119fc91a0f0c6b71b1 | |
parent | 208570c252e57eedcd63dec87ca26edd1d8e44c1 (diff) | |
download | mongo-267abecb4ec46030a9b85194b9367a1410f1968a.tar.gz |
SERVER-56525 Make thread pool for submitting range deletion tasks a decoration on ServiceContext
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/range_deletion_util_test.cpp | 2 |
5 files changed, 58 insertions, 30 deletions
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index a3879a152c6..033d8c3b009 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -1326,7 +1326,7 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { // The migrationutil executor must be shut down before shutting down the CatalogCacheLoader. // Otherwise, it may try to schedule work on the CatalogCacheLoader and fail. LOGV2_OPTIONS(4784921, {LogComponent::kSharding}, "Shutting down the MigrationUtilExecutor"); - auto migrationUtilExecutor = migrationutil::getMigrationUtilExecutor(); + auto migrationUtilExecutor = migrationutil::getMigrationUtilExecutor(serviceContext); migrationUtilExecutor->shutdown(); migrationUtilExecutor->join(); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 1ce8f1c70dd..54fccaa8444 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -468,12 +468,11 @@ env.CppUnitTest( 'migration_chunk_cloner_source_legacy_test.cpp', 'migration_destination_manager_test.cpp', 'migration_session_id_test.cpp', - 'migration_util_test.cpp' if not env.TargetOSIs('windows') else [], + 'migration_util_test.cpp', 'namespace_metadata_change_notifications_test.cpp', 'op_observer_sharding_test.cpp', 'persistent_task_queue_test.cpp', - 'range_deletion_util_test.cpp' if not env.TargetOSIs( - 'windows') else [], + 'range_deletion_util_test.cpp', 'resharding_collection_test.cpp', 'resharding_destined_recipient_test.cpp', 'resharding/resharding_agg_test.cpp', diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index c6578126cf3..4ab49ac38fc 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -110,6 +110,52 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, WriteConcernOptions::kNoTimeout); + +class MigrationUtilExecutor { +public: + MigrationUtilExecutor() + : _executor(std::make_shared<executor::ThreadPoolTaskExecutor>( + _makePool(), executor::makeNetworkInterface("MigrationUtil-TaskExecutor"))) {} + + void shutDownAndJoin() { + _executor->shutdown(); + _executor->join(); + } + + std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() { + stdx::lock_guard<Latch> lg(_mutex); + if (!_started) { + _executor->startup(); + _started = true; + } + return _executor; + } + +private: + std::unique_ptr<ThreadPool> _makePool() { + ThreadPool::Options options; + options.poolName = "MoveChunk"; + options.minThreads = 0; + options.maxThreads = 16; + return std::make_unique<ThreadPool>(std::move(options)); + } + + std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; + + // TODO SERVER-57253: get rid of _mutex and _started fields + Mutex _mutex = MONGO_MAKE_LATCH("MigrationUtilExecutor::_mutex"); + bool _started = false; +}; + +const auto migrationUtilExecutorDecoration = + ServiceContext::declareDecoration<MigrationUtilExecutor>(); +const ServiceContext::ConstructorActionRegisterer migrationUtilExecutorRegisterer{ + "MigrationUtilExecutor", + [](ServiceContext* service) { + // TODO SERVER-57253: start migration util executor at decoration construction time + }, + [](ServiceContext* service) { migrationUtilExecutorDecoration(service).shutDownAndJoin(); }}; + template <typename Cmd> void sendToRecipient(OperationContext* opCtx, const ShardId& recipientId, @@ -194,27 +240,9 @@ void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown( } // namespace -std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor() { - static Mutex mutex = MONGO_MAKE_LATCH("MigrationUtilExecutor::_mutex"); - static std::shared_ptr<executor::ThreadPoolTaskExecutor> executor; - - stdx::lock_guard<Latch> lg(mutex); - if (!executor) { - auto makePool = [] { - ThreadPool::Options options; - options.poolName = "MoveChunk"; - options.minThreads = 0; - options.maxThreads = 16; - return std::make_unique<ThreadPool>(std::move(options)); - }; - - executor = std::make_shared<executor::ThreadPoolTaskExecutor>( - makePool(), executor::makeNetworkInterface("MigrationUtil-TaskExecutor")); - - executor->startup(); - } - - return executor; +std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor( + ServiceContext* serviceContext) { + return migrationUtilExecutorDecoration(serviceContext).getExecutor(); } BSONObj makeMigrationStatusDocument(const NamespaceString& nss, @@ -346,7 +374,7 @@ ExecutorFuture<void> cleanUpRange(ServiceContext* serviceContext, ExecutorFuture<void> submitRangeDeletionTask(OperationContext* opCtx, const RangeDeletionTask& deletionTask) { const auto serviceContext = opCtx->getServiceContext(); - auto executor = getMigrationUtilExecutor(); + auto executor = getMigrationUtilExecutor(serviceContext); return ExecutorFuture<void>(executor) .then([=] { ThreadClient tc(kRangeDeletionThreadName, serviceContext); @@ -475,7 +503,7 @@ void submitPendingDeletions(OperationContext* opCtx) { void resubmitRangeDeletionsOnStepUp(ServiceContext* serviceContext) { LOGV2(22028, "Starting pending deletion submission thread."); - ExecutorFuture<void>(getMigrationUtilExecutor()) + ExecutorFuture<void>(getMigrationUtilExecutor(serviceContext)) .then([serviceContext] { ThreadClient tc("ResubmitRangeDeletions", serviceContext); { @@ -888,7 +916,7 @@ void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) { << doc.getMigrationSessionId().toString() << " on collection " << nss); - ExecutorFuture<void>(getMigrationUtilExecutor()) + ExecutorFuture<void>(getMigrationUtilExecutor(opCtx->getServiceContext())) .then([serviceContext = opCtx->getServiceContext(), nss, mbg] { ThreadClient tc("TriggerMigrationRecovery", serviceContext); { diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h index 22d6ca693e3..918371be77e 100644 --- a/src/mongo/db/s/migration_util.h +++ b/src/mongo/db/s/migration_util.h @@ -78,7 +78,8 @@ ChunkRange extendOrTruncateBoundsForMetadata(const CollectionMetadata& metadata, * The executor is initialized on the first call to this function. Uses a shared_ptr * because a shared_ptr is required to work with ExecutorFutures. */ -std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor(); +std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor( + ServiceContext* serviceContext); /** * Creates a query object that can used to find overlapping ranges in the pending range deletions diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index 1839724b826..2566738bf6e 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -86,7 +86,7 @@ public: DBDirectClient client(operationContext()); client.dropCollection(kNss.ns()); - while (migrationutil::getMigrationUtilExecutor()->hasTasks()) { + while (migrationutil::getMigrationUtilExecutor(getServiceContext())->hasTasks()) { continue; } |