summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Randolph Tan <randolph@10gen.com> 2023-03-06 17:08:07 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2023-03-07 21:18:15 +0000
commit: 84a03cb20852a2dc0247481f7da7a023688bd146 (patch)
tree: 58d2b09e1d2e4a783245dd1b73d279141ce7276d
parent: 589a6f66b7d8aca96b23e315996c2a697a82ec2b (diff)
download: mongo-84a03cb20852a2dc0247481f7da7a023688bd146.tar.gz
SERVER-74501 Fix MigrationBatchFetcher/Inserter completion reliance to not spawn an extra cleanup thread
(cherry picked from commit bf4b3ae9b3eb5c376201135155ef6738bfef0e20)
-rw-r--r--  jstests/sharding/move_chunk_concurrent_cloning.js  10
-rw-r--r--  src/mongo/db/s/migration_batch_fetcher.cpp  12
-rw-r--r--  src/mongo/db/s/migration_batch_fetcher.h  3
-rw-r--r--  src/mongo/db/s/migration_batch_inserter.cpp  36
-rw-r--r--  src/mongo/db/s/migration_batch_inserter.h  8
-rw-r--r--  src/mongo/db/s/migration_batch_mock_inserter.h  3
-rw-r--r--  src/mongo/util/concurrency/thread_pool.h  7
7 files changed, 55 insertions, 24 deletions
diff --git a/jstests/sharding/move_chunk_concurrent_cloning.js b/jstests/sharding/move_chunk_concurrent_cloning.js
index fd26af208d8..8677f9314bb 100644
--- a/jstests/sharding/move_chunk_concurrent_cloning.js
+++ b/jstests/sharding/move_chunk_concurrent_cloning.js
@@ -106,10 +106,10 @@ const runParallelMoveChunk = (numThreads) => {
MongoRunner.stopMongod(staticMongod);
};
-// Run test 10 times with random concurrency levels.
-for (let i = 1; i <= 5; i++) {
+runParallelMoveChunk(1);
+
+// Run test a few times with random concurrency levels.
+for (let i = 1; i <= 4; i++) {
runParallelMoveChunk(Math.floor(Math.random() * 31) + 1);
}
-}
-
- )();
+})();
diff --git a/src/mongo/db/s/migration_batch_fetcher.cpp b/src/mongo/db/s/migration_batch_fetcher.cpp
index 8622379449f..46282cbf04e 100644
--- a/src/mongo/db/s/migration_batch_fetcher.cpp
+++ b/src/mongo/db/s/migration_batch_fetcher.cpp
@@ -73,7 +73,8 @@ MigrationBatchFetcher<Inserter>::MigrationBatchFetcher(
_collectionUuid(collectionId),
_migrationId{migrationId},
_writeConcern{writeConcern},
- _isParallelFetchingSupported{parallelFetchingSupported} {
+ _isParallelFetchingSupported{parallelFetchingSupported},
+ _secondaryThrottleTicket(1) {
_inserterWorkers->startup();
}
@@ -164,7 +165,8 @@ void MigrationBatchFetcher<Inserter>::_runFetcher() try {
_collectionUuid,
_migrationProgress,
_migrationId,
- _chunkMigrationConcurrency};
+ _chunkMigrationConcurrency,
+ &_secondaryThrottleTicket};
_inserterWorkers->schedule([batchSize,
fetchTime,
@@ -216,8 +218,14 @@ MigrationBatchFetcher<Inserter>::~MigrationBatchFetcher() {
LOGV2(6718401,
"Shutting down and joining inserter threads for migration {migrationId}",
"migrationId"_attr = _migrationId);
+
+ // Call waitForIdle first since join can spawn another thread while ignoring the maxPoolSize
+ // to finish the pending task. This is safe as long as ThreadPool::shutdown can't be
+ // interleaved with this call.
+ _inserterWorkers->waitForIdle();
_inserterWorkers->shutdown();
_inserterWorkers->join();
+
LOGV2(6718415,
"Inserter threads for migration {migrationId} joined",
"migrationId"_attr = _migrationId);
diff --git a/src/mongo/db/s/migration_batch_fetcher.h b/src/mongo/db/s/migration_batch_fetcher.h
index b26f72153b1..ca7eddc54a8 100644
--- a/src/mongo/db/s/migration_batch_fetcher.h
+++ b/src/mongo/db/s/migration_batch_fetcher.h
@@ -40,6 +40,7 @@
#include "mongo/s/shard_id.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/producer_consumer_queue.h"
#pragma once
@@ -137,6 +138,8 @@ private:
// Indicates if source is prepared to service _migrateClone requests in parallel.
bool _isParallelFetchingSupported;
+ TicketHolder _secondaryThrottleTicket;
+
// Given session id and namespace, create migrateCloneRequest.
// Only should be created once for the lifetime of the object.
BSONObj _createMigrateCloneRequest() const {
diff --git a/src/mongo/db/s/migration_batch_inserter.cpp b/src/mongo/db/s/migration_batch_inserter.cpp
index 963832c671a..d1df6c1a1b8 100644
--- a/src/mongo/db/s/migration_batch_inserter.cpp
+++ b/src/mongo/db/s/migration_batch_inserter.cpp
@@ -169,20 +169,28 @@ void MigrationBatchInserter::run(Status status) const try {
_migrationProgress->incNumBytes(batchClonedBytes);
if (_writeConcern.needToWaitForOtherNodes() && _threadCount == 1) {
- runWithoutSession(_outerOpCtx, [&] {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- LOGV2_WARNING(22011,
- "secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _migrationId.toBSON());
- } else {
- uassertStatusOK(replStatus.status);
- }
- });
+ if (_secondaryThrottleTicket->tryAcquire()) {
+ TicketHolderReleaser ticketReleaser(_secondaryThrottleTicket);
+
+ runWithoutSession(_outerOpCtx, [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _migrationId.toBSON());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ });
+ } else {
+ // Ticket should always be available unless thread pool max size 1 setting is not
+ // being respected.
+ dassert(false);
+ }
}
sleepmillis(migrateCloneInsertionBatchDelayMS.load());
diff --git a/src/mongo/db/s/migration_batch_inserter.h b/src/mongo/db/s/migration_batch_inserter.h
index c3223921602..369babb69da 100644
--- a/src/mongo/db/s/migration_batch_inserter.h
+++ b/src/mongo/db/s/migration_batch_inserter.h
@@ -45,6 +45,7 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/grid.h"
+#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/uuid.h"
#pragma once
@@ -106,7 +107,8 @@ public:
const UUID& collectionUuid,
std::shared_ptr<MigrationCloningProgressSharedState> migrationProgress,
const UUID& migrationId,
- int threadCount)
+ int threadCount,
+ TicketHolder* secondaryThrottleTicket)
: _outerOpCtx{outerOpCtx},
_innerOpCtx{innerOpCtx},
_batch{batch},
@@ -116,7 +118,8 @@ public:
_collectionUuid{collectionUuid},
_migrationProgress{migrationProgress},
_migrationId{migrationId},
- _threadCount{threadCount} {}
+ _threadCount{threadCount},
+ _secondaryThrottleTicket{secondaryThrottleTicket} {}
static void onCreateThread(const std::string& threadName);
@@ -131,6 +134,7 @@ private:
std::shared_ptr<MigrationCloningProgressSharedState> _migrationProgress;
UUID _migrationId;
int _threadCount;
+ TicketHolder* _secondaryThrottleTicket;
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_batch_mock_inserter.h b/src/mongo/db/s/migration_batch_mock_inserter.h
index 8b4b766480c..edbe0117246 100644
--- a/src/mongo/db/s/migration_batch_mock_inserter.h
+++ b/src/mongo/db/s/migration_batch_mock_inserter.h
@@ -56,7 +56,8 @@ public:
UUID,
std::shared_ptr<MigrationCloningProgressSharedState>,
UUID,
- int) {}
+ int,
+ TicketHolder*) {}
static void onCreateThread(const std::string& threadName) {}
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index 29acd9e09c0..1cbd6b8b263 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -147,6 +147,11 @@ public:
// from ThreadPoolInterface
void startup() override;
void shutdown() override;
+
+ /**
+ * Joins all scheduled tasks. Can also spawn a free thread that ignores maxThread options to
+ * execute pending tasks.
+ */
void join() override;
/**
@@ -158,6 +163,8 @@ public:
*
* May be called multiple times, by multiple threads. May not be called by a task in the thread
* pool.
+ *
+ * Not safe to use when shutdown can be called concurrently.
*/
void waitForIdle();