diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2020-02-06 17:22:14 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-10 19:18:37 +0000 |
commit | 32770d4dcb8317dc7c4f03fe120fc5682821d137 (patch) | |
tree | 5c98dc89d6d9c84fd3b104516c5bcd4f2420fb4a /src/mongo | |
parent | 46d2b6ceee0f161ca77bc5077c1465c6ca8faeae (diff) | |
download | mongo-32770d4dcb8317dc7c4f03fe120fc5682821d137.tar.gz |
SERVER-45338 Make moveChunk robust to maxTimeMS expiring (fix auth after revert)
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 87 |
1 file changed, 65 insertions, 22 deletions
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 68abc0bbb00..645930188ac 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -48,6 +48,7 @@ #include "mongo/s/request_types/migration_secondary_throttle_options.h" #include "mongo/s/request_types/move_chunk_request.h" #include "mongo/util/concurrency/notification.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" @@ -120,7 +121,7 @@ public: bool run(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, - BSONObjBuilder& result) override { + BSONObjBuilder&) override { auto shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); @@ -134,31 +135,51 @@ public: auto scopedMigration = uassertStatusOK( ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest)); - Status status = {ErrorCodes::InternalError, "Uninitialized value"}; - // Check if there is an existing migration running and if so, join it if (scopedMigration.mustExecute()) { - try { - _runImpl(opCtx, moveChunkRequest); - status = Status::OK(); - } catch (const DBException& e) { - status = e.toStatus(); - if (status.code() == ErrorCodes::LockTimeout) { - ShardingStatistics::get(opCtx).countDonorMoveChunkLockTimeout.addAndFetch(1); - } - } catch (const std::exception& e) { - scopedMigration.signalComplete( - {ErrorCodes::InternalError, - str::stream() - << "Severe error occurred while running moveChunk command: " << e.what()}); - throw; - } - - scopedMigration.signalComplete(status); + auto moveChunkComplete = + ExecutorFuture<void>(_getExecutor()) + .then([moveChunkRequest, + scopedMigration = std::move(scopedMigration), + serviceContext = opCtx->getServiceContext()]() mutable { + ThreadClient tc("MoveChunk", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } + auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+ auto opCtx = uniqueOpCtx.get(); + // Note: This internal authorization is tied to the lifetime of the client. + AuthorizationSession::get(opCtx->getClient()) + ->grantInternalAuthorization(opCtx->getClient()); + + Status status = {ErrorCodes::InternalError, "Uninitialized value"}; + + try { + _runImpl(opCtx, moveChunkRequest); + status = Status::OK(); + } catch (const DBException& e) { + status = e.toStatus(); + if (status.code() == ErrorCodes::LockTimeout) { + ShardingStatistics::get(opCtx) + .countDonorMoveChunkLockTimeout.addAndFetch(1); + } + } catch (const std::exception& e) { + scopedMigration.signalComplete( + {ErrorCodes::InternalError, + str::stream() + << "Severe error occurred while running moveChunk command: " + << e.what()}); + throw; + } + + scopedMigration.signalComplete(status); + uassertStatusOK(status); + }); + moveChunkComplete.get(opCtx); } else { - status = scopedMigration.waitForCompletion(opCtx); + uassertStatusOK(scopedMigration.waitForCompletion(opCtx)); } - uassertStatusOK(status); if (moveChunkRequest.getWaitForDelete()) { // Ensure we capture the latest opTime in the system, since range deletion happens @@ -244,6 +265,28 @@ private: moveChunkHangAtStep6.pauseWhileSet(); } +private: + // Returns a single-threaded executor to be used to run moveChunk commands. The executor is + // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is + // required to work with ExecutorFutures. + static std::shared_ptr<ThreadPool> _getExecutor() { + static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex"); + static std::shared_ptr<ThreadPool> executor; + + stdx::lock_guard<Latch> lg(mutex); + if (!executor) { + ThreadPool::Options options; + options.poolName = "MoveChunk"; + options.minThreads = 0; + // We limit the size of the thread pool to a single thread because currently there can + // only be one moveChunk operation on a shard at a time.
+ options.maxThreads = 1; + executor = std::make_shared<ThreadPool>(std::move(options)); + executor->startup(); + } + + return executor; + } } moveChunkCmd; } // namespace |