author     Matthew Saltz <matthew.saltz@mongodb.com>	2020-02-06 17:22:14 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>	2020-02-10 19:18:37 +0000
commit     32770d4dcb8317dc7c4f03fe120fc5682821d137
tree       5c98dc89d6d9c84fd3b104516c5bcd4f2420fb4a
parent     46d2b6ceee0f161ca77bc5077c1465c6ca8faeae
SERVER-45338 Make moveChunk robust to maxTimeMS expiring (fix auth after revert)
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp | 87
1 file changed, 65 insertions(+), 22 deletions(-)
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 68abc0bbb00..645930188ac 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -48,6 +48,7 @@
#include "mongo/s/request_types/migration_secondary_throttle_options.h"
#include "mongo/s/request_types/move_chunk_request.h"
#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
@@ -120,7 +121,7 @@ public:
bool run(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ BSONObjBuilder&) override {
auto shardingState = ShardingState::get(opCtx);
uassertStatusOK(shardingState->canAcceptShardedCommands());
@@ -134,31 +135,51 @@ public:
auto scopedMigration = uassertStatusOK(
ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest));
- Status status = {ErrorCodes::InternalError, "Uninitialized value"};
-
// Check if there is an existing migration running and if so, join it
if (scopedMigration.mustExecute()) {
- try {
- _runImpl(opCtx, moveChunkRequest);
- status = Status::OK();
- } catch (const DBException& e) {
- status = e.toStatus();
- if (status.code() == ErrorCodes::LockTimeout) {
- ShardingStatistics::get(opCtx).countDonorMoveChunkLockTimeout.addAndFetch(1);
- }
- } catch (const std::exception& e) {
- scopedMigration.signalComplete(
- {ErrorCodes::InternalError,
- str::stream()
- << "Severe error occurred while running moveChunk command: " << e.what()});
- throw;
- }
-
- scopedMigration.signalComplete(status);
+ auto moveChunkComplete =
+ ExecutorFuture<void>(_getExecutor())
+ .then([moveChunkRequest,
+ scopedMigration = std::move(scopedMigration),
+ serviceContext = opCtx->getServiceContext()]() mutable {
+ ThreadClient tc("MoveChunk", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+ auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+ // Note: This internal authorization is tied to the lifetime of the client.
+ AuthorizationSession::get(opCtx->getClient())
+ ->grantInternalAuthorization(opCtx->getClient());
+
+ Status status = {ErrorCodes::InternalError, "Uninitialized value"};
+
+ try {
+ _runImpl(opCtx, moveChunkRequest);
+ status = Status::OK();
+ } catch (const DBException& e) {
+ status = e.toStatus();
+ if (status.code() == ErrorCodes::LockTimeout) {
+ ShardingStatistics::get(opCtx)
+ .countDonorMoveChunkLockTimeout.addAndFetch(1);
+ }
+ } catch (const std::exception& e) {
+ scopedMigration.signalComplete(
+ {ErrorCodes::InternalError,
+ str::stream()
+ << "Severe error occurred while running moveChunk command: "
+ << e.what()});
+ throw;
+ }
+
+ scopedMigration.signalComplete(status);
+ uassertStatusOK(status);
+ });
+ moveChunkComplete.get(opCtx);
} else {
- status = scopedMigration.waitForCompletion(opCtx);
+ uassertStatusOK(scopedMigration.waitForCompletion(opCtx));
}
- uassertStatusOK(status);
if (moveChunkRequest.getWaitForDelete()) {
// Ensure we capture the latest opTime in the system, since range deletion happens
@@ -244,6 +265,28 @@ private:
moveChunkHangAtStep6.pauseWhileSet();
}
+private:
+ // Returns a single-threaded executor to be used to run moveChunk commands. The executor is
+ // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is
+ // required to work with ExecutorFutures.
+ static std::shared_ptr<ThreadPool> _getExecutor() {
+ static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex");
+ static std::shared_ptr<ThreadPool> executor;
+
+ stdx::lock_guard<Latch> lg(mutex);
+ if (!executor) {
+ ThreadPool::Options options;
+ options.poolName = "MoveChunk";
+ options.minThreads = 0;
+ // We limit the size of the thread pool to a single thread because currently there can
+ // only be one moveChunk operation on a shard at a time.
+ options.maxThreads = 1;
+ executor = std::make_shared<ThreadPool>(std::move(options));
+ executor->startup();
+ }
+
+ return executor;
+ }
} moveChunkCmd;
} // namespace
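
The core of this change is the move from running _runImpl() inline on the command's own OperationContext to scheduling it on a dedicated single-threaded executor: the migration gets its own client and operation context, so it can run to completion even if the launching command's maxTimeMS expires while the caller waits. The sketch below illustrates that pattern in standard C++ rather than MongoDB internals: the Executor class and getExecutor() here are hypothetical stand-ins for mongo's ThreadPool and the patch's _getExecutor(), and std::promise/std::future stand in for ExecutorFuture and scopedMigration.signalComplete().

#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical single-threaded executor: one worker drains a task queue,
// mirroring options.maxThreads = 1 above (at most one active moveChunk).
class Executor {
public:
    Executor() : _worker([this] { _run(); }) {}

    ~Executor() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _shutdown = true;
        }
        _cv.notify_one();
        _worker.join();
    }

    void schedule(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _tasks.push(std::move(task));
        }
        _cv.notify_one();
    }

private:
    void _run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                _cv.wait(lk, [this] { return _shutdown || !_tasks.empty(); });
                if (_tasks.empty())
                    return;  // shut down only once the queue is drained
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();
        }
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _tasks;
    bool _shutdown = false;
    std::thread _worker;  // declared last so the members above exist first
};

// Lazily initialized shared executor, as in _getExecutor(). (A function-local
// static is already thread-safe in C++11; the explicit mutex mirrors the patch.)
std::shared_ptr<Executor> getExecutor() {
    static std::mutex mutex;
    static std::shared_ptr<Executor> executor;
    std::lock_guard<std::mutex> lk(mutex);
    if (!executor)
        executor = std::make_shared<Executor>();
    return executor;
}

int main() {
    // Offload the "migration" and keep a future, as the patch does with
    // moveChunkComplete; the promise plays the role of signalComplete().
    auto done = std::make_shared<std::promise<void>>();
    auto moveChunkComplete = done->get_future();
    getExecutor()->schedule([done] {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));  // _runImpl()
        std::cout << "migration finished\n";
        done->set_value();
    });

    // Simulate the command's maxTimeMS expiring: the waiter gives up early,
    // but the task keeps running on the executor's own thread.
    if (moveChunkComplete.wait_for(std::chrono::milliseconds(50)) ==
        std::future_status::timeout)
        std::cout << "command timed out, but the migration continues\n";

    moveChunkComplete.wait();  // the work itself still runs to completion
}

As in the patch's own comment, the pool is capped at one thread because there can only be one moveChunk operation on a shard at a time; combined with minThreads = 0 and the lazy initialization under a mutex, a shard that never donates a chunk presumably never spawns the worker at all.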