diff options
author | Randolph Tan <randolph@10gen.com> | 2023-03-06 17:08:07 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-07 21:18:15 +0000 |
commit | 84a03cb20852a2dc0247481f7da7a023688bd146 (patch) | |
tree | 58d2b09e1d2e4a783245dd1b73d279141ce7276d | |
parent | 589a6f66b7d8aca96b23e315996c2a697a82ec2b (diff) | |
download | mongo-84a03cb20852a2dc0247481f7da7a023688bd146.tar.gz |
SERVER-74501 Fix MigrationBatchFetcher/Inserter completion reliance to not spawn an extra cleanup thread
(cherry picked from commit bf4b3ae9b3eb5c376201135155ef6738bfef0e20)
-rw-r--r-- | jstests/sharding/move_chunk_concurrent_cloning.js | 10 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_fetcher.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_fetcher.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_inserter.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_inserter.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/migration_batch_mock_inserter.h | 3 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.h | 7 |
7 files changed, 55 insertions, 24 deletions
diff --git a/jstests/sharding/move_chunk_concurrent_cloning.js b/jstests/sharding/move_chunk_concurrent_cloning.js index fd26af208d8..8677f9314bb 100644 --- a/jstests/sharding/move_chunk_concurrent_cloning.js +++ b/jstests/sharding/move_chunk_concurrent_cloning.js @@ -106,10 +106,10 @@ const runParallelMoveChunk = (numThreads) => { MongoRunner.stopMongod(staticMongod); }; -// Run test 10 times with random concurrency levels. -for (let i = 1; i <= 5; i++) { +runParallelMoveChunk(1); + +// Run test a few times with random concurrency levels. +for (let i = 1; i <= 4; i++) { runParallelMoveChunk(Math.floor(Math.random() * 31) + 1); } -} - - )(); +})(); diff --git a/src/mongo/db/s/migration_batch_fetcher.cpp b/src/mongo/db/s/migration_batch_fetcher.cpp index 8622379449f..46282cbf04e 100644 --- a/src/mongo/db/s/migration_batch_fetcher.cpp +++ b/src/mongo/db/s/migration_batch_fetcher.cpp @@ -73,7 +73,8 @@ MigrationBatchFetcher<Inserter>::MigrationBatchFetcher( _collectionUuid(collectionId), _migrationId{migrationId}, _writeConcern{writeConcern}, - _isParallelFetchingSupported{parallelFetchingSupported} { + _isParallelFetchingSupported{parallelFetchingSupported}, + _secondaryThrottleTicket(1) { _inserterWorkers->startup(); } @@ -164,7 +165,8 @@ void MigrationBatchFetcher<Inserter>::_runFetcher() try { _collectionUuid, _migrationProgress, _migrationId, - _chunkMigrationConcurrency}; + _chunkMigrationConcurrency, + &_secondaryThrottleTicket}; _inserterWorkers->schedule([batchSize, fetchTime, @@ -216,8 +218,14 @@ MigrationBatchFetcher<Inserter>::~MigrationBatchFetcher() { LOGV2(6718401, "Shutting down and joining inserter threads for migration {migrationId}", "migrationId"_attr = _migrationId); + + // Call waitForIdle first since join can spawn another thread while ignoring the maxPoolSize + // to finish the pending task. This is safe as long as ThreadPool::shutdown can't be + // interleaved with this call. + _inserterWorkers->waitForIdle(); _inserterWorkers->shutdown(); _inserterWorkers->join(); + LOGV2(6718415, "Inserter threads for migration {migrationId} joined", "migrationId"_attr = _migrationId); diff --git a/src/mongo/db/s/migration_batch_fetcher.h b/src/mongo/db/s/migration_batch_fetcher.h index b26f72153b1..ca7eddc54a8 100644 --- a/src/mongo/db/s/migration_batch_fetcher.h +++ b/src/mongo/db/s/migration_batch_fetcher.h @@ -40,6 +40,7 @@ #include "mongo/s/shard_id.h" #include "mongo/util/cancellation.h" #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/producer_consumer_queue.h" #pragma once @@ -137,6 +138,8 @@ private: // Indicates if source is prepared to service _migrateClone requests in parallel. bool _isParallelFetchingSupported; + TicketHolder _secondaryThrottleTicket; + // Given session id and namespace, create migrateCloneRequest. // Only should be created once for the lifetime of the object. BSONObj _createMigrateCloneRequest() const { diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp index 963832c671a..d1df6c1a1b8 100644 --- a/src/mongo/db/s/migration_batch_inserter.cpp +++ b/src/mongo/db/s/migration_batch_inserter.cpp @@ -169,20 +169,28 @@ void MigrationBatchInserter::run(Status status) const try { _migrationProgress->incNumBytes(batchClonedBytes); if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) { - runWithoutSession(_outerOpCtx, [&] { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - _writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - LOGV2_WARNING(22011, - "secondaryThrottle on, but doc insert timed out; continuing", - "migrationId"_attr = _migrationId.toBSON()); - } else { - uassertStatusOK(replStatus.status); - } - }); + if (_secondaryThrottleTicket->tryAcquire()) { + TicketHolderReleaser ticketReleaser(_secondaryThrottleTicket); + + runWithoutSession(_outerOpCtx, [&] { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + _writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + LOGV2_WARNING(22011, + "secondaryThrottle on, but doc insert timed out; continuing", + "migrationId"_attr = _migrationId.toBSON()); + } else { + uassertStatusOK(replStatus.status); + } + }); + } else { + // Ticket should always be available unless thread pool max size 1 setting is not + // being respected. + dassert(false); + } } sleepmillis(migrateCloneInsertionBatchDelayMS.load()); diff --git a/src/mongo/db/s/migration_batch_inserter.h b/src/mongo/db/s/migration_batch_inserter.h index c3223921602..369babb69da 100644 --- a/src/mongo/db/s/migration_batch_inserter.h +++ b/src/mongo/db/s/migration_batch_inserter.h @@ -45,6 +45,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/grid.h" +#include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/uuid.h" #pragma once @@ -106,7 +107,8 @@ public: const UUID& collectionUuid, std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress, const UUID& migrationId, - int threadCount) + int threadCount, + TicketHolder* secondaryThrottleTicket) : _outerOpCtx{outerOpCtx}, _innerOpCtx{innerOpCtx}, _batch{batch}, @@ -116,7 +118,8 @@ public: _collectionUuid{collectionUuid}, _migrationProgress{migrationProgress}, _migrationId{migrationId}, - _threadCount{threadCount} {} + _threadCount{threadCount}, + _secondaryThrottleTicket{secondaryThrottleTicket} {} static void onCreateThread(const std::string& threadName); @@ -131,6 +134,7 @@ private: std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress; UUID _migrationId; int _threadCount; + TicketHolder* _secondaryThrottleTicket; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_batch_mock_inserter.h b/src/mongo/db/s/migration_batch_mock_inserter.h index 8b4b766480c..edbe0117246 100644 --- a/src/mongo/db/s/migration_batch_mock_inserter.h +++ b/src/mongo/db/s/migration_batch_mock_inserter.h @@ -56,7 +56,8 @@ public: UUID, std::shared_ptr<MigrationCloningProgressSharedState>, UUID, - int) {} + int, + TicketHolder*) {} static void onCreateThread(const std::string& threadName) {} diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index 29acd9e09c0..1cbd6b8b263 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -147,6 +147,11 @@ public: // from ThreadPoolInterface void startup() override; void shutdown() override; + + /** + * Joins all scheduled tasks. Can also spawn a free thread that ignores maxThread options to + * execute pending tasks. + */ void join() override; /** @@ -158,6 +163,8 @@ public: * * May be called multiple times, by multiple threads. May not be called by a task in the thread * pool. + * + * Not safe to use when shutdown can be called concurrently. */ void waitForIdle(); |