diff options
-rw-r--r-- | jstests/libs/chunk_manipulation_util.js | 37 | ||||
-rw-r--r-- | jstests/sharding/migration_ignore_interrupts_3.js | 15 | ||||
-rw-r--r-- | jstests/sharding/migration_ignore_interrupts_4.js | 15 | ||||
-rw-r--r-- | jstests/sharding/move_chunk_respects_maxtimems.js | 57 | ||||
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 83 |
5 files changed, 158 insertions, 49 deletions
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js index bba74a17886..94a9f786b19 100644 --- a/jstests/libs/chunk_manipulation_util.js +++ b/jstests/libs/chunk_manipulation_util.js @@ -144,7 +144,9 @@ function waitForMoveChunkStep(shardConnection, stepNumber) { let op = in_progress.next(); inProgressStr += tojson(op); - if (op.command && op.command.moveChunk) { + // TODO (SERVER-45993): Remove the 4.2 and prior branch once 4.4 becomes last-stable. + if ((op.desc && op.desc === "MoveChunk") || + (op.command && op.command.moveChunk /* required for v4.2 and prior */)) { // Note: moveChunk in join mode will not have the "step" message. So keep on // looking if searchString is not found. if (op.msg && op.msg.startsWith(searchString)) { @@ -253,3 +255,36 @@ function runCommandDuringTransferMods( unpauseMoveChunkAtStep(fromShard, moveChunkStepNames.startedMoveChunk); joinMoveChunk(); } + +function killRunningMoveChunk(admin) { + let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]); + var abortedMigration = false; + let inProgressStr = ''; + let opIdsToKill = {}; + while (inProgressOps.hasNext()) { + let op = inProgressOps.next(); + inProgressStr += tojson(op); + + // For 4.4 binaries and later. + if (op.desc && op.desc === "MoveChunk") { + opIdsToKill["MoveChunk"] = op.opid; + } + // TODO (SERVER-45993): Remove this branch once 4.4 becomes last-stable. + // For 4.2 binaries and prior. + if (op.command && op.command.moveChunk) { + opIdsToKill["moveChunkCommand"] = op.opid; + } + } + + if (opIdsToKill.MoveChunk) { + admin.killOp(opIdsToKill.MoveChunk); + abortedMigration = true; + } else if (opIdsToKill.moveChunkCommand) { + // TODO (SERVER-45993): Remove this branch once 4.4 becomes last-stable. + admin.killOp(opIdsToKill.moveChunkCommand); + abortedMigration = true; + } + + assert.eq( + true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr); +} diff --git a/jstests/sharding/migration_ignore_interrupts_3.js b/jstests/sharding/migration_ignore_interrupts_3.js index 9474643c60d..0b5b6a99b1c 100644 --- a/jstests/sharding/migration_ignore_interrupts_3.js +++ b/jstests/sharding/migration_ignore_interrupts_3.js @@ -55,19 +55,8 @@ var joinMoveChunk = moveChunkParallel( waitForMigrateStep(shard1, migrateStepNames.deletedPriorDataInRange); // Abort migration on donor side, recipient is unaware. -let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]); -var abortedMigration = false; -let inProgressStr = ''; -while (inProgressOps.hasNext()) { - let op = inProgressOps.next(); - inProgressStr += tojson(op); - if (op.command.moveChunk) { - admin.killOp(op.opid); - abortedMigration = true; - } -} -assert.eq( - true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr); +killRunningMoveChunk(admin); + unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk); assert.throws(function() { joinMoveChunk(); diff --git a/jstests/sharding/migration_ignore_interrupts_4.js b/jstests/sharding/migration_ignore_interrupts_4.js index 3d4ad25be63..7554b2ec3ae 100644 --- a/jstests/sharding/migration_ignore_interrupts_4.js +++ b/jstests/sharding/migration_ignore_interrupts_4.js @@ -54,19 +54,8 @@ var joinMoveChunk = moveChunkParallel( waitForMigrateStep(shard1, migrateStepNames.cloned); // Abort migration on donor side, recipient is unaware -let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]); -var abortedMigration = false; -let inProgressStr = ''; -while (inProgressOps.hasNext()) { - let op = inProgressOps.next(); - inProgressStr += tojson(op); - if (op.command.moveChunk) { - admin.killOp(op.opid); - abortedMigration = true; - } -} -assert.eq( - true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr); +killRunningMoveChunk(admin); + unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk); assert.throws(function() { joinMoveChunk(); diff --git a/jstests/sharding/move_chunk_respects_maxtimems.js b/jstests/sharding/move_chunk_respects_maxtimems.js new file mode 100644 index 00000000000..2ece4520f29 --- /dev/null +++ b/jstests/sharding/move_chunk_respects_maxtimems.js @@ -0,0 +1,57 @@ +/** + * Tests that if maxTimeMS is sent with a moveChunk command, the client thread that issued moveChunk + * will be interrupted when maxTimeMS is exceeded, but moveChunk will eventually succeed in the + * background. + * + * @tags: [multiversion_incompatible] + */ +(function() { + +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load('jstests/libs/parallel_shell_helpers.js'); + +var st = new ShardingTest({shards: 2}); + +const dbName = "test"; +const collName = "foo"; +const ns = dbName + "." + collName; +let testDB = st.s.getDB(dbName); +let testColl = testDB.foo; + +// Create a sharded collection with one chunk on shard0. +assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); +assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName})); +assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}})); + +// Enable failpoint which will cause moveChunk to hang indefinitely. +let step1Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep1"); + +const awaitResult = startParallelShell( + funWithArgs(function(ns, toShardName) { + // Send moveChunk with maxTimeMS. We set it to 15 seconds to ensure that the moveChunk + // command is run and the task to execute the moveChunk logic is launched before maxTimeMS + // expires. That way we can check below that a maxTimeMS timeout won't fail the migration. + assert.commandFailedWithCode( + db.adminCommand({moveChunk: ns, find: {x: 0}, to: toShardName, maxTimeMS: 15000}), + ErrorCodes.MaxTimeMSExpired); + }, ns, st.shard1.shardName), st.s.port); + +awaitResult(); +step1Failpoint.off(); + +jsTestLog("Waiting for moveChunk to succeed in the background"); + +// The moveChunk should eventually succeed in the background even though the client thread was +// interrupted. +assert.soon(() => { + var numChunksOnShard0 = + st.config.chunks.find({"ns": ns, "shard": st.shard0.shardName}).itcount(); + var numChunksOnShard1 = + st.config.chunks.find({"ns": ns, "shard": st.shard1.shardName}).itcount(); + return numChunksOnShard0 == 0 && numChunksOnShard1 == 1; +}); + +st.stop(); +})(); diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 68abc0bbb00..73610bf71ff 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -48,6 +48,7 @@ #include "mongo/s/request_types/migration_secondary_throttle_options.h" #include "mongo/s/request_types/move_chunk_request.h" #include "mongo/util/concurrency/notification.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" @@ -120,7 +121,7 @@ public: bool run(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, - BSONObjBuilder& result) override { + BSONObjBuilder&) override { auto shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); @@ -134,31 +135,47 @@ public: auto scopedMigration = uassertStatusOK( ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest)); - Status status = {ErrorCodes::InternalError, "Uninitialized value"}; - // Check if there is an existing migration running and if so, join it if (scopedMigration.mustExecute()) { - try { - _runImpl(opCtx, moveChunkRequest); - status = Status::OK(); - } catch (const DBException& e) { - status = e.toStatus(); - if (status.code() == ErrorCodes::LockTimeout) { - ShardingStatistics::get(opCtx).countDonorMoveChunkLockTimeout.addAndFetch(1); - } - } catch (const std::exception& e) { - scopedMigration.signalComplete( - {ErrorCodes::InternalError, - str::stream() - << "Severe error occurred while running moveChunk command: " << e.what()}); - throw; - } - - scopedMigration.signalComplete(status); + auto moveChunkComplete = + ExecutorFuture<void>(_getExecutor()) + .then([moveChunkRequest, + scopedMigration = std::move(scopedMigration), + serviceContext = opCtx->getServiceContext()]() mutable { + ThreadClient tc("MoveChunk", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } + auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + Status status = {ErrorCodes::InternalError, "Uninitialized value"}; + + try { + _runImpl(opCtx, moveChunkRequest); + status = Status::OK(); + } catch (const DBException& e) { + status = e.toStatus(); + if (status.code() == ErrorCodes::LockTimeout) { + ShardingStatistics::get(opCtx) + .countDonorMoveChunkLockTimeout.addAndFetch(1); + } + } catch (const std::exception& e) { + scopedMigration.signalComplete( + {ErrorCodes::InternalError, + str::stream() + << "Severe error occurred while running moveChunk command: " + << e.what()}); + throw; + } + + scopedMigration.signalComplete(status); + uassertStatusOK(status); + }); + moveChunkComplete.get(opCtx); } else { - status = scopedMigration.waitForCompletion(opCtx); + uassertStatusOK(scopedMigration.waitForCompletion(opCtx)); } - uassertStatusOK(status); if (moveChunkRequest.getWaitForDelete()) { // Ensure we capture the latest opTime in the system, since range deletion happens @@ -244,6 +261,28 @@ private: moveChunkHangAtStep6.pauseWhileSet(); } +private: + // Returns a single-threaded executor to be used to run moveChunk commands. The executor is + // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is + // required to work with ExecutorFutures. + static std::shared_ptr<ThreadPool> _getExecutor() { + static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex"); + static std::shared_ptr<ThreadPool> executor; + + stdx::lock_guard<Latch> lg(mutex); + if (!executor) { + ThreadPool::Options options; + options.poolName = "MoveChunk"; + options.minThreads = 0; + // We limit the size of the thread pool to a single thread because currently there can + // only be one moveChunk operation on a shard at a time. + options.maxThreads = 1; + executor = std::make_shared<ThreadPool>(std::move(options)); + executor->startup(); + } + + return executor; + } } moveChunkCmd; } // namespace |