author     Blake Oler <blake.oler@mongodb.com>    2021-03-25 15:25:49 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-03-29 14:59:13 +0000
commit     5103a49830ec4e13c770cd3e2af4661fd8bf0d3f (patch)
tree       519d4cb93bf2970ab96488765ca0c5631b512ead
parent     95198a839f730dccf32ce8c39d2243d53354496e (diff)
download   mongo-5103a49830ec4e13c770cd3e2af4661fd8bf0d3f.tar.gz
SERVER-52770 Add abortReshardCollection command for users to cancel the resharding operation
-rw-r--r--  jstests/auth/lib/commands_lib.js | 29
-rw-r--r--  jstests/core/views/views_all_commands.js | 2
-rw-r--r--  jstests/replsets/db_reads_while_recovering_all_commands.js | 2
-rw-r--r--  jstests/sharding/database_versioning_all_commands.js | 1
-rw-r--r--  jstests/sharding/libs/resharding_test_fixture.js | 13
-rw-r--r--  jstests/sharding/read_write_concern_defaults_application.js | 2
-rw-r--r--  jstests/sharding/resharding_abort_command.js | 329
-rw-r--r--  jstests/sharding/resharding_fails_on_nonempty_stash.js | 2
-rw-r--r--  jstests/sharding/resharding_recipient_broadcasts_abortReason.js | 2
-rw-r--r--  jstests/sharding/safe_secondary_reads_drop_recreate.js | 1
-rw-r--r--  jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js | 1
-rw-r--r--  jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js | 1
-rw-r--r--  src/mongo/base/error_codes.yml | 4
-rw-r--r--  src/mongo/db/s/SConscript | 1
-rw-r--r--  src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp | 175
-rw-r--r--  src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp | 3
-rw-r--r--  src/mongo/db/s/resharding/resharding_agg_test.cpp | 5
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 189
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.h | 103
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp | 5
-rw-r--r--  src/mongo/db/s/resharding/resharding_op_observer.cpp | 2
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp | 78
-rw-r--r--  src/mongo/s/SConscript | 1
-rw-r--r--  src/mongo/s/commands/SConscript | 1
-rw-r--r--  src/mongo/s/commands/cluster_abort_reshard_collection_cmd.cpp | 105
-rw-r--r--  src/mongo/s/request_types/abort_reshard_collection.idl | 53
26 files changed, 1006 insertions, 104 deletions
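
For orientation, a minimal mongo-shell sketch of the user-facing command this commit adds, pieced together from the test cases and error codes below; the namespace is illustrative and the command is run through a mongos:

    // Ask the config server coordinator to cancel an in-progress resharding operation.
    // While the operation is still abortable this returns {ok: 1} and the in-flight
    // reshardCollection command fails with ErrorCodes.ReshardCollectionAborted. Once the
    // coordinator has persisted its decision, the abort instead fails with
    // ReshardCollectionCommitted or NoSuchReshardCollection
    // (see jstests/sharding/resharding_abort_command.js).
    const res = db.adminCommand({abortReshardCollection: "reshardingDb.coll"});
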
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index d812caf2e77..73f84f6b36d 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -195,6 +195,35 @@ var authCommandsLib = {
/************* TEST CASES ****************/
tests: [
+ {
+ testname: "abortReshardCollection",
+ command: {abortReshardCollection: "test.x"},
+ skipUnlessSharded: true,
+ testcases: [
+ {
+ runOnDb: adminDbName,
+ roles: Object.extend({enableSharding: 1}, roles_clusterManager),
+ privileges:
+ [{resource: {db: "test", collection: "x"}, actions: ["reshardCollection"]}],
+ expectFail: true
+ },
+ ]
+ },
+ {
+ testname: "_configsvrAbortReshardCollection",
+ command: {_configsvrAbortReshardCollection: "test.x"},
+ skipSharded: true,
+ testcases: [
+ {
+ runOnDb: adminDbName,
+ roles: {__system: 1},
+ privileges: [{resource: {cluster: true}, actions: ["internal"]}],
+ expectFail: true
+ },
+ {runOnDb: firstDbName, roles: {}},
+ {runOnDb: secondDbName, roles: {}}
+ ]
+ },
{
testname: "abortTxn",
command: {abortTransaction: 1},
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index 5735813c9ff..8d079145d9e 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -74,6 +74,7 @@ let viewsCommandTests = {
_addShard: {skip: isAnInternalCommand},
_cloneCatalogData: {skip: isAnInternalCommand},
_cloneCollectionOptionsFromPrimaryShard: {skip: isAnInternalCommand},
+ _configsvrAbortReshardCollection: {skip: isAnInternalCommand},
_configsvrAddShard: {skip: isAnInternalCommand},
_configsvrAddShardToZone: {skip: isAnInternalCommand},
_configsvrBalancerCollectionStatus: {skip: isAnInternalCommand},
@@ -133,6 +134,7 @@ let viewsCommandTests = {
_shardsvrShardCollection: {skip: isAnInternalCommand},
_transferMods: {skip: isAnInternalCommand},
_vectorClockPersist: {skip: isAnInternalCommand},
+ abortReshardCollection: {skip: isUnrelated},
abortTransaction: {skip: isUnrelated},
addShard: {skip: isUnrelated},
addShardToZone: {skip: isUnrelated},
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 32a4fccdfaf..957c2369565 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -25,6 +25,7 @@ const isPrimaryOnly = "primary only";
const allCommands = {
_addShard: {skip: isPrimaryOnly},
_cloneCollectionOptionsFromPrimaryShard: {skip: isPrimaryOnly},
+ _configsvrAbortReshardCollection: {skip: isPrimaryOnly},
_configsvrAddShard: {skip: isPrimaryOnly},
_configsvrAddShardToZone: {skip: isPrimaryOnly},
_configsvrBalancerCollectionStatus: {skip: isPrimaryOnly},
@@ -82,6 +83,7 @@ const allCommands = {
_shardsvrRefineCollectionShardKey: {skip: isPrimaryOnly},
_transferMods: {skip: isPrimaryOnly},
_vectorClockPersist: {skip: isPrimaryOnly},
+ abortReshardCollection: {skip: isPrimaryOnly},
abortTransaction: {skip: isPrimaryOnly},
aggregate: {
command: {aggregate: collName, pipeline: [{$match: {}}], cursor: {}},
diff --git a/jstests/sharding/database_versioning_all_commands.js b/jstests/sharding/database_versioning_all_commands.js
index 1e5973d3654..26983182aa0 100644
--- a/jstests/sharding/database_versioning_all_commands.js
+++ b/jstests/sharding/database_versioning_all_commands.js
@@ -236,6 +236,7 @@ let testCases = {
_isSelf: {skip: "executes locally on mongos (not sent to any remote node)"},
_killOperations: {skip: "executes locally on mongos (not sent to any remote node)"},
_mergeAuthzCollections: {skip: "always targets the config server"},
+ abortReshardCollection: {skip: "always targets the config server"},
abortTransaction: {skip: "unversioned and uses special targetting rules"},
addShard: {skip: "not on a user database"},
addShardToZone: {skip: "not on a user database"},
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js
index f536f25435b..c2e7b1aa054 100644
--- a/jstests/sharding/libs/resharding_test_fixture.js
+++ b/jstests/sharding/libs/resharding_test_fixture.js
@@ -257,13 +257,17 @@ var ReshardingTest = class {
* assertions. This function is called in the critical section after a successful
* `reshardCollection` command has shuffled data, but before the response is returned to the
* client.
+ *
+ * @param postDecisionPersistedFn - a function for evaluating additional assertions after
+ * the decision has been persisted, but before the resharding operation finishes and returns
+ * to the client.
*/
withReshardingInBackground({newShardKeyPattern, newChunks},
duringReshardingFn = (tempNs) => {},
{
expectedErrorCode = ErrorCodes.OK,
postCheckConsistencyFn = (tempNs) => {},
- postAbortDecisionPersistedFn = () => {}
+ postDecisionPersistedFn = () => {}
} = {}) {
this._startReshardingInBackgroundAndAllowCommandFailure({newShardKeyPattern, newChunks},
expectedErrorCode);
@@ -276,7 +280,7 @@ var ReshardingTest = class {
this._callFunctionSafely(() => duringReshardingFn(this._tempNs));
this._checkConsistencyAndPostState(expectedErrorCode,
() => postCheckConsistencyFn(this._tempNs),
- () => postAbortDecisionPersistedFn());
+ () => postDecisionPersistedFn());
}
/** @private */
@@ -380,7 +384,7 @@ var ReshardingTest = class {
/** @private */
_checkConsistencyAndPostState(expectedErrorCode,
postCheckConsistencyFn = () => {},
- postAbortDecisionPersistedFn = () => {}) {
+ postDecisionPersistedFn = () => {}) {
let performCorrectnessChecks = true;
if (expectedErrorCode === ErrorCodes.OK) {
this._callFunctionSafely(() => {
@@ -414,13 +418,14 @@ var ReshardingTest = class {
}
this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off();
+ postDecisionPersistedFn();
this._pauseCoordinatorBeforeCompletionFailpoint.off();
});
} else {
this._callFunctionSafely(() => {
this._pauseCoordinatorInSteadyStateFailpoint.off();
this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off();
- postAbortDecisionPersistedFn();
+ postDecisionPersistedFn();
this._pauseCoordinatorBeforeCompletionFailpoint.off();
});
}
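
For context, a rough sketch of how a test consumes the renamed postDecisionPersistedFn hook, modeled on the call sites updated later in this commit; reshardingTest, recipientShardNames, and the chosen error code all come from that test context and are illustrative:

    reshardingTest.withReshardingInBackground(
        {
            newShardKeyPattern: {newKey: 1},
            newChunks:
                [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}],
        },
        (tempNs) => {
            // Runs while the resharding operation is still in progress.
        },
        {
            expectedErrorCode: ErrorCodes.InternalError,
            postDecisionPersistedFn: () => {
                // Runs after the coordinator has persisted its commit/abort decision, but
                // before the resharding operation finishes and returns to the client.
            }
        });
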
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index dd747984da8..13a03235913 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -74,6 +74,7 @@ let validateTestCase = function(test) {
let testCases = {
_addShard: {skip: "internal command"},
_cloneCollectionOptionsFromPrimaryShard: {skip: "internal command"},
+ _configsvrAbortReshardCollection: {skip: "internal command"},
_configsvrAddShard: {skip: "internal command"},
_configsvrAddShardToZone: {skip: "internal command"},
_configsvrBalancerCollectionStatus: {skip: "internal command"},
@@ -132,6 +133,7 @@ let testCases = {
_shardsvrShardCollection: {skip: "internal command"},
_transferMods: {skip: "internal command"},
_vectorClockPersist: {skip: "internal command"},
+ abortReshardCollection: {skip: "does not accept read or write concern"},
abortTransaction: {
setUp: function(conn) {
assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}}));
diff --git a/jstests/sharding/resharding_abort_command.js b/jstests/sharding/resharding_abort_command.js
new file mode 100644
index 00000000000..d103ca89668
--- /dev/null
+++ b/jstests/sharding/resharding_abort_command.js
@@ -0,0 +1,329 @@
+/**
+ * Test to make sure that the abort command interrupts a resharding operation that has not yet
+ * persisted a decision.
+ *
+ * @tags: [requires_fcv_49, uses_atclustertime]
+ */
+(function() {
+"use strict";
+load("jstests/libs/discover_topology.js");
+load("jstests/libs/parallelTester.js");
+load("jstests/sharding/libs/resharding_test_fixture.js");
+load("jstests/sharding/libs/resharding_test_util.js");
+
+const originalCollectionNs = "reshardingDb.coll";
+const enterAbortFailpointName = "reshardingPauseCoordinatorBeforeStartingErrorFlow";
+
+const nodeTypeEnum = {
+ COORDINATOR: 1,
+ DONOR: 2,
+ RECIPIENT: 3,
+ NO_EXTRA_FAILPOINTS_SENTINEL: 4
+};
+
+const abortLocationEnum = {
+ BEFORE_STEADY_STATE: 1,
+ BEFORE_DECISION_PERSISTED: 2,
+ AFTER_DECISION_PERSISTED: 3
+};
+
+let getConnStringsFromNodeType = (nodeType, reshardingTest, topology) => {
+ let connStrings = [];
+ if (nodeType == nodeTypeEnum.COORDINATOR) {
+ connStrings.push(topology.configsvr.nodes[0]);
+ } else if (nodeType == nodeTypeEnum.DONOR) {
+ for (let donor of reshardingTest.donorShardNames) {
+ connStrings.push(topology.shards[donor].primary);
+ }
+ } else if (nodeType == nodeTypeEnum.RECIPIENT) {
+ for (let recipient of reshardingTest.recipientShardNames) {
+ connStrings.push(topology.shards[recipient].primary);
+ }
+ } else if (nodeType == nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) {
+ } else {
+ throw 'unsupported node type in resharding abort test';
+ }
+
+ return connStrings;
+};
+
+let getMongosFromConnStrings = (connStrings) => {
+ let mongos = [];
+ for (let conn of connStrings) {
+ mongos.push(new Mongo(conn));
+ }
+ return mongos;
+};
+
+let generateFailpoints =
+ (failpointName, failpointNodeType, reshardingTest, topology, failpointMode = "alwaysOn") => {
+ const failpointTargetConnStrings =
+ getConnStringsFromNodeType(failpointNodeType, reshardingTest, topology);
+ const failpointHosts = getMongosFromConnStrings(failpointTargetConnStrings);
+
+ let failpoints = [];
+ for (let host of failpointHosts) {
+ failpoints.push(configureFailPoint(host, failpointName, {}, failpointMode));
+ }
+
+ return failpoints;
+ };
+
+let generateAbortThread = (mongosConnString, ns, expectedErrorCodes) => {
+ return new Thread((mongosConnString, ns, expectedErrorCodes) => {
+ const mongos = new Mongo(mongosConnString);
+ if (expectedErrorCodes == ErrorCodes.OK) {
+ assert.commandWorked(mongos.adminCommand({abortReshardCollection: ns}));
+ } else {
+ assert.commandFailedWithCode(mongos.adminCommand({abortReshardCollection: ns}),
+ expectedErrorCodes);
+ }
+ }, mongosConnString, ns, expectedErrorCodes);
+};
+
+let triggerAbortAndCoordinateFailpoints = (failpointName,
+ failpointNodeType,
+ reshardingTest,
+ topology,
+ mongos,
+ configsvr,
+ abortThread,
+ failpoints,
+ executeBeforeWaitingOnFailpointsFn,
+ executeAfterWaitingOnFailpointsFn,
+ executeAfterAbortingFn) => {
+ if (executeBeforeWaitingOnFailpointsFn) {
+ jsTestLog(`Executing the before-waiting-on-failpoint function`);
+ executeBeforeWaitingOnFailpointsFn(mongos, originalCollectionNs);
+ }
+
+ if (failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) {
+ jsTestLog(`Wait for the failpoint ${failpointName} to be reached on all applicable nodes`);
+ for (let failpoint of failpoints) {
+ failpoint.wait();
+ }
+ }
+
+ if (executeAfterWaitingOnFailpointsFn) {
+ jsTestLog(`Executing the after-waiting-on-failpoint function`);
+ executeAfterWaitingOnFailpointsFn(mongos, originalCollectionNs);
+ }
+
+ jsTestLog(`Wait for the coordinator to recognize that it's been aborted`);
+
+ const enterAbortFailpoint = configureFailPoint(configsvr, enterAbortFailpointName);
+ abortThread.start();
+ enterAbortFailpoint.wait();
+
+ if (executeAfterAbortingFn) {
+ jsTestLog(`Executing the after-aborting function`);
+ executeAfterAbortingFn(mongos, originalCollectionNs);
+ }
+
+ enterAbortFailpoint.off();
+
+ if (failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) {
+ jsTestLog(`Turn off the failpoint ${
+ failpointName} to allow both the abort and the resharding operation to complete`);
+ for (let failpoint of failpoints) {
+ failpoint.off();
+ }
+ }
+};
+
+let triggerPostDecisionPersistedAbort = (mongos, abortThread) => {
+ assert.soon(() => {
+ // It's possible that after the decision has been persisted, the
+ // coordinator document could be in either of the two specified states,
+ // or will have been completely deleted. Await any of these conditions
+ // in order to test the abort command's inability to abort after a
+ // persisted decision.
+ const coordinatorDoc =
+ mongos.getCollection('config.reshardingOperations').findOne({ns: originalCollectionNs});
+ return coordinatorDoc == null ||
+ (coordinatorDoc.state === "decision-persisted" || coordinatorDoc.state === "done");
+ });
+
+ abortThread.start();
+};
+
+const runAbortWithFailpoint = (failpointName, failpointNodeType, abortLocation, {
+ executeAtStartOfReshardingFn = null,
+ executeBeforeWaitingOnFailpointsFn = null,
+ executeAfterWaitingOnFailpointsFn = null,
+ executeAfterAbortingFn = null,
+} = {}) => {
+ const reshardingTest =
+ new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true});
+ reshardingTest.setup();
+
+ const donorShardNames = reshardingTest.donorShardNames;
+ const recipientShardNames = reshardingTest.recipientShardNames;
+
+ const sourceCollection = reshardingTest.createShardedCollection({
+ ns: "reshardingDb.coll",
+ shardKeyPattern: {oldKey: 1},
+ chunks: [
+ {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
+ {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
+ ],
+ });
+
+ const mongos = sourceCollection.getMongo();
+ const topology = DiscoverTopology.findConnectedNodes(mongos);
+ const configsvr = new Mongo(topology.configsvr.nodes[0]);
+
+ let expectedAbortErrorCodes = ErrorCodes.OK;
+ let expectedReshardingErrorCode = ErrorCodes.ReshardCollectionAborted;
+
+ // If the abort is going to happen after the decision is persisted, it's expected that the
+ // resharding operation will have finished without error, and that the abort itself will
+ // error.
+ if (abortLocation == abortLocationEnum.AFTER_DECISION_PERSISTED) {
+ expectedAbortErrorCodes =
+ [ErrorCodes.ReshardCollectionCommitted, ErrorCodes.NoSuchReshardCollection];
+ expectedReshardingErrorCode = ErrorCodes.OK;
+ }
+
+ const abortThread = generateAbortThread(
+ topology.mongos.nodes[0], originalCollectionNs, expectedAbortErrorCodes);
+
+ let failpoints = [];
+ if (failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) {
+ failpoints = generateFailpoints(failpointName, failpointNodeType, reshardingTest, topology);
+ }
+
+ reshardingTest.withReshardingInBackground(
+ {
+ newShardKeyPattern: {newKey: 1},
+ newChunks: [
+ {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]},
+ {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]},
+ ],
+ },
+ () => {
+ if (executeAtStartOfReshardingFn) {
+ jsTestLog(`Executing the start-of-resharding fn`);
+ executeAtStartOfReshardingFn(
+ reshardingTest, topology, mongos, originalCollectionNs);
+ }
+
+ if (abortLocation == abortLocationEnum.BEFORE_STEADY_STATE) {
+ triggerAbortAndCoordinateFailpoints(failpointName,
+ failpointNodeType,
+ reshardingTest,
+ topology,
+ mongos,
+ configsvr,
+ abortThread,
+ failpoints,
+ executeBeforeWaitingOnFailpointsFn,
+ executeAfterWaitingOnFailpointsFn,
+ executeAfterAbortingFn);
+ }
+ },
+ {
+ expectedErrorCode: expectedReshardingErrorCode,
+ postCheckConsistencyFn: () => {
+ if (abortLocation == abortLocationEnum.BEFORE_DECISION_PERSISTED) {
+ triggerAbortAndCoordinateFailpoints(failpointName,
+ failpointNodeType,
+ reshardingTest,
+ topology,
+ mongos,
+ configsvr,
+ abortThread,
+ failpoints,
+ executeBeforeWaitingOnFailpointsFn,
+ executeAfterWaitingOnFailpointsFn,
+ executeAfterAbortingFn);
+ }
+ },
+ postDecisionPersistedFn: () => {
+ if (abortLocation == abortLocationEnum.AFTER_DECISION_PERSISTED) {
+ triggerPostDecisionPersistedAbort(mongos, abortThread);
+ }
+ }
+ });
+ reshardingTest.teardown();
+
+ abortThread.join();
+};
+
+runAbortWithFailpoint("reshardingPauseRecipientBeforeCloning",
+ nodeTypeEnum.RECIPIENT,
+ abortLocationEnum.BEFORE_STEADY_STATE);
+runAbortWithFailpoint("reshardingPauseRecipientDuringCloning",
+ nodeTypeEnum.RECIPIENT,
+ abortLocationEnum.BEFORE_STEADY_STATE);
+
+runAbortWithFailpoint("reshardingPauseRecipientDuringOplogApplication",
+ nodeTypeEnum.RECIPIENT,
+ abortLocationEnum.BEFORE_STEADY_STATE,
+ {
+ executeAfterWaitingOnFailpointsFn: (mongos, ns) => {
+ assert.commandWorked(mongos.getCollection(ns).insert([
+ {_id: 0, oldKey: -10, newKey: -10},
+ {_id: 1, oldKey: 10, newKey: -10},
+ {_id: 2, oldKey: -10, newKey: 10},
+ {_id: 3, oldKey: 10, newKey: 10},
+ ]));
+ },
+ });
+
+// Rely on the resharding_test_fixture's built-in failpoint that hangs before switching to
+// the blocking writes state.
+runAbortWithFailpoint(
+ null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.BEFORE_STEADY_STATE, {
+ executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => {
+ assert.soon(() => {
+ const coordinatorDoc =
+ mongos.getCollection('config.reshardingOperations').findOne({ns: ns});
+ if (coordinatorDoc == null) {
+ return false;
+ }
+
+ jsTestLog(tojson(coordinatorDoc));
+
+ for (const shardEntry of coordinatorDoc.recipientShards) {
+ if (shardEntry.mutableState.state !== "steady-state") {
+ return false;
+ }
+ }
+
+ return true;
+ });
+ },
+ });
+
+runAbortWithFailpoint(
+ null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.AFTER_DECISION_PERSISTED);
+
+// TODO SERVER-55506 Uncomment and fix this case after the _flushReshardingStateChange command has
+// been emplaced.
+/*
+runAbortWithFailpoint("reshardingDonorPausesAfterEmplacingCriticalSection", nodeTypeEnum.DONOR,
+abortLocationEnum.BEFORE_DECISION_PERSISTED,
+{
+ executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => {
+ assert.soon(() => {
+ const coordinatorDoc = mongos.getCollection('config.reshardingOperations').findOne({ns: ns});
+ return coordinatorDoc != null && coordinatorDoc.state === "applying";
+ });
+
+ generateFailpoints("reshardingPauseRecipientDuringOplogApplication", nodeTypeEnum.RECIPIENT,
+reshardingTest, topology);
+
+ assert.commandWorked(mongos.getCollection(ns).insert([
+ {_id: 0, oldKey: -10, newKey: -10},
+ {_id: 1, oldKey: 10, newKey: -10},
+ {_id: 2, oldKey: -10, newKey: 10},
+ {_id: 3, oldKey: 10, newKey: 10},
+ ]));
+ },
+ executeAfterAbortingFn: (reshardingTest, topology, mongos, ns) => {
+ generateFailpoints("reshardingPauseRecipientDuringOplogApplication", nodeTypeEnum.RECIPIENT,
+reshardingTest, topology, "off");
+ }
+});*/
+})();
diff --git a/jstests/sharding/resharding_fails_on_nonempty_stash.js b/jstests/sharding/resharding_fails_on_nonempty_stash.js
index 8af6db926f9..32f45168db3 100644
--- a/jstests/sharding/resharding_fails_on_nonempty_stash.js
+++ b/jstests/sharding/resharding_fails_on_nonempty_stash.js
@@ -60,7 +60,7 @@ reshardingTest.withReshardingInBackground(
},
{
expectedErrorCode: 5356800,
- postAbortDecisionPersistedFn: () => {
+ postDecisionPersistedFn: () => {
ReshardingTestUtil.assertRecipientAbortsLocally(
recipient1Conn, recipient1Conn.shardName, "reshardingDb.coll", 5356800);
}
diff --git a/jstests/sharding/resharding_recipient_broadcasts_abortReason.js b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js
index b1631b1a62e..4c5b8298665 100644
--- a/jstests/sharding/resharding_recipient_broadcasts_abortReason.js
+++ b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js
@@ -61,7 +61,7 @@ reshardingTest.withReshardingInBackground(
},
{
expectedErrorCode: ErrorCodes.InternalError,
- postAbortDecisionPersistedFn: () => {
+ postDecisionPersistedFn: () => {
ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator(
configsvr, inputCollection.getFullName(), ErrorCodes.InternalError);
}
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js
index deb9f8cb965..5851502f5a2 100644
--- a/jstests/sharding/safe_secondary_reads_drop_recreate.js
+++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js
@@ -71,6 +71,7 @@ let testCases = {
_recvChunkStart: {skip: "primary only"},
_recvChunkStatus: {skip: "primary only"},
_transferMods: {skip: "primary only"},
+ abortReshardCollection: {skip: "primary only"},
abortTransaction: {skip: "primary only"},
addShard: {skip: "primary only"},
addShardToZone: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 22953b46ecc..c6a41a6d463 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -83,6 +83,7 @@ let testCases = {
_recvChunkStart: {skip: "primary only"},
_recvChunkStatus: {skip: "primary only"},
_transferMods: {skip: "primary only"},
+ abortReshardCollection: {skip: "primary only"},
abortTransaction: {skip: "primary only"},
addShard: {skip: "primary only"},
addShardToZone: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index 3ed86ba2893..aea60e6afc0 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -74,6 +74,7 @@ let testCases = {
_recvChunkStart: {skip: "primary only"},
_recvChunkStatus: {skip: "primary only"},
_transferMods: {skip: "primary only"},
+ abortReshardCollection: {skip: "primary only"},
abortTransaction: {skip: "primary only"},
addShard: {skip: "primary only"},
addShardToZone: {skip: "primary only"},
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 63f44f460c1..ade1b613d3f 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -420,6 +420,10 @@ error_codes:
- {code: 338, name: ReshardCollectionInProgress}
+ - {code: 339, name: NoSuchReshardCollection}
+ - {code: 340, name: ReshardCollectionCommitted}
+ - {code: 341, name: ReshardCollectionAborted}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 6667851b1d0..3987a737a7b 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -310,6 +310,7 @@ env.Library(
'cleanup_orphaned_cmd.cpp',
'clone_catalog_data_command.cpp',
'clone_collection_options_from_primary_shard_cmd.cpp',
+ 'config/configsvr_abort_reshard_collection_command.cpp',
'config/configsvr_add_shard_command.cpp',
'config/configsvr_add_shard_to_zone_command.cpp',
'config/configsvr_balancer_collection_status_command.cpp',
diff --git a/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp
new file mode 100644
index 00000000000..00f85ce51e6
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp
@@ -0,0 +1,175 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/resharding/resharding_coordinator_service.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/abort_reshard_collection_gen.h"
+
+namespace mongo {
+namespace {
+
+UUID retrieveReshardingUUID(OperationContext* opCtx, const NamespaceString& ns) {
+ repl::ReadConcernArgs::get(opCtx) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto collEntry = catalogClient->getCollection(opCtx, ns);
+
+ uassert(ErrorCodes::NoSuchReshardCollection,
+ "Could not find resharding-related metadata that matches the given namespace",
+ collEntry.getReshardingFields());
+
+ return collEntry.getReshardingFields()->getReshardingUUID();
+}
+
+void assertExistsReshardingDocument(OperationContext* opCtx, UUID reshardingUUID) {
+ PersistentTaskStore<ReshardingCoordinatorDocument> store(
+ NamespaceString::kConfigReshardingOperationsNamespace);
+
+ boost::optional<ReshardingCoordinatorDocument> docOptional;
+ store.forEach(opCtx,
+ QUERY(ReshardingCoordinatorDocument::kReshardingUUIDFieldName << reshardingUUID),
+ [&](const ReshardingCoordinatorDocument& doc) {
+ docOptional.emplace(doc);
+ return false;
+ });
+ uassert(ErrorCodes::NoSuchReshardCollection,
+ "Could not find resharding document to abort resharding operation",
+ !!docOptional);
+}
+
+auto assertGetReshardingMachine(OperationContext* opCtx, UUID reshardingUUID) {
+ auto machine = resharding::tryGetReshardingStateMachine<
+ ReshardingCoordinatorService,
+ ReshardingCoordinatorService::ReshardingCoordinator,
+ ReshardingCoordinatorDocument>(opCtx, reshardingUUID);
+
+ uassert(ErrorCodes::NoSuchReshardCollection,
+ "Could not find in-progress resharding operation to abort",
+ machine);
+ return *machine;
+}
+
+class ConfigsvrAbortReshardCollectionCommand final
+ : public TypedCommand<ConfigsvrAbortReshardCollectionCommand> {
+public:
+ using Request = ConfigsvrAbortReshardCollection;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+ uassert(ErrorCodes::IllegalOperation,
+ "_configsvrAbortReshardCollection can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+ uassert(ErrorCodes::InvalidOptions,
+ "_configsvrAbortReshardCollection must be called with majority writeConcern",
+ opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+
+ const auto reshardingUUID = retrieveReshardingUUID(opCtx, ns());
+
+ LOGV2(5403501,
+ "Aborting resharding operation",
+ "namespace"_attr = ns(),
+ "reshardingUUID"_attr = reshardingUUID);
+
+ assertExistsReshardingDocument(opCtx, reshardingUUID);
+
+ auto machine = assertGetReshardingMachine(opCtx, reshardingUUID);
+ auto future = machine->getCompletionFuture();
+ machine->abort();
+
+ auto completionStatus = future.getNoThrow(opCtx);
+
+ // Receiving this error from the state machine indicates that resharding was
+ // successfully aborted, and that the abort command should return OK.
+ if (completionStatus == ErrorCodes::ReshardCollectionAborted) {
+ return;
+ }
+
+ // Receiving an OK status from the machine after attempting to abort the resharding
+ // operation indicates that the resharding operation ignored the abort attempt. The
+ // resharding operation only ignores the abort attempt if the decision was already
+ // persisted, implying that the resharding operation was in an unabortable state.
+ uassert(ErrorCodes::ReshardCollectionCommitted,
+ "Can't abort resharding operation after the decision has been persisted",
+ completionStatus != Status::OK());
+
+ // Return any other status to the client.
+ uassertStatusOK(completionStatus);
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return request().getCommandParameter();
+ }
+
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+ std::string help() const override {
+ return "Internal command, which is exported by the sharding config server. Do not call "
+ "directly. Aborts any in-progress resharding operations for this collection.";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+} configsvrAbortReshardCollectionCmd;
+
+} // namespace
+} // namespace mongo
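
For reference, a sketch of what an invocation of this internal command looks like, based on the write-concern and authorization checks above; the configPrimary connection is illustrative, and the command is meant to be issued by the cluster abortReshardCollection command rather than by users:

    // Sent to the config server primary; the typedRun() above rejects any invocation that
    // does not use majority write concern or does not come from an internal (__system) client.
    assert.commandWorked(configPrimary.adminCommand({
        _configsvrAbortReshardCollection: "reshardingDb.coll",
        writeConcern: {w: "majority"}
    }));
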
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index 8e8829fcb38..2a7063b18eb 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -143,7 +143,8 @@ public:
coordinatorDoc.setNumInitialChunks(request().getNumInitialChunks());
auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
- auto service = registry->lookupServiceByName(kReshardingCoordinatorServiceName);
+ auto service =
+ registry->lookupServiceByName(ReshardingCoordinatorService::kServiceName);
auto instance = ReshardingCoordinatorService::ReshardingCoordinator::getOrCreate(
opCtx, service, coordinatorDoc.toBSON());
diff --git a/src/mongo/db/s/resharding/resharding_agg_test.cpp b/src/mongo/db/s/resharding/resharding_agg_test.cpp
index 7a4e7720bca..3196dacd64e 100644
--- a/src/mongo/db/s/resharding/resharding_agg_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_agg_test.cpp
@@ -40,6 +40,8 @@
namespace mongo {
namespace {
+using namespace fmt::literals;
+
/**
* Mock interface to allow specifying mock results for the lookup pipeline.
*/
@@ -279,7 +281,8 @@ protected:
}
const NamespaceString _remoteOplogNss{"local.oplog.rs"};
- const NamespaceString _localOplogBufferNss{"config.localReshardingOplogBuffer.xxx.yyy"};
+ const NamespaceString _localOplogBufferNss{"{}.{}xxx.yyy"_format(
+ NamespaceString::kConfigDb, NamespaceString::kReshardingLocalOplogBufferPrefix)};
const NamespaceString _crudNss{"test.foo"};
// Use a constant value so unittests can store oplog entries as extended json strings in code.
const UUID _reshardingCollUUID =
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 3ef29568df1..e76a0f93d03 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -66,7 +66,9 @@ using namespace fmt::literals;
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCloning);
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState);
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeDecisionPersisted);
-MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion)
+MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion);
+MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeStartingErrorFlow);
+MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforePersistingStateTransition);
const std::string kReshardingCoordinatorActiveIndexName = "ReshardingCoordinatorActiveIndex";
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
@@ -808,7 +810,7 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingCoordinatorService::constructInstance(
BSONObj initialState) const {
- return std::make_shared<ReshardingCoordinator>(std::move(initialState));
+ return std::make_shared<ReshardingCoordinator>(this, std::move(initialState));
}
ExecutorFuture<void> ReshardingCoordinatorService::_rebuildService(
@@ -836,9 +838,11 @@ ExecutorFuture<void> ReshardingCoordinatorService::_rebuildService(
.on(**executor, CancellationToken::uncancelable());
}
-ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const BSONObj& state)
+ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(
+ const ReshardingCoordinatorService* coordinatorService, const BSONObj& state)
: PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(),
_id(state["_id"].wrap().getOwned()),
+ _coordinatorService(coordinatorService),
_coordinatorDoc(ReshardingCoordinatorDocument::parse(
IDLParserErrorContext("ReshardingCoordinatorStateDoc"), state)) {
_reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>();
@@ -914,9 +918,9 @@ BSONObj createFlushReshardingStateChangeCommand(const NamespaceString& nss) {
BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
}
-SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
- std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const CancellationToken& token) noexcept {
+ExecutorFuture<ReshardingCoordinatorDocument>
+ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDecision(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept {
return ExecutorFuture<void>(**executor)
.then([this, executor] { _insertCoordDocAndChangeOrigCollEntry(); })
.then([this, executor] { _calculateParticipantsAndChunksThenWriteToDisk(); })
@@ -926,18 +930,45 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
.then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
- .then([this, executor, token] {
+ .then([this, executor] {
// TODO SERVER-53916 to verify that the following runs only after the last recipient
// shard reports to the coordinator that it has entered "steady-state".
- return waitForMinimumOperationDuration(**executor, token);
+ return waitForMinimumOperationDuration(**executor, _ctHolder->getAbortToken());
})
.then([this, executor] { return _awaitAllRecipientsFinishedApplying(executor); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); })
- .then([this](const ReshardingCoordinatorDocument& updatedCoordinatorDoc) {
+ .onError([this, self = shared_from_this(), executor](
+ Status status) -> StatusWith<ReshardingCoordinatorDocument> {
+ {
+ auto opCtx = cc().makeOperationContext();
+ reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
+ }
+
+ if (_ctHolder->isSteppingOrShuttingDown()) {
+ return status;
+ }
+
+ // If the abort cancellation token was triggered, implying that a user ran the abort
+ // command, override with the abort error code.
+ if (_ctHolder->isAborted()) {
+ status = {ErrorCodes::ReshardCollectionAborted, status.reason()};
+ }
+
+ _onAbort(executor, status);
+ return status;
+ });
+}
+
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishReshardOperation(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then([this, self = shared_from_this(), executor, updatedCoordinatorDoc] {
return _persistDecision(updatedCoordinatorDoc);
})
- .then([this, executor] {
+ .then([this, self = shared_from_this(), executor] {
_tellAllParticipantsToRefresh(_coordinatorDoc.getSourceNss(), executor);
})
.then([this, self = shared_from_this(), executor] {
@@ -946,39 +977,29 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
// keep 'this' pointer alive for the remaining callbacks.
return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor);
})
- .onError([this, self = shared_from_this(), token, executor](Status status) {
+ .onError([this, self = shared_from_this(), executor](Status status) {
{
- stdx::lock_guard<Latch> lg(_mutex);
- if (_completionPromise.getFuture().isReady()) {
- // interrupt() was called before we got here.
- return status;
- }
+ auto opCtx = cc().makeOperationContext();
+ reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
}
- auto nss = _coordinatorDoc.getSourceNss();
-
- LOGV2(4956902,
- "Resharding failed",
- "namespace"_attr = nss.ns(),
- "newShardKeyPattern"_attr = _coordinatorDoc.getReshardingKey(),
- "error"_attr = status);
-
- if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) {
+ if (_ctHolder->isSteppingOrShuttingDown()) {
return status;
}
- _updateCoordinatorDocStateAndCatalogEntries(
- CoordinatorStateEnum::kError, _coordinatorDoc, boost::none, boost::none, status);
-
- _tellAllParticipantsToRefresh(nss, executor);
-
- // Wait for all participants to acknowledge the operation reached an unrecoverable
- // error.
- future_util::withCancellation(
- _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(), token)
- .get();
-
- return status;
+ LOGV2_FATAL(5277000,
+ "Unrecoverable error past the point resharding was guaranteed to succeed",
+ "error"_attr = redact(status));
+ });
+}
+SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& stepdownToken) noexcept {
+ _ctHolder = std::make_unique<CoordinatorCancellationTokenHolder>(stepdownToken);
+ return _runUntilReadyToPersistDecision(executor)
+ .then([this, self = shared_from_this(), executor](
+ const ReshardingCoordinatorDocument& updatedCoordinatorDoc) {
+ return _persistDecisionAndFinishReshardOperation(executor, updatedCoordinatorDoc);
})
.onCompletion([this, self = shared_from_this()](Status status) {
// TODO SERVER-53914 depending on where we load metrics at the start of the operation,
@@ -990,31 +1011,64 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
}
auto opCtx = cc().makeOperationContext();
- reshardingPauseCoordinatorBeforeCompletion.pauseWhileSet(opCtx.get());
+ reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled(
+ opCtx.get(), _ctHolder->getStepdownToken());
stdx::lock_guard<Latch> lg(_mutex);
- if (_completionPromise.getFuture().isReady()) {
- // interrupt() was called before we got here.
- return;
- }
-
if (status.isOK()) {
_completionPromise.emplaceValue();
} else {
_completionPromise.setError(status);
}
+
+ return status;
+ })
+ .thenRunOn(_coordinatorService->getInstanceCleanupExecutor())
+ .onCompletion([this, self = shared_from_this()](Status status) {
+ // On stepdown or shutdown, the _scopedExecutor may have already been shut down.
+ // Schedule cleanup work on the parent executor.
+ if (!status.isOK()) {
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.setError(status);
+ }
+ }
+ _reshardingCoordinatorObserver->interrupt(status);
+ }
})
.semi();
}
-void ReshardingCoordinatorService::ReshardingCoordinator::interrupt(Status status) {
- // Resolve any unresolved promises to avoid hanging.
- _reshardingCoordinatorObserver->interrupt(status);
+void ReshardingCoordinatorService::ReshardingCoordinator::_onAbort(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const Status& status) {
+ auto nss = _coordinatorDoc.getSourceNss();
- stdx::lock_guard<Latch> lg(_mutex);
- if (!_completionPromise.getFuture().isReady()) {
- _completionPromise.setError(status);
+ LOGV2(4956902,
+ "Resharding failed",
+ "namespace"_attr = nss.ns(),
+ "newShardKeyPattern"_attr = _coordinatorDoc.getReshardingKey(),
+ "error"_attr = status);
+
+ if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) {
+ return;
}
+
+ _updateCoordinatorDocStateAndCatalogEntries(
+ CoordinatorStateEnum::kError, _coordinatorDoc, boost::none, boost::none, status);
+
+ _tellAllParticipantsToRefresh(nss, executor);
+
+ // Wait for all participants to acknowledge the operation reached an unrecoverable
+ // error.
+ future_util::withCancellation(
+ _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(),
+ _ctHolder->getStepdownToken())
+ .get();
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::abort() {
+ _ctHolder->abort();
}
boost::optional<BSONObj> ReshardingCoordinatorService::ReshardingCoordinator::reportForCurrentOp(
@@ -1140,12 +1194,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
return ExecutorFuture<void>(**executor, Status::OK());
}
- return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
+ return future_util::withCancellation(
+ _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate(),
+ _ctHolder->getAbortToken())
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
{
auto opCtx = cc().makeOperationContext();
- reshardingPauseCoordinatorBeforeCloning.pauseWhileSet(opCtx.get());
+ reshardingPauseCoordinatorBeforeCloning.pauseWhileSetAndNotCanceled(
+ opCtx.get(), _ctHolder->getAbortToken());
}
auto highestMinFetchTimestamp =
@@ -1165,7 +1222,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
return ExecutorFuture<void>(**executor, Status::OK());
}
- return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning()
+ return future_util::withCancellation(
+ _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning(),
+ _ctHolder->getAbortToken())
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying,
@@ -1180,12 +1239,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
return ExecutorFuture<void>(**executor, Status::OK());
}
- return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying()
+ return future_util::withCancellation(
+ _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying(),
+ _ctHolder->getAbortToken())
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
{
auto opCtx = cc().makeOperationContext();
- reshardingPauseCoordinatorInSteadyState.pauseWhileSet(opCtx.get());
+ reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled(
+ opCtx.get(), _ctHolder->getAbortToken());
}
this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites,
@@ -1193,15 +1255,18 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
});
}
-SharedSemiFuture<ReshardingCoordinatorDocument>
+ExecutorFuture<ReshardingCoordinatorDocument>
ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kBlockingWrites) {
// If in recovery, just return the existing _stateDoc.
- return _coordinatorDoc;
+ return ExecutorFuture<ReshardingCoordinatorDocument>(**executor, _coordinatorDoc);
}
- return _reshardingCoordinatorObserver->awaitAllRecipientsInStrictConsistency();
+ return future_util::withCancellation(
+ _reshardingCoordinatorObserver->awaitAllRecipientsInStrictConsistency(),
+ _ctHolder->getAbortToken())
+ .thenRunOn(**executor);
}
Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecision(
@@ -1214,7 +1279,8 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDecisionPersisted);
auto opCtx = cc().makeOperationContext();
- reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSet(opCtx.get());
+ reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSetAndNotCanceled(
+ opCtx.get(), _ctHolder->getAbortToken());
// The new epoch and timestamp to use for the resharded collection to indicate that the
// collection is a new incarnation of the namespace
@@ -1250,9 +1316,12 @@ ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::
_reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection().thenRunOn(
**executor));
- return whenAllSucceed(std::move(futures))
+ // We only allow the stepdown token to cancel operations after progressing past
+ // kDecisionPersisted.
+ return future_util::withCancellation(whenAllSucceed(std::move(futures)),
+ _ctHolder->getStepdownToken())
.thenRunOn(**executor)
- .then([executor](const auto& coordinatorDocsChangedOnDisk) {
+ .then([this, executor](const auto& coordinatorDocsChangedOnDisk) {
auto opCtx = cc().makeOperationContext();
resharding::removeCoordinatorDocAndReshardingFields(opCtx.get(),
coordinatorDocsChangedOnDisk[1]);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 5d939ebdcee..b99accd3050 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -96,10 +96,72 @@ std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntri
class ServiceContext;
class OperationContext;
-constexpr StringData kReshardingCoordinatorServiceName = "ReshardingCoordinatorService"_sd;
+/**
+ * Construct to encapsulate cancellation tokens and related semantics on the ReshardingCoordinator.
+ */
+class CoordinatorCancellationTokenHolder {
+public:
+ CoordinatorCancellationTokenHolder(CancellationToken stepdownToken)
+ : _stepdownToken(stepdownToken),
+ _abortSource(CancellationSource(stepdownToken)),
+ _abortToken(_abortSource.token()) {}
+
+ /**
+ * Returns whether any token has been canceled.
+ */
+ bool isCanceled() {
+ return _stepdownToken.isCanceled() || _abortToken.isCanceled();
+ }
+
+ /**
+ * Returns whether the abort token has been canceled, indicating that the resharding operation
+ * was explicitly aborted by an external user.
+ */
+ bool isAborted() {
+ return !_stepdownToken.isCanceled() && _abortToken.isCanceled();
+ }
+
+ /**
+ * Returns whether the stepdownToken has been canceled, indicating that the shard's underlying
+ * replica set node is stepping down or shutting down.
+ */
+ bool isSteppingOrShuttingDown() {
+ return _stepdownToken.isCanceled();
+ }
+
+ /**
+ * Cancels the source created by this class, in order to indicate to holders of the abortToken
+ * that the resharding operation has been aborted.
+ */
+ void abort() {
+ _abortSource.cancel();
+ }
+
+ const CancellationToken& getStepdownToken() {
+ return _stepdownToken;
+ }
+
+ const CancellationToken& getAbortToken() {
+ return _abortToken;
+ }
+
+private:
+ // The token passed in by the PrimaryOnlyService runner that is canceled when this shard's
+ // underlying replica set node is stepping down or shutting down.
+ CancellationToken _stepdownToken;
+
+ // The source created by inheriting from the stepdown token.
+ CancellationSource _abortSource;
+
+ // The token to wait on in cases where a user wants to wait on either a resharding operation
+ // being aborted or the replica set node stepping/shutting down.
+ CancellationToken _abortToken;
+};
class ReshardingCoordinatorService final : public repl::PrimaryOnlyService {
public:
+ static constexpr StringData kServiceName = "ReshardingCoordinatorService"_sd;
+
explicit ReshardingCoordinatorService(ServiceContext* serviceContext)
: PrimaryOnlyService(serviceContext) {}
~ReshardingCoordinatorService() = default;
@@ -107,7 +169,7 @@ public:
class ReshardingCoordinator;
StringData getServiceName() const override {
- return kReshardingCoordinatorServiceName;
+ return kServiceName;
}
NamespaceString getStateDocumentsNS() const override {
@@ -130,13 +192,19 @@ private:
class ReshardingCoordinatorService::ReshardingCoordinator final
: public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> {
public:
- explicit ReshardingCoordinator(const BSONObj& state);
+ explicit ReshardingCoordinator(const ReshardingCoordinatorService* coordinatorService,
+ const BSONObj& state);
~ReshardingCoordinator();
SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
- void interrupt(Status status) override;
+ void interrupt(Status status) override {}
+
+ /**
+ * Attempts to cancel the underlying resharding operation using the abort token.
+ */
+ void abort();
/**
* Replace in-memory representation of the CoordinatorDoc
@@ -164,6 +232,25 @@ private:
};
/**
+ * Runs resharding up through preparing to persist the decision.
+ */
+ ExecutorFuture<ReshardingCoordinatorDocument> _runUntilReadyToPersistDecision(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept;
+
+ /**
+ * Runs resharding through persisting the decision until cleanup.
+ */
+ ExecutorFuture<void> _persistDecisionAndFinishReshardOperation(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept;
+
+ /**
+ * Runs cleanup logic that only applies to abort.
+ */
+ void _onAbort(const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const Status& status);
+
+ /**
* Does the following writes:
* 1. Inserts the coordinator document into config.reshardingOperations
* 2. Adds reshardingFields to the config.collections entry for the original collection
@@ -210,7 +297,7 @@ private:
* Waits on _reshardingCoordinatorObserver to notify that all recipients have entered
* strict-consistency.
*/
- SharedSemiFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency(
+ ExecutorFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
/**
@@ -271,6 +358,9 @@ private:
// collection. The object looks like: {_id: 'reshardingUUID'}
const InstanceID _id;
+ // The primary-only service instance corresponding to the coordinator instance. Not owned.
+ const ReshardingCoordinatorService* const _coordinatorService;
+
// Observes writes that indicate state changes for this resharding operation and notifies
// 'this' when all donors/recipients have entered some state so that 'this' can transition
// states.
@@ -279,6 +369,9 @@ private:
// The updated coordinator state document.
ReshardingCoordinatorDocument _coordinatorDoc;
+ // Holds the cancellation tokens relevant to the ReshardingCoordinator.
+ std::unique_ptr<CoordinatorCancellationTokenHolder> _ctHolder;
+
// Protects promises below.
mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex");
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index 341bbbb92f6..fb5a5958c13 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -46,6 +46,8 @@
namespace mongo {
namespace {
+using namespace fmt::literals;
+
const ReshardingDonorOplogId kResumeFromBeginning{Timestamp::min(), Timestamp::min()};
repl::MutableOplogEntry makeOplog(const NamespaceString& nss,
@@ -191,7 +193,8 @@ public:
}
private:
- const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"};
+ const NamespaceString _oplogNss{"{}.{}xxx.yyy"_format(
+ NamespaceString::kConfigDb, NamespaceString::kReshardingLocalOplogBufferPrefix)};
const NamespaceString _crudNss{"test.foo"};
const UUID _uuid{UUID::gen()};
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp
index fae4b4d4eac..2b6e9da1ded 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp
@@ -47,7 +47,7 @@ namespace {
std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver(
OperationContext* opCtx, const BSONObj& reshardingId) {
auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
- auto service = registry->lookupServiceByName(kReshardingCoordinatorServiceName);
+ auto service = registry->lookupServiceByName(ReshardingCoordinatorService::kServiceName);
auto instance =
ReshardingCoordinatorService::ReshardingCoordinator::lookup(opCtx, service, reshardingId);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 29d02d18c2f..0e8cde51878 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -65,6 +65,9 @@
namespace mongo {
MONGO_FAIL_POINT_DEFINE(removeRecipientDocFailpoint);
+MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientBeforeCloning);
+MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringCloning);
+MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringOplogApplication);
namespace {
@@ -471,6 +474,11 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
auto* serviceContext = Client::getCurrent()->getServiceContext();
auto fetchTimestamp = *_fetchTimestamp;
+ {
+ auto opCtx = cc().makeOperationContext();
+ reshardingPauseRecipientBeforeCloning.pauseWhileSet(opCtx.get());
+ }
+
_collectionCloner = std::make_unique<ReshardingCollectionCloner>(
std::make_unique<ReshardingCollectionCloner::Env>(_metrics()),
ShardKeyPattern{_metadata.getReshardingKey()},
@@ -526,35 +534,42 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}));
}
- return whenAllSucceed(
- _collectionCloner
- ->run(**executor,
- abortToken,
- CancelableOperationContextFactory(abortToken, _markKilledExecutor))
- .thenRunOn(**executor),
- (*executor)
- ->sleepFor(_minimumOperationDuration, abortToken)
- .then([this, executor, abortToken] {
- if (_txnCloners.empty()) {
- return SemiFuture<void>::makeReady();
- }
-
- auto serviceContext = Client::getCurrent()->getServiceContext();
-
- std::vector<ExecutorFuture<void>> txnClonerFutures;
- for (auto&& txnCloner : _txnCloners) {
- txnClonerFutures.push_back(
- txnCloner->run(serviceContext, **executor, abortToken));
- }
-
- return whenAllSucceed(std::move(txnClonerFutures));
- }))
- .thenRunOn(**executor)
- .then([this] {
- // ReshardingTxnCloners must complete before the recipient transitions to kApplying to
- // avoid errors caused by donor shards unpinning the fetchTimestamp.
- _transitionState(RecipientStateEnum::kApplying);
- });
+ auto cloneFinishFuture =
+ whenAllSucceed(_collectionCloner
+ ->run(**executor,
+ abortToken,
+ CancelableOperationContextFactory(abortToken, _markKilledExecutor))
+ .thenRunOn(**executor),
+ (*executor)
+ ->sleepFor(_minimumOperationDuration, abortToken)
+ .then([this, executor, abortToken] {
+ if (_txnCloners.empty()) {
+ return SemiFuture<void>::makeReady();
+ }
+
+ auto serviceContext = Client::getCurrent()->getServiceContext();
+
+ std::vector<ExecutorFuture<void>> txnClonerFutures;
+ for (auto&& txnCloner : _txnCloners) {
+ txnClonerFutures.push_back(
+ txnCloner->run(serviceContext, **executor, abortToken));
+ }
+
+ return whenAllSucceed(std::move(txnClonerFutures));
+ }))
+ .thenRunOn(**executor)
+ .then([this] {
+ // ReshardingTxnCloners must complete before the recipient transitions to kApplying
+ // to avoid errors caused by donor shards unpinning the fetchTimestamp.
+ _transitionState(RecipientStateEnum::kApplying);
+ });
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ reshardingPauseRecipientDuringCloning.pauseWhileSet(opCtx.get());
+ }
+
+ return cloneFinishFuture;
}
ExecutorFuture<void>
@@ -660,6 +675,11 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
}));
}
+ {
+ auto opCtx = cc().makeOperationContext();
+ reshardingPauseRecipientDuringOplogApplication.pauseWhileSet(opCtx.get());
+ }
+
return whenAllSucceed(std::move(futuresToWaitOn))
.thenRunOn(**executor)
.then([stashCollections] {
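
Note on the recipient changes above: the clone/txn-clone future chain is now assigned to cloneFinishFuture so the reshardingPauseRecipientDuringCloning failpoint can be honored before the future is returned, and two more pause points bracket the start of cloning and the oplog-application phase. A minimal jstest-style sketch of how one of these failpoints might be combined with the new abortReshardCollection command follows; the ShardingTest fixture `st`, the choice of shard1 as the recipient, and the test.foo namespace are illustrative assumptions, not part of this patch.

    // Sketch only: assumes a ShardingTest `st` whose shard1 is a resharding recipient and
    // a reshardCollection for test.foo already running in a parallel shell.
    load("jstests/libs/fail_point_util.js");

    // Hold the recipient before it begins cloning documents.
    const pauseBeforeCloning =
        configureFailPoint(st.rs1.getPrimary(), "reshardingPauseRecipientBeforeCloning");
    pauseBeforeCloning.wait();

    // Abort the resharding operation while the recipient is held in place.
    assert.commandWorked(st.s.adminCommand({abortReshardCollection: "test.foo"}));

    pauseBeforeCloning.off();
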
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 24e58751dfd..3d0ac9e264e 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -146,6 +146,7 @@ env.Library(
'database_version.cpp',
'database_version.idl',
'mongod_and_mongos_server_parameters.idl',
+ 'request_types/abort_reshard_collection.idl',
'request_types/add_shard_request_type.cpp',
'request_types/add_shard_to_zone_request_type.cpp',
'request_types/balance_chunk_request_type.cpp',
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 27f57770b88..4028d7648c0 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -29,6 +29,7 @@ env.Library(
'cluster_available_query_options_cmd.cpp',
'cluster_balancer_collection_status_cmd.cpp',
'cluster_build_info.cpp',
+ 'cluster_abort_reshard_collection_cmd.cpp',
'cluster_clear_jumbo_flag_cmd.cpp',
'cluster_coll_stats_cmd.cpp',
'cluster_collection_mod_cmd.cpp',
diff --git a/src/mongo/s/commands/cluster_abort_reshard_collection_cmd.cpp b/src/mongo/s/commands/cluster_abort_reshard_collection_cmd.cpp
new file mode 100644
index 00000000000..cfe23077441
--- /dev/null
+++ b/src/mongo/s/commands/cluster_abort_reshard_collection_cmd.cpp
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/abort_reshard_collection_gen.h"
+
+namespace mongo {
+namespace {
+
+class AbortReshardCollectionCommand : public TypedCommand<AbortReshardCollectionCommand> {
+public:
+ using Request = AbortReshardCollection;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ const NamespaceString& nss = ns();
+
+ LOGV2(5403500, "Beginning resharding abort operation", "namespace"_attr = nss);
+
+ ConfigsvrAbortReshardCollection configsvrAbortReshardCollection(nss);
+ configsvrAbortReshardCollection.setDbName(request().getDbName());
+
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ CommandHelpers::appendMajorityWriteConcern(
+ configsvrAbortReshardCollection.toBSON({}), opCtx->getWriteConcern()),
+ Shard::RetryPolicy::kIdempotent));
+ uassertStatusOK(cmdResponse.commandStatus);
+ uassertStatusOK(cmdResponse.writeConcernStatus);
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return request().getCommandParameter();
+ }
+
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns()),
+ ActionType::reshardCollection));
+ }
+ };
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ std::string help() const override {
+ return "Abort any in-progress resharding operations for this collection.";
+ }
+};
+
+MONGO_REGISTER_TEST_COMMAND(AbortReshardCollectionCommand);
+
+} // namespace
+} // namespace mongo
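
For reference, the mongos command above runs against the admin database, authorizes the reshardCollection action on the exact target namespace, and relays the request to the config server with a majority write concern appended. A hedged shell sketch of an invocation, with the connection and namespace chosen purely for illustration:

    // Sketch: `mongos` is a connection to a mongos node; "mydb.mycoll" is a collection
    // with an in-progress resharding operation. typedRun() above forwards this as
    // _configsvrAbortReshardCollection to the config server primary.
    const res = mongos.getDB("admin").runCommand({abortReshardCollection: "mydb.mycoll"});
    printjson(res);
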
diff --git a/src/mongo/s/request_types/abort_reshard_collection.idl b/src/mongo/s/request_types/abort_reshard_collection.idl
new file mode 100644
index 00000000000..301a3e27cc6
--- /dev/null
+++ b/src/mongo/s/request_types/abort_reshard_collection.idl
@@ -0,0 +1,53 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# abortReshardCollection IDL file
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+commands:
+ abortReshardCollection:
+ description: "The public abortReshardCollection command on mongos."
+ command_name: abortReshardCollection
+ strict: false
+ namespace: type
+ api_version: ""
+ type: namespacestring
+
+ _configsvrAbortReshardCollection:
+ command_name: _configsvrAbortReshardCollection
+ cpp_name: ConfigsvrAbortReshardCollection
+ description: "The internal abortReshardCollection command on the config server."
+ strict: false
+ namespace: type
+ api_version: ""
+ type: namespacestring
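
Both commands defined above take the target namespace as the command value (namespace: type with type: namespacestring). A sketch of the resulting request shapes, using an illustrative namespace; the exact writeConcern document depends on what CommandHelpers::appendMajorityWriteConcern() produces for the caller's settings:

    // Public command a client sends to mongos on the admin database.
    const publicCmd = {abortReshardCollection: "mydb.mycoll"};

    // Internal command the cluster command builds for the config server, roughly:
    const internalCmd = {
        _configsvrAbortReshardCollection: "mydb.mycoll",
        writeConcern: {w: "majority"}
    };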