From bb5f249a0d1a69c2e42cda5c255b705a24d58ff9 Mon Sep 17 00:00:00 2001
From: Matthew Saltz
Date: Wed, 5 Feb 2020 17:55:37 -0500
Subject: Revert "SERVER-45338 Make moveChunk robust to maxTimeMS expiring"

This reverts commit 2985a71befe421902b5d15e9567e3b449e65ecdd.
---
 jstests/libs/chunk_manipulation_util.js           | 37 +---------
 jstests/sharding/migration_ignore_interrupts_3.js | 15 +++-
 jstests/sharding/migration_ignore_interrupts_4.js | 15 +++-
 jstests/sharding/move_chunk_respects_maxtimems.js | 57 ----------------
 src/mongo/db/s/move_chunk_command.cpp             | 83 ++++++-----------------
 5 files changed, 49 insertions(+), 158 deletions(-)
 delete mode 100644 jstests/sharding/move_chunk_respects_maxtimems.js

diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index 94a9f786b19..bba74a17886 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -144,9 +144,7 @@ function waitForMoveChunkStep(shardConnection, stepNumber) {
         let op = in_progress.next();
         inProgressStr += tojson(op);
 
-        // TODO (SERVER-45993): Remove the 4.2 and prior branch once 4.4 becomes last-stable.
-        if ((op.desc && op.desc === "MoveChunk") ||
-            (op.command && op.command.moveChunk /* required for v4.2 and prior */)) {
+        if (op.command && op.command.moveChunk) {
             // Note: moveChunk in join mode will not have the "step" message. So keep on
             // looking if searchString is not found.
             if (op.msg && op.msg.startsWith(searchString)) {
@@ -255,36 +253,3 @@ function runCommandDuringTransferMods(
     unpauseMoveChunkAtStep(fromShard, moveChunkStepNames.startedMoveChunk);
     joinMoveChunk();
 }
-
-function killRunningMoveChunk(admin) {
-    let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]);
-    var abortedMigration = false;
-    let inProgressStr = '';
-    let opIdsToKill = {};
-    while (inProgressOps.hasNext()) {
-        let op = inProgressOps.next();
-        inProgressStr += tojson(op);
-
-        // For 4.4 binaries and later.
-        if (op.desc && op.desc === "MoveChunk") {
-            opIdsToKill["MoveChunk"] = op.opid;
-        }
-        // TODO (SERVER-45993): Remove this branch once 4.4 becomes last-stable.
-        // For 4.2 binaries and prior.
-        if (op.command && op.command.moveChunk) {
-            opIdsToKill["moveChunkCommand"] = op.opid;
-        }
-    }
-
-    if (opIdsToKill.MoveChunk) {
-        admin.killOp(opIdsToKill.MoveChunk);
-        abortedMigration = true;
-    } else if (opIdsToKill.moveChunkCommand) {
-        // TODO (SERVER-45993): Remove this branch once 4.4 becomes last-stable.
-        admin.killOp(opIdsToKill.moveChunkCommand);
-        abortedMigration = true;
-    }
-
-    assert.eq(
-        true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr);
-}
diff --git a/jstests/sharding/migration_ignore_interrupts_3.js b/jstests/sharding/migration_ignore_interrupts_3.js
index 0b5b6a99b1c..9474643c60d 100644
--- a/jstests/sharding/migration_ignore_interrupts_3.js
+++ b/jstests/sharding/migration_ignore_interrupts_3.js
@@ -55,8 +55,19 @@ var joinMoveChunk = moveChunkParallel(
 waitForMigrateStep(shard1, migrateStepNames.deletedPriorDataInRange);
 
 // Abort migration on donor side, recipient is unaware.
-killRunningMoveChunk(admin); - +let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]); +var abortedMigration = false; +let inProgressStr = ''; +while (inProgressOps.hasNext()) { + let op = inProgressOps.next(); + inProgressStr += tojson(op); + if (op.command.moveChunk) { + admin.killOp(op.opid); + abortedMigration = true; + } +} +assert.eq( + true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr); unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk); assert.throws(function() { joinMoveChunk(); diff --git a/jstests/sharding/migration_ignore_interrupts_4.js b/jstests/sharding/migration_ignore_interrupts_4.js index 7554b2ec3ae..3d4ad25be63 100644 --- a/jstests/sharding/migration_ignore_interrupts_4.js +++ b/jstests/sharding/migration_ignore_interrupts_4.js @@ -54,8 +54,19 @@ var joinMoveChunk = moveChunkParallel( waitForMigrateStep(shard1, migrateStepNames.cloned); // Abort migration on donor side, recipient is unaware -killRunningMoveChunk(admin); - +let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]); +var abortedMigration = false; +let inProgressStr = ''; +while (inProgressOps.hasNext()) { + let op = inProgressOps.next(); + inProgressStr += tojson(op); + if (op.command.moveChunk) { + admin.killOp(op.opid); + abortedMigration = true; + } +} +assert.eq( + true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr); unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk); assert.throws(function() { joinMoveChunk(); diff --git a/jstests/sharding/move_chunk_respects_maxtimems.js b/jstests/sharding/move_chunk_respects_maxtimems.js deleted file mode 100644 index 2ece4520f29..00000000000 --- a/jstests/sharding/move_chunk_respects_maxtimems.js +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Tests that if maxTimeMS is sent with a moveChunk command, the client thread that issued moveChunk - * will be interrupted when maxTimeMS is exceeded, but moveChunk will eventually succeed in the - * background. - * - * @tags: [multiversion_incompatible] - */ -(function() { - -"use strict"; - -load("jstests/libs/fail_point_util.js"); -load('jstests/libs/parallel_shell_helpers.js'); - -var st = new ShardingTest({shards: 2}); - -const dbName = "test"; -const collName = "foo"; -const ns = dbName + "." + collName; -let testDB = st.s.getDB(dbName); -let testColl = testDB.foo; - -// Create a sharded collection with one chunk on shard0. -assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); -assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName})); -assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}})); - -// Enable failpoint which will cause moveChunk to hang indefinitely. -let step1Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep1"); - -const awaitResult = startParallelShell( - funWithArgs(function(ns, toShardName) { - // Send moveChunk with maxTimeMS. We set it to 15 seconds to ensure that the moveChunk - // command is run and the task to execute the moveChunk logic is launched before maxTimeMS - // expires. That way we can check below that a maxTimeMS timeout won't fail the migration. 
-        assert.commandFailedWithCode(
-            db.adminCommand({moveChunk: ns, find: {x: 0}, to: toShardName, maxTimeMS: 15000}),
-            ErrorCodes.MaxTimeMSExpired);
-    }, ns, st.shard1.shardName), st.s.port);
-
-awaitResult();
-step1Failpoint.off();
-
-jsTestLog("Waiting for moveChunk to succeed in the background");
-
-// The moveChunk should eventually succeed in the background even though the client thread was
-// interrupted.
-assert.soon(() => {
-    var numChunksOnShard0 =
-        st.config.chunks.find({"ns": ns, "shard": st.shard0.shardName}).itcount();
-    var numChunksOnShard1 =
-        st.config.chunks.find({"ns": ns, "shard": st.shard1.shardName}).itcount();
-    return numChunksOnShard0 == 0 && numChunksOnShard1 == 1;
-});
-
-st.stop();
-})();
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 73610bf71ff..68abc0bbb00 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -48,7 +48,6 @@
 #include "mongo/s/request_types/migration_secondary_throttle_options.h"
 #include "mongo/s/request_types/move_chunk_request.h"
 #include "mongo/util/concurrency/notification.h"
-#include "mongo/util/concurrency/thread_pool.h"
 #include "mongo/util/fail_point.h"
 #include "mongo/util/log.h"
 
@@ -121,7 +120,7 @@ public:
     bool run(OperationContext* opCtx,
              const std::string& dbname,
              const BSONObj& cmdObj,
-             BSONObjBuilder&) override {
+             BSONObjBuilder& result) override {
         auto shardingState = ShardingState::get(opCtx);
         uassertStatusOK(shardingState->canAcceptShardedCommands());
 
@@ -135,47 +134,31 @@ public:
         auto scopedMigration = uassertStatusOK(
             ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest));
 
+        Status status = {ErrorCodes::InternalError, "Uninitialized value"};
+
         // Check if there is an existing migration running and if so, join it
         if (scopedMigration.mustExecute()) {
-            auto moveChunkComplete =
-                ExecutorFuture<void>(_getExecutor())
-                    .then([moveChunkRequest,
-                           scopedMigration = std::move(scopedMigration),
-                           serviceContext = opCtx->getServiceContext()]() mutable {
-                        ThreadClient tc("MoveChunk", serviceContext);
-                        {
-                            stdx::lock_guard<Client> lk(*tc.get());
-                            tc->setSystemOperationKillable(lk);
-                        }
-                        auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
-                        auto opCtx = uniqueOpCtx.get();
-                        Status status = {ErrorCodes::InternalError, "Uninitialized value"};
-
-                        try {
-                            _runImpl(opCtx, moveChunkRequest);
-                            status = Status::OK();
-                        } catch (const DBException& e) {
-                            status = e.toStatus();
-                            if (status.code() == ErrorCodes::LockTimeout) {
-                                ShardingStatistics::get(opCtx)
-                                    .countDonorMoveChunkLockTimeout.addAndFetch(1);
-                            }
-                        } catch (const std::exception& e) {
-                            scopedMigration.signalComplete(
-                                {ErrorCodes::InternalError,
-                                 str::stream()
-                                     << "Severe error occurred while running moveChunk command: "
-                                     << e.what()});
-                            throw;
-                        }
-
-                        scopedMigration.signalComplete(status);
-                        uassertStatusOK(status);
-                    });
-            moveChunkComplete.get(opCtx);
+            try {
+                _runImpl(opCtx, moveChunkRequest);
+                status = Status::OK();
+            } catch (const DBException& e) {
+                status = e.toStatus();
+                if (status.code() == ErrorCodes::LockTimeout) {
+                    ShardingStatistics::get(opCtx).countDonorMoveChunkLockTimeout.addAndFetch(1);
+                }
+            } catch (const std::exception& e) {
+                scopedMigration.signalComplete(
+                    {ErrorCodes::InternalError,
+                     str::stream()
+                         << "Severe error occurred while running moveChunk command: " << e.what()});
+                throw;
+            }
+
+            scopedMigration.signalComplete(status);
         } else {
-            uassertStatusOK(scopedMigration.waitForCompletion(opCtx));
+            status = scopedMigration.waitForCompletion(opCtx);
         }
+        uassertStatusOK(status);
 
         if (moveChunkRequest.getWaitForDelete()) {
             // Ensure we capture the latest opTime in the system, since range deletion happens
@@ -261,28 +244,6 @@ private:
         moveChunkHangAtStep6.pauseWhileSet();
     }
 
-private:
-    // Returns a single-threaded executor to be used to run moveChunk commands. The executor is
-    // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is
-    // required to work with ExecutorFutures.
-    static std::shared_ptr<ThreadPool> _getExecutor() {
-        static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex");
-        static std::shared_ptr<ThreadPool> executor;
-
-        stdx::lock_guard<Latch> lg(mutex);
-        if (!executor) {
-            ThreadPool::Options options;
-            options.poolName = "MoveChunk";
-            options.minThreads = 0;
-            // We limit the size of the thread pool to a single thread because currently there can
-            // only be one moveChunk operation on a shard at a time.
-            options.maxThreads = 1;
-            executor = std::make_shared<ThreadPool>(std::move(options));
-            executor->startup();
-        }
-
-        return executor;
-    }
 } moveChunkCmd;
 
 }  // namespace
--
cgit v1.2.1
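
For context on what this revert undoes: SERVER-45338 moved the migration logic off the client's command thread onto a dedicated single-thread pool, so that a maxTimeMS expiry interrupted only the client-side wait while the migration itself ran to completion in the background (the behavior the deleted move_chunk_respects_maxtimems.js asserted). What follows is a minimal, standalone C++ sketch of that detach-and-wait pattern using only the standard library; it is not MongoDB code, and runMigration() is a hypothetical stand-in for the server's _runImpl().

// Standalone illustration (not MongoDB source). runMigration() is a
// hypothetical stand-in for the real migration logic.
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

void runMigration() {
    // Pretend the chunk migration takes two seconds.
    std::this_thread::sleep_for(std::chrono::seconds(2));
}

int main() {
    // Detach the work from the requesting thread, as the reverted commit did
    // by scheduling the migration on a single-thread pool via ExecutorFuture.
    std::future<void> migration = std::async(std::launch::async, runMigration);

    // The client waits with a deadline, analogous to maxTimeMS. When the
    // deadline expires only the wait fails; the background task keeps running.
    if (migration.wait_for(std::chrono::milliseconds(500)) ==
        std::future_status::timeout) {
        std::cout << "client wait timed out; migration continues in background\n";
    }

    migration.wait();  // the detached migration still completes
    std::cout << "migration finished\n";
    return 0;
}

The revert removes that indirection: _runImpl once again executes inline on the command's own thread, so interrupting the client thread interrupts the migration itself, which is why the deleted test no longer applies.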