author     Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>   2021-05-28 07:29:43 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>        2021-05-28 14:49:45 +0000
commit     267abecb4ec46030a9b85194b9367a1410f1968a (patch)
tree       335b0f9990cb3b1a893751119fc91a0f0c6b71b1
parent     208570c252e57eedcd63dec87ca26edd1d8e44c1 (diff)
download   mongo-267abecb4ec46030a9b85194b9367a1410f1968a.tar.gz
SERVER-56525 Make thread pool for submitting range deletion tasks a decoration on ServiceContext
-rw-r--r--  src/mongo/db/mongod_main.cpp                   2
-rw-r--r--  src/mongo/db/s/SConscript                      5
-rw-r--r--  src/mongo/db/s/migration_util.cpp             76
-rw-r--r--  src/mongo/db/s/migration_util.h                3
-rw-r--r--  src/mongo/db/s/range_deletion_util_test.cpp    2

5 files changed, 58 insertions, 30 deletions
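
The heart of the change is visible in the migration_util.cpp hunk below: the executor moves out of a function-local static and into a decoration, so each ServiceContext constructs, owns, and tears down its own MigrationUtilExecutor. A minimal, self-contained sketch of the general decoration mechanism (illustrative only; the server's real Decorable machinery differs in the details):

    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <vector>

    // Sketch of the decoration pattern, not MongoDB's actual Decorable
    // implementation: every decoration declared up front is constructed
    // once per ServiceContext, so the state lives and dies with the
    // context that owns it instead of in a process-wide global.
    class ServiceContext {
    public:
        template <typename T>
        class Decoration {
        public:
            explicit Decoration(std::size_t index) : _index(index) {}

            // Look up this decoration's instance on a given context.
            T& operator()(ServiceContext* sc) const {
                return *static_cast<T*>(sc->_decorations[_index].get());
            }

        private:
            std::size_t _index;
        };

        // Must run before any ServiceContext is constructed (e.g. from a
        // namespace-scope initializer, as in the diff below).
        template <typename T>
        static Decoration<T> declareDecoration() {
            _constructors().push_back(
                [] { return std::shared_ptr<void>(std::make_shared<T>()); });
            return Decoration<T>(_constructors().size() - 1);
        }

        ServiceContext() {
            for (auto& make : _constructors())
                _decorations.push_back(make());
        }

    private:
        static std::vector<std::function<std::shared_ptr<void>()>>& _constructors() {
            static std::vector<std::function<std::shared_ptr<void>()>> ctors;
            return ctors;
        }

        std::vector<std::shared_ptr<void>> _decorations;
    };

    // Usage mirroring the commit: the executor wrapper becomes per-context.
    struct MigrationUtilExecutor { /* owns the ThreadPoolTaskExecutor */ };
    const auto migrationUtilExecutorDecoration =
        ServiceContext::declareDecoration<MigrationUtilExecutor>();

With this shape, migrationUtilExecutorDecoration(serviceContext) yields the instance bound to that particular context, which is what lets each unit-test fixture get a fresh executor.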
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index a3879a152c6..033d8c3b009 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -1326,7 +1326,7 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
// The migrationutil executor must be shut down before shutting down the CatalogCacheLoader.
// Otherwise, it may try to schedule work on the CatalogCacheLoader and fail.
LOGV2_OPTIONS(4784921, {LogComponent::kSharding}, "Shutting down the MigrationUtilExecutor");
- auto migrationUtilExecutor = migrationutil::getMigrationUtilExecutor();
+ auto migrationUtilExecutor = migrationutil::getMigrationUtilExecutor(serviceContext);
migrationUtilExecutor->shutdown();
migrationUtilExecutor->join();
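
The ordering here leans on the TaskExecutor contract: roughly, shutdown() stops the executor from accepting new work, and join() blocks until everything already scheduled has run down. Schematically (names taken from the diff; this fragment is not compilable on its own):

    auto migrationUtilExecutor = migrationutil::getMigrationUtilExecutor(serviceContext);
    migrationUtilExecutor->shutdown();  // no new tasks get scheduled past this point
    migrationUtilExecutor->join();      // wait for in-flight tasks to drain
    // Only now is it safe to shut down the CatalogCacheLoader: no
    // migration-util task can still try to schedule work on it.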
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 1ce8f1c70dd..54fccaa8444 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -468,12 +468,11 @@ env.CppUnitTest(
'migration_chunk_cloner_source_legacy_test.cpp',
'migration_destination_manager_test.cpp',
'migration_session_id_test.cpp',
- 'migration_util_test.cpp' if not env.TargetOSIs('windows') else [],
+ 'migration_util_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
'op_observer_sharding_test.cpp',
'persistent_task_queue_test.cpp',
- 'range_deletion_util_test.cpp' if not env.TargetOSIs(
- 'windows') else [],
+ 'range_deletion_util_test.cpp',
'resharding_collection_test.cpp',
'resharding_destined_recipient_test.cpp',
'resharding/resharding_agg_test.cpp',
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index c6578126cf3..4ab49ac38fc 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -110,6 +110,52 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
WriteConcernOptions::kNoTimeout);
+
+class MigrationUtilExecutor {
+public:
+ MigrationUtilExecutor()
+ : _executor(std::make_shared<executor::ThreadPoolTaskExecutor>(
+ _makePool(), executor::makeNetworkInterface("MigrationUtil-TaskExecutor"))) {}
+
+ void shutDownAndJoin() {
+ _executor->shutdown();
+ _executor->join();
+ }
+
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_started) {
+ _executor->startup();
+ _started = true;
+ }
+ return _executor;
+ }
+
+private:
+ std::unique_ptr<ThreadPool> _makePool() {
+ ThreadPool::Options options;
+ options.poolName = "MoveChunk";
+ options.minThreads = 0;
+ options.maxThreads = 16;
+ return std::make_unique<ThreadPool>(std::move(options));
+ }
+
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
+
+ // TODO SERVER-57253: get rid of _mutex and _started fields
+ Mutex _mutex = MONGO_MAKE_LATCH("MigrationUtilExecutor::_mutex");
+ bool _started = false;
+};
+
+const auto migrationUtilExecutorDecoration =
+ ServiceContext::declareDecoration<MigrationUtilExecutor>();
+const ServiceContext::ConstructorActionRegisterer migrationUtilExecutorRegisterer{
+ "MigrationUtilExecutor",
+ [](ServiceContext* service) {
+ // TODO SERVER-57253: start migration util executor at decoration construction time
+ },
+ [](ServiceContext* service) { migrationUtilExecutorDecoration(service).shutDownAndJoin(); }};
+
template <typename Cmd>
void sendToRecipient(OperationContext* opCtx,
const ShardId& recipientId,
@@ -194,27 +240,9 @@ void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
} // namespace
-std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor() {
- static Mutex mutex = MONGO_MAKE_LATCH("MigrationUtilExecutor::_mutex");
- static std::shared_ptr<executor::ThreadPoolTaskExecutor> executor;
-
- stdx::lock_guard<Latch> lg(mutex);
- if (!executor) {
- auto makePool = [] {
- ThreadPool::Options options;
- options.poolName = "MoveChunk";
- options.minThreads = 0;
- options.maxThreads = 16;
- return std::make_unique<ThreadPool>(std::move(options));
- };
-
- executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
- makePool(), executor::makeNetworkInterface("MigrationUtil-TaskExecutor"));
-
- executor->startup();
- }
-
- return executor;
+std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor(
+ ServiceContext* serviceContext) {
+ return migrationUtilExecutorDecoration(serviceContext).getExecutor();
}
BSONObj makeMigrationStatusDocument(const NamespaceString& nss,
@@ -346,7 +374,7 @@ ExecutorFuture<void> cleanUpRange(ServiceContext* serviceContext,
ExecutorFuture<void> submitRangeDeletionTask(OperationContext* opCtx,
const RangeDeletionTask& deletionTask) {
const auto serviceContext = opCtx->getServiceContext();
- auto executor = getMigrationUtilExecutor();
+ auto executor = getMigrationUtilExecutor(serviceContext);
return ExecutorFuture<void>(executor)
.then([=] {
ThreadClient tc(kRangeDeletionThreadName, serviceContext);
@@ -475,7 +503,7 @@ void submitPendingDeletions(OperationContext* opCtx) {
void resubmitRangeDeletionsOnStepUp(ServiceContext* serviceContext) {
LOGV2(22028, "Starting pending deletion submission thread.");
- ExecutorFuture<void>(getMigrationUtilExecutor())
+ ExecutorFuture<void>(getMigrationUtilExecutor(serviceContext))
.then([serviceContext] {
ThreadClient tc("ResubmitRangeDeletions", serviceContext);
{
@@ -888,7 +916,7 @@ void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
<< doc.getMigrationSessionId().toString()
<< " on collection " << nss);
- ExecutorFuture<void>(getMigrationUtilExecutor())
+ ExecutorFuture<void>(getMigrationUtilExecutor(opCtx->getServiceContext()))
.then([serviceContext = opCtx->getServiceContext(), nss, mbg] {
ThreadClient tc("TriggerMigrationRecovery", serviceContext);
{
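
Note that the new MigrationUtilExecutor class keeps startup lazy: the decoration is constructed together with its ServiceContext, but pool threads are only spawned on the first getExecutor() call, guarded by the mutex that the TODO for SERVER-57253 flags for removal. The guard reduces to this self-contained shape (std::mutex standing in for the server's Latch):

    #include <mutex>

    class LazyStart {
    public:
        // Idempotent: the first caller starts the pool; later callers see
        // _started == true and fall through.
        void ensureStarted() {
            std::lock_guard<std::mutex> lg(_mutex);
            if (!_started) {
                startup();  // _executor->startup() in the real class
                _started = true;
            }
        }

    private:
        void startup() { /* spawn worker threads exactly once */ }

        std::mutex _mutex;
        bool _started = false;
    };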
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 22d6ca693e3..918371be77e 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -78,7 +78,8 @@ ChunkRange extendOrTruncateBoundsForMetadata(const CollectionMetadata& metadata,
* The executor is initialized on the first call to this function. Uses a shared_ptr
* because a shared_ptr is required to work with ExecutorFutures.
*/
-std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor();
+std::shared_ptr<executor::ThreadPoolTaskExecutor> getMigrationUtilExecutor(
+ ServiceContext* serviceContext);
/**
* Creates a query object that can used to find overlapping ranges in the pending range deletions
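
A hypothetical call site, modeled on submitRangeDeletionTask() from the migration_util.cpp diff above, shows why the shared_ptr matters: the ExecutorFuture shares ownership of the executor while chained work is pending. Schematic only; the lambda bodies are placeholders and this will not compile outside the server tree:

    auto executor = migrationutil::getMigrationUtilExecutor(opCtx->getServiceContext());
    ExecutorFuture<void>(executor)  // the future keeps the shared_ptr alive
        .then([] {
            // runs on one of the "MoveChunk" pool threads
        })
        .getAsync([](Status status) {
            // completion callback; a non-OK status carries any failure
        });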
diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp
index 1839724b826..2566738bf6e 100644
--- a/src/mongo/db/s/range_deletion_util_test.cpp
+++ b/src/mongo/db/s/range_deletion_util_test.cpp
@@ -86,7 +86,7 @@ public:
DBDirectClient client(operationContext());
client.dropCollection(kNss.ns());
- while (migrationutil::getMigrationUtilExecutor()->hasTasks()) {
+ while (migrationutil::getMigrationUtilExecutor(getServiceContext())->hasTasks()) {
continue;
}