diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2020-02-03 11:38:56 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-05 20:35:37 +0000 |
commit | 2985a71befe421902b5d15e9567e3b449e65ecdd (patch) | |
tree | d21eed42d68b50121b724a04425aae8ed0dfb2a5 /src/mongo/db/s/move_chunk_command.cpp | |
parent | adc06b21b6f36151c89b44f2de3afc5f96a00752 (diff) | |
download | mongo-2985a71befe421902b5d15e9567e3b449e65ecdd.tar.gz |
SERVER-45338 Make moveChunk robust to maxTimeMS expiring
create mode 100644 jstests/sharding/move_chunk_respects_maxtimems.js
Diffstat (limited to 'src/mongo/db/s/move_chunk_command.cpp')
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 83 |
1 file changed, 61 insertions, 22 deletions
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 68abc0bbb00..73610bf71ff 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -48,6 +48,7 @@ #include "mongo/s/request_types/migration_secondary_throttle_options.h" #include "mongo/s/request_types/move_chunk_request.h" #include "mongo/util/concurrency/notification.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" @@ -120,7 +121,7 @@ public: bool run(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, - BSONObjBuilder& result) override { + BSONObjBuilder&) override { auto shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); @@ -134,31 +135,47 @@ public: auto scopedMigration = uassertStatusOK( ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest)); - Status status = {ErrorCodes::InternalError, "Uninitialized value"}; - // Check if there is an existing migration running and if so, join it if (scopedMigration.mustExecute()) { - try { - _runImpl(opCtx, moveChunkRequest); - status = Status::OK(); - } catch (const DBException& e) { - status = e.toStatus(); - if (status.code() == ErrorCodes::LockTimeout) { - ShardingStatistics::get(opCtx).countDonorMoveChunkLockTimeout.addAndFetch(1); - } - } catch (const std::exception& e) { - scopedMigration.signalComplete( - {ErrorCodes::InternalError, - str::stream() - << "Severe error occurred while running moveChunk command: " << e.what()}); - throw; - } - - scopedMigration.signalComplete(status); + auto moveChunkComplete = + ExecutorFuture<void>(_getExecutor()) + .then([moveChunkRequest, + scopedMigration = std::move(scopedMigration), + serviceContext = opCtx->getServiceContext()]() mutable { + ThreadClient tc("MoveChunk", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } + auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + Status status = {ErrorCodes::InternalError, "Uninitialized value"}; + + try { + _runImpl(opCtx, moveChunkRequest); + status = Status::OK(); + } catch (const DBException& e) { + status = e.toStatus(); + if (status.code() == ErrorCodes::LockTimeout) { + ShardingStatistics::get(opCtx) + .countDonorMoveChunkLockTimeout.addAndFetch(1); + } + } catch (const std::exception& e) { + scopedMigration.signalComplete( + {ErrorCodes::InternalError, + str::stream() + << "Severe error occurred while running moveChunk command: " + << e.what()}); + throw; + } + + scopedMigration.signalComplete(status); + uassertStatusOK(status); + }); + moveChunkComplete.get(opCtx); } else { - status = scopedMigration.waitForCompletion(opCtx); + uassertStatusOK(scopedMigration.waitForCompletion(opCtx)); } - uassertStatusOK(status); if (moveChunkRequest.getWaitForDelete()) { // Ensure we capture the latest opTime in the system, since range deletion happens @@ -244,6 +261,28 @@ private: moveChunkHangAtStep6.pauseWhileSet(); } +private: + // Returns a single-threaded executor to be used to run moveChunk commands. The executor is + // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is + // required to work with ExecutorFutures. + static std::shared_ptr<ThreadPool> _getExecutor() { + static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex"); + static std::shared_ptr<ThreadPool> executor; + + stdx::lock_guard<Latch> lg(mutex); + if (!executor) { + ThreadPool::Options options; + options.poolName = "MoveChunk"; + options.minThreads = 0; + // We limit the size of the thread pool to a single thread because currently there can + // only be one moveChunk operation on a shard at a time. + options.maxThreads = 1; + executor = std::make_shared<ThreadPool>(std::move(options)); + executor->startup(); + } + + return executor; + } } moveChunkCmd; } // namespace