author    Matthew Saltz <matthew.saltz@mongodb.com>    2020-02-03 11:38:56 -0500
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-02-05 20:35:37 +0000
commit    2985a71befe421902b5d15e9567e3b449e65ecdd (patch)
tree      d21eed42d68b50121b724a04425aae8ed0dfb2a5
parent    adc06b21b6f36151c89b44f2de3afc5f96a00752 (diff)
download  mongo-2985a71befe421902b5d15e9567e3b449e65ecdd.tar.gz
SERVER-45338 Make moveChunk robust to maxTimeMS expiring
create mode 100644 jstests/sharding/move_chunk_respects_maxtimems.js
-rw-r--r--  jstests/libs/chunk_manipulation_util.js            | 37
-rw-r--r--  jstests/sharding/migration_ignore_interrupts_3.js  | 15
-rw-r--r--  jstests/sharding/migration_ignore_interrupts_4.js  | 15
-rw-r--r--  jstests/sharding/move_chunk_respects_maxtimems.js  | 57
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp              | 83
5 files changed, 158 insertions(+), 49 deletions(-)
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index bba74a17886..94a9f786b19 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -144,7 +144,9 @@ function waitForMoveChunkStep(shardConnection, stepNumber) {
let op = in_progress.next();
inProgressStr += tojson(op);
- if (op.command && op.command.moveChunk) {
+ // TODO (SERVER-45993): Remove the 4.2 and prior branch once 4.4 becomes last-stable.
+ if ((op.desc && op.desc === "MoveChunk") ||
+ (op.command && op.command.moveChunk /* required for v4.2 and prior */)) {
// Note: moveChunk in join mode will not have the "step" message. So keep on
// looking if searchString is not found.
if (op.msg && op.msg.startsWith(searchString)) {
@@ -253,3 +255,36 @@ function runCommandDuringTransferMods(
unpauseMoveChunkAtStep(fromShard, moveChunkStepNames.startedMoveChunk);
joinMoveChunk();
}
+
+function killRunningMoveChunk(admin) {
+ let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]);
+ var abortedMigration = false;
+ let inProgressStr = '';
+ let opIdsToKill = {};
+ while (inProgressOps.hasNext()) {
+ let op = inProgressOps.next();
+ inProgressStr += tojson(op);
+
+ // For 4.4 binaries and later.
+ if (op.desc && op.desc === "MoveChunk") {
+ opIdsToKill["MoveChunk"] = op.opid;
+ }
+ // TODO (SERVER-45993): Remove this branch once 4.4 becomes last-stable.
+ // For 4.2 binaries and prior.
+ if (op.command && op.command.moveChunk) {
+ opIdsToKill["moveChunkCommand"] = op.opid;
+ }
+ }
+
+ if (opIdsToKill.MoveChunk) {
+ admin.killOp(opIdsToKill.MoveChunk);
+ abortedMigration = true;
+ } else if (opIdsToKill.moveChunkCommand) {
+ // TODO (SERVER-45993): Remove this branch once 4.4 becomes last-stable.
+ admin.killOp(opIdsToKill.moveChunkCommand);
+ abortedMigration = true;
+ }
+
+ assert.eq(
+ true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr);
+}
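
A minimal usage sketch of the new killRunningMoveChunk() helper (hypothetical, not part of this commit; it assumes a ShardingTest `st`, the chunk manipulation helpers loaded, and a migration paused on the donor shard):

    // Hypothetical usage sketch for the helper above. Assumes `st` is a
    // ShardingTest and chunk_manipulation_util.js has been loaded.
    load("jstests/libs/chunk_manipulation_util.js");
    let admin = st.s.getDB("admin");
    pauseMoveChunkAtStep(st.shard0, moveChunkStepNames.startedMoveChunk);
    // ... start a moveChunk in a parallel shell and wait for it to reach the
    // paused step (see waitForMoveChunkStep above) ...
    killRunningMoveChunk(admin);  // kills the donor-side op, or asserts with the op list
    unpauseMoveChunkAtStep(st.shard0, moveChunkStepNames.startedMoveChunk);
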
diff --git a/jstests/sharding/migration_ignore_interrupts_3.js b/jstests/sharding/migration_ignore_interrupts_3.js
index 9474643c60d..0b5b6a99b1c 100644
--- a/jstests/sharding/migration_ignore_interrupts_3.js
+++ b/jstests/sharding/migration_ignore_interrupts_3.js
@@ -55,19 +55,8 @@ var joinMoveChunk = moveChunkParallel(
waitForMigrateStep(shard1, migrateStepNames.deletedPriorDataInRange);
// Abort migration on donor side, recipient is unaware.
-let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]);
-var abortedMigration = false;
-let inProgressStr = '';
-while (inProgressOps.hasNext()) {
- let op = inProgressOps.next();
- inProgressStr += tojson(op);
- if (op.command.moveChunk) {
- admin.killOp(op.opid);
- abortedMigration = true;
- }
-}
-assert.eq(
- true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr);
+killRunningMoveChunk(admin);
+
unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk);
assert.throws(function() {
joinMoveChunk();
diff --git a/jstests/sharding/migration_ignore_interrupts_4.js b/jstests/sharding/migration_ignore_interrupts_4.js
index 3d4ad25be63..7554b2ec3ae 100644
--- a/jstests/sharding/migration_ignore_interrupts_4.js
+++ b/jstests/sharding/migration_ignore_interrupts_4.js
@@ -54,19 +54,8 @@ var joinMoveChunk = moveChunkParallel(
waitForMigrateStep(shard1, migrateStepNames.cloned);
// Abort migration on donor side, recipient is unaware
-let inProgressOps = admin.aggregate([{$currentOp: {'allUsers': true}}]);
-var abortedMigration = false;
-let inProgressStr = '';
-while (inProgressOps.hasNext()) {
- let op = inProgressOps.next();
- inProgressStr += tojson(op);
- if (op.command.moveChunk) {
- admin.killOp(op.opid);
- abortedMigration = true;
- }
-}
-assert.eq(
- true, abortedMigration, "Failed to abort migration, current running ops: " + inProgressStr);
+killRunningMoveChunk(admin);
+
unpauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk);
assert.throws(function() {
joinMoveChunk();
diff --git a/jstests/sharding/move_chunk_respects_maxtimems.js b/jstests/sharding/move_chunk_respects_maxtimems.js
new file mode 100644
index 00000000000..2ece4520f29
--- /dev/null
+++ b/jstests/sharding/move_chunk_respects_maxtimems.js
@@ -0,0 +1,57 @@
+/**
+ * Tests that if maxTimeMS is sent with a moveChunk command, the client thread that issued moveChunk
+ * will be interrupted when maxTimeMS is exceeded, but moveChunk will eventually succeed in the
+ * background.
+ *
+ * @tags: [multiversion_incompatible]
+ */
+(function() {
+
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallel_shell_helpers.js');
+
+var st = new ShardingTest({shards: 2});
+
+const dbName = "test";
+const collName = "foo";
+const ns = dbName + "." + collName;
+let testDB = st.s.getDB(dbName);
+let testColl = testDB.foo;
+
+// Create a sharded collection with one chunk on shard0.
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+
+// Enable failpoint which will cause moveChunk to hang indefinitely.
+let step1Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep1");
+
+const awaitResult = startParallelShell(
+ funWithArgs(function(ns, toShardName) {
+ // Send moveChunk with maxTimeMS. We set it to 15 seconds to ensure that the moveChunk
+ // command is run and the task to execute the moveChunk logic is launched before maxTimeMS
+ // expires. That way we can check below that a maxTimeMS timeout won't fail the migration.
+ assert.commandFailedWithCode(
+ db.adminCommand({moveChunk: ns, find: {x: 0}, to: toShardName, maxTimeMS: 15000}),
+ ErrorCodes.MaxTimeMSExpired);
+ }, ns, st.shard1.shardName), st.s.port);
+
+awaitResult();
+step1Failpoint.off();
+
+jsTestLog("Waiting for moveChunk to succeed in the background");
+
+// The moveChunk should eventually succeed in the background even though the client thread was
+// interrupted.
+assert.soon(() => {
+ var numChunksOnShard0 =
+ st.config.chunks.find({"ns": ns, "shard": st.shard0.shardName}).itcount();
+ var numChunksOnShard1 =
+ st.config.chunks.find({"ns": ns, "shard": st.shard1.shardName}).itcount();
+ return numChunksOnShard0 == 0 && numChunksOnShard1 == 1;
+});
+
+st.stop();
+})();
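
Once maxTimeMS interrupts the client thread, the migration continues server-side; a hedged sketch of observing it directly on the donor (hypothetical, not in the commit), using the same $currentOp shape that killRunningMoveChunk() matches on for 4.4 binaries:

    // Hypothetical observation sketch: the issuing client has been interrupted,
    // but the donor shard still reports an active "MoveChunk" operation.
    let ops = st.shard0.getDB("admin")
                  .aggregate([{$currentOp: {allUsers: true}}, {$match: {desc: "MoveChunk"}}])
                  .toArray();
    // `ops` stays non-empty until the background migration commits or aborts.
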
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 68abc0bbb00..73610bf71ff 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -48,6 +48,7 @@
#include "mongo/s/request_types/migration_secondary_throttle_options.h"
#include "mongo/s/request_types/move_chunk_request.h"
#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
@@ -120,7 +121,7 @@ public:
bool run(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ BSONObjBuilder&) override {
auto shardingState = ShardingState::get(opCtx);
uassertStatusOK(shardingState->canAcceptShardedCommands());
@@ -134,31 +135,47 @@ public:
auto scopedMigration = uassertStatusOK(
ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest));
- Status status = {ErrorCodes::InternalError, "Uninitialized value"};
-
// Check if there is an existing migration running and if so, join it
if (scopedMigration.mustExecute()) {
- try {
- _runImpl(opCtx, moveChunkRequest);
- status = Status::OK();
- } catch (const DBException& e) {
- status = e.toStatus();
- if (status.code() == ErrorCodes::LockTimeout) {
- ShardingStatistics::get(opCtx).countDonorMoveChunkLockTimeout.addAndFetch(1);
- }
- } catch (const std::exception& e) {
- scopedMigration.signalComplete(
- {ErrorCodes::InternalError,
- str::stream()
- << "Severe error occurred while running moveChunk command: " << e.what()});
- throw;
- }
-
- scopedMigration.signalComplete(status);
+ auto moveChunkComplete =
+ ExecutorFuture<void>(_getExecutor())
+ .then([moveChunkRequest,
+ scopedMigration = std::move(scopedMigration),
+ serviceContext = opCtx->getServiceContext()]() mutable {
+ ThreadClient tc("MoveChunk", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+ auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+ Status status = {ErrorCodes::InternalError, "Uninitialized value"};
+
+ try {
+ _runImpl(opCtx, moveChunkRequest);
+ status = Status::OK();
+ } catch (const DBException& e) {
+ status = e.toStatus();
+ if (status.code() == ErrorCodes::LockTimeout) {
+ ShardingStatistics::get(opCtx)
+ .countDonorMoveChunkLockTimeout.addAndFetch(1);
+ }
+ } catch (const std::exception& e) {
+ scopedMigration.signalComplete(
+ {ErrorCodes::InternalError,
+ str::stream()
+ << "Severe error occurred while running moveChunk command: "
+ << e.what()});
+ throw;
+ }
+
+ scopedMigration.signalComplete(status);
+ uassertStatusOK(status);
+ });
+ moveChunkComplete.get(opCtx);
} else {
- status = scopedMigration.waitForCompletion(opCtx);
+ uassertStatusOK(scopedMigration.waitForCompletion(opCtx));
}
- uassertStatusOK(status);
if (moveChunkRequest.getWaitForDelete()) {
// Ensure we capture the latest opTime in the system, since range deletion happens
@@ -244,6 +261,28 @@ private:
moveChunkHangAtStep6.pauseWhileSet();
}
+private:
+ // Returns a single-threaded executor to be used to run moveChunk commands. The executor is
+ // initialized on the first call to this function. Uses a shared_ptr because a shared_ptr is
+ // required to work with ExecutorFutures.
+ static std::shared_ptr<ThreadPool> _getExecutor() {
+ static Mutex mutex = MONGO_MAKE_LATCH("MoveChunkExecutor::_mutex");
+ static std::shared_ptr<ThreadPool> executor;
+
+ stdx::lock_guard<Latch> lg(mutex);
+ if (!executor) {
+ ThreadPool::Options options;
+ options.poolName = "MoveChunk";
+ options.minThreads = 0;
+ // We limit the size of the thread pool to a single thread because currently there can
+ // only be one moveChunk operation on a shard at a time.
+ options.maxThreads = 1;
+ executor = std::make_shared<ThreadPool>(std::move(options));
+ executor->startup();
+ }
+
+ return executor;
+ }
} moveChunkCmd;
} // namespace
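
Note that the rewritten run() keeps the pre-existing join semantics: when registerDonateChunk() finds a matching active migration, mustExecute() is false and the command waits on waitForCompletion() instead of scheduling new work on the executor. A shell-level sketch of that path (hypothetical, not part of the commit):

    // Hypothetical sketch: while one moveChunk for this chunk is in flight, an
    // identical moveChunk from a second client joins the active migration and
    // returns its outcome rather than starting a second migration.
    const joinRes = st.s.getDB("admin").runCommand(
        {moveChunk: ns, find: {x: 0}, to: st.shard1.shardName});
    // joinRes mirrors the result of the already-running migration.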