diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2020-02-20 13:46:31 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-20 20:49:40 +0000 |
commit | a8fdddf98dd47b65b05ce28d41214ba87446f626 (patch) | |
tree | 8c0e94d19f73e60c7c58ff4bf662a42ae8554638 /src/mongo | |
parent | c73b7af4489ed839ab9f337bad08d9a6c93220e4 (diff) | |
download | mongo-a8fdddf98dd47b65b05ce28d41214ba87446f626.tar.gz |
SERVER-46075 Make new executor for submitting range deletion tasks
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/collection_sharding_state_factory_shard.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state_factory_shard.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 50 |
3 files changed, 48 insertions, 26 deletions
diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp index 15a4d035873..626564fff79 100644 --- a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp +++ b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp @@ -48,20 +48,22 @@ CollectionShardingStateFactoryShard::~CollectionShardingStateFactoryShard() { } void CollectionShardingStateFactoryShard::join() { - if (_taskExecutor) { - _taskExecutor->shutdown(); - _taskExecutor->join(); + if (_rangeDeletionExecutor) { + _rangeDeletionExecutor->shutdown(); + _rangeDeletionExecutor->join(); } } std::unique_ptr<CollectionShardingState> CollectionShardingStateFactoryShard::make( const NamespaceString& nss) { - return std::make_unique<CollectionShardingRuntime>(_serviceContext, nss, _getExecutor()); + return std::make_unique<CollectionShardingRuntime>( + _serviceContext, nss, _getRangeDeletionExecutor()); } -std::shared_ptr<executor::TaskExecutor> CollectionShardingStateFactoryShard::_getExecutor() { +std::shared_ptr<executor::TaskExecutor> +CollectionShardingStateFactoryShard::_getRangeDeletionExecutor() { stdx::lock_guard<Latch> lg(_mutex); - if (!_taskExecutor) { + if (!_rangeDeletionExecutor) { const std::string kExecName("CollectionRangeDeleter-TaskExecutor"); auto net = executor::makeNetworkInterface(kExecName); @@ -70,11 +72,11 @@ std::shared_ptr<executor::TaskExecutor> CollectionShardingStateFactoryShard::_ge std::make_shared<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); taskExecutor->startup(); - _taskExecutor = std::move(taskExecutor); + _rangeDeletionExecutor = std::move(taskExecutor); } - return _taskExecutor; + return _rangeDeletionExecutor; } -} // namespace mongo
\ No newline at end of file +} // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.h b/src/mongo/db/s/collection_sharding_state_factory_shard.h index f24929b4694..a3a0c04e82a 100644 --- a/src/mongo/db/s/collection_sharding_state_factory_shard.h +++ b/src/mongo/db/s/collection_sharding_state_factory_shard.h @@ -45,13 +45,13 @@ public: std::unique_ptr<CollectionShardingState> make(const NamespaceString& nss) override; private: - std::shared_ptr<executor::TaskExecutor> _getExecutor(); + std::shared_ptr<executor::TaskExecutor> _getRangeDeletionExecutor(); // Serializes the instantiation of the task executor Mutex _mutex = MONGO_MAKE_LATCH("CollectionShardingStateFactoryShard::_mutex"); // Required to be a shared_ptr since it is used as an executor for ExecutorFutures. - std::shared_ptr<executor::TaskExecutor> _taskExecutor = {nullptr}; + std::shared_ptr<executor::TaskExecutor> _rangeDeletionExecutor = {nullptr}; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index c7eb2183d85..760f058ad52 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -59,6 +59,7 @@ #include "mongo/s/client/shard.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/ensure_chunk_version_is_greater_than_gen.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -103,6 +104,27 @@ void sendToRecipient(OperationContext* opCtx, uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response)); } +// Returns an executor to be used to run commands related to submitting tasks to the range deleter. +// The executor is initialized on the first call to this function. Uses a shared_ptr +// because a shared_ptr is required to work with ExecutorFutures. +static std::shared_ptr<ThreadPool> getMigrationUtilExecutor() { + static Mutex mutex = MONGO_MAKE_LATCH("MigrationUtilExecutor::_mutex"); + static std::shared_ptr<ThreadPool> executor; + + stdx::lock_guard<Latch> lg(mutex); + if (!executor) { + ThreadPool::Options options; + options.poolName = "MoveChunk"; + options.minThreads = 0; + options.maxThreads = 16; + executor = std::make_shared<ThreadPool>(std::move(options)); + executor->startup(); + } + + return executor; +} + + } // namespace BSONObj makeMigrationStatusDocument(const NamespaceString& nss, @@ -138,7 +160,7 @@ bool checkForConflictingDeletions(OperationContext* opCtx, ExecutorFuture<void> submitRangeDeletionTask(OperationContext* opCtx, const RangeDeletionTask& deletionTask) { const auto serviceContext = opCtx->getServiceContext(); - return ExecutorFuture<void>(Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor()) + return ExecutorFuture<void>(getMigrationUtilExecutor()) .then([=] { ThreadClient tc(kRangeDeletionThreadName, serviceContext); { @@ -242,19 +264,18 @@ void submitPendingDeletions(OperationContext* opCtx) { void resubmitRangeDeletionsOnStepUp(ServiceContext* serviceContext) { LOGV2(22028, "Starting pending deletion submission thread."); - auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); - - ExecutorFuture<void>(executor).getAsync([serviceContext](const Status& status) { - ThreadClient tc("ResubmitRangeDeletions", serviceContext); - { - stdx::lock_guard<Client> lk(*tc.get()); - tc->setSystemOperationKillable(lk); - } + ExecutorFuture<void>(getMigrationUtilExecutor()) + .getAsync([serviceContext](const Status& status) { + ThreadClient tc("ResubmitRangeDeletions", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } - auto opCtx = tc->makeOperationContext(); + auto opCtx = tc->makeOperationContext(); - submitPendingDeletions(opCtx.get()); - }); + submitPendingDeletions(opCtx.get()); + }); } void dropRangeDeletionsCollection(OperationContext* opCtx) { @@ -626,8 +647,7 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { LOGV2(22037, "Starting migration coordinator stepup recovery thread."); - auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); - ExecutorFuture<void>(executor) + ExecutorFuture<void>(getMigrationUtilExecutor()) .then([serviceContext] { ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext); { @@ -659,7 +679,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { "lastOpTime"_attr = lastOpTime); return WaitForMajorityService::get(serviceContext).waitUntilMajority(lastOpTime); }) - .thenRunOn(executor) + .thenRunOn(getMigrationUtilExecutor()) .then([serviceContext]() { ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext); { |