author:    Blake Oler <blake.oler@mongodb.com>              2021-03-25 15:25:49 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-03-29 14:59:13 +0000
commit:    5103a49830ec4e13c770cd3e2af4661fd8bf0d3f
tree:      519d4cb93bf2970ab96488765ca0c5631b512ead
parent:    95198a839f730dccf32ce8c39d2243d53354496e
SERVER-52770 Add abortReshardCollection command for users to cancel the resharding operation
26 files changed, 1006 insertions, 104 deletions
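For context, the user-facing flow this commit adds looks roughly like the following mongo-shell sketch. It is pieced together from the jstests in this diff; the namespace `reshardingDb.coll` and the surrounding cluster setup are illustrative assumptions, not part of the commit itself.

```javascript
// Hypothetical shell session against a mongos while "reshardingDb.coll" is being resharded.
const ns = "reshardingDb.coll";

// While the coordinator has not yet persisted its decision, the abort is expected to
// succeed and the in-flight reshardCollection to fail with ReshardCollectionAborted.
assert.commandWorked(db.adminCommand({abortReshardCollection: ns}));

// Once the decision has been persisted, or when no resharding operation exists for the
// namespace, the abort itself fails instead.
assert.commandFailedWithCode(
    db.adminCommand({abortReshardCollection: ns}),
    [ErrorCodes.ReshardCollectionCommitted, ErrorCodes.NoSuchReshardCollection]);
```

Per the auth test below, running `abortReshardCollection` on mongos requires the `reshardCollection` action on the target collection, while the internal `_configsvrAbortReshardCollection` variant is restricted to the `__system` role on the config server.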
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js index d812caf2e77..73f84f6b36d 100644 --- a/jstests/auth/lib/commands_lib.js +++ b/jstests/auth/lib/commands_lib.js @@ -195,6 +195,35 @@ var authCommandsLib = { /************* TEST CASES ****************/ tests: [ + { + testname: "abortReshardCollection", + command: {abortReshardCollection: "test.x"}, + skipUnlessSharded: true, + testcases: [ + { + runOnDb: adminDbName, + roles: Object.extend({enableSharding: 1}, roles_clusterManager), + privileges: + [{resource: {db: "test", collection: "x"}, actions: ["reshardCollection"]}], + expectFail: true + }, + ] + }, + { + testname: "_configsvrAbortReshardCollection", + command: {_configsvrAbortReshardCollection: "test.x"}, + skipSharded: true, + testcases: [ + { + runOnDb: adminDbName, + roles: {__system: 1}, + privileges: [{resource: {cluster: true}, actions: ["internal"]}], + expectFail: true + }, + {runOnDb: firstDbName, roles: {}}, + {runOnDb: secondDbName, roles: {}} + ] + }, { testname: "abortTxn", command: {abortTransaction: 1}, diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 5735813c9ff..8d079145d9e 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -74,6 +74,7 @@ let viewsCommandTests = { _addShard: {skip: isAnInternalCommand}, _cloneCatalogData: {skip: isAnInternalCommand}, _cloneCollectionOptionsFromPrimaryShard: {skip: isAnInternalCommand}, + _configsvrAbortReshardCollection: {skip: isAnInternalCommand}, _configsvrAddShard: {skip: isAnInternalCommand}, _configsvrAddShardToZone: {skip: isAnInternalCommand}, _configsvrBalancerCollectionStatus: {skip: isAnInternalCommand}, @@ -133,6 +134,7 @@ let viewsCommandTests = { _shardsvrShardCollection: {skip: isAnInternalCommand}, _transferMods: {skip: isAnInternalCommand}, _vectorClockPersist: {skip: isAnInternalCommand}, + abortReshardCollection: {skip: isUnrelated}, abortTransaction: {skip: isUnrelated}, addShard: {skip: isUnrelated}, addShardToZone: {skip: isUnrelated}, diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index 32a4fccdfaf..957c2369565 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -25,6 +25,7 @@ const isPrimaryOnly = "primary only"; const allCommands = { _addShard: {skip: isPrimaryOnly}, _cloneCollectionOptionsFromPrimaryShard: {skip: isPrimaryOnly}, + _configsvrAbortReshardCollection: {skip: isPrimaryOnly}, _configsvrAddShard: {skip: isPrimaryOnly}, _configsvrAddShardToZone: {skip: isPrimaryOnly}, _configsvrBalancerCollectionStatus: {skip: isPrimaryOnly}, @@ -82,6 +83,7 @@ const allCommands = { _shardsvrRefineCollectionShardKey: {skip: isPrimaryOnly}, _transferMods: {skip: isPrimaryOnly}, _vectorClockPersist: {skip: isPrimaryOnly}, + abortReshardCollection: {skip: isPrimaryOnly}, abortTransaction: {skip: isPrimaryOnly}, aggregate: { command: {aggregate: collName, pipeline: [{$match: {}}], cursor: {}}, diff --git a/jstests/sharding/database_versioning_all_commands.js b/jstests/sharding/database_versioning_all_commands.js index 1e5973d3654..26983182aa0 100644 --- a/jstests/sharding/database_versioning_all_commands.js +++ b/jstests/sharding/database_versioning_all_commands.js @@ -236,6 +236,7 @@ let testCases = { _isSelf: {skip: "executes locally on mongos (not sent to any remote node)"}, _killOperations: {skip: "executes 
locally on mongos (not sent to any remote node)"}, _mergeAuthzCollections: {skip: "always targets the config server"}, + abortReshardCollection: {skip: "always targets the config server"}, abortTransaction: {skip: "unversioned and uses special targetting rules"}, addShard: {skip: "not on a user database"}, addShardToZone: {skip: "not on a user database"}, diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js index f536f25435b..c2e7b1aa054 100644 --- a/jstests/sharding/libs/resharding_test_fixture.js +++ b/jstests/sharding/libs/resharding_test_fixture.js @@ -257,13 +257,17 @@ var ReshardingTest = class { * assertions. This function is called in the critical section after a successful * `reshardCollection` command has shuffled data, but before the response is returned to the * client. + * + * @param postDecisionPersistedFn - a function for evaluating addition assertions after + * the decision has been persisted, but before the resharding operation finishes and returns + * to the client. */ withReshardingInBackground({newShardKeyPattern, newChunks}, duringReshardingFn = (tempNs) => {}, { expectedErrorCode = ErrorCodes.OK, postCheckConsistencyFn = (tempNs) => {}, - postAbortDecisionPersistedFn = () => {} + postDecisionPersistedFn = () => {} } = {}) { this._startReshardingInBackgroundAndAllowCommandFailure({newShardKeyPattern, newChunks}, expectedErrorCode); @@ -276,7 +280,7 @@ var ReshardingTest = class { this._callFunctionSafely(() => duringReshardingFn(this._tempNs)); this._checkConsistencyAndPostState(expectedErrorCode, () => postCheckConsistencyFn(this._tempNs), - () => postAbortDecisionPersistedFn()); + () => postDecisionPersistedFn()); } /** @private */ @@ -380,7 +384,7 @@ var ReshardingTest = class { /** @private */ _checkConsistencyAndPostState(expectedErrorCode, postCheckConsistencyFn = () => {}, - postAbortDecisionPersistedFn = () => {}) { + postDecisionPersistedFn = () => {}) { let performCorrectnessChecks = true; if (expectedErrorCode === ErrorCodes.OK) { this._callFunctionSafely(() => { @@ -414,13 +418,14 @@ var ReshardingTest = class { } this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off(); + postDecisionPersistedFn(); this._pauseCoordinatorBeforeCompletionFailpoint.off(); }); } else { this._callFunctionSafely(() => { this._pauseCoordinatorInSteadyStateFailpoint.off(); this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off(); - postAbortDecisionPersistedFn(); + postDecisionPersistedFn(); this._pauseCoordinatorBeforeCompletionFailpoint.off(); }); } diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js index dd747984da8..13a03235913 100644 --- a/jstests/sharding/read_write_concern_defaults_application.js +++ b/jstests/sharding/read_write_concern_defaults_application.js @@ -74,6 +74,7 @@ let validateTestCase = function(test) { let testCases = { _addShard: {skip: "internal command"}, _cloneCollectionOptionsFromPrimaryShard: {skip: "internal command"}, + _configsvrAbortReshardCollection: {skip: "internal command"}, _configsvrAddShard: {skip: "internal command"}, _configsvrAddShardToZone: {skip: "internal command"}, _configsvrBalancerCollectionStatus: {skip: "internal command"}, @@ -132,6 +133,7 @@ let testCases = { _shardsvrShardCollection: {skip: "internal command"}, _transferMods: {skip: "internal command"}, _vectorClockPersist: {skip: "internal command"}, + abortReshardCollection: {skip: "does not accept read or write concern"}, 
abortTransaction: { setUp: function(conn) { assert.commandWorked(conn.getDB(db).runCommand({create: coll, writeConcern: {w: 1}})); diff --git a/jstests/sharding/resharding_abort_command.js b/jstests/sharding/resharding_abort_command.js new file mode 100644 index 00000000000..d103ca89668 --- /dev/null +++ b/jstests/sharding/resharding_abort_command.js @@ -0,0 +1,329 @@ +/** + * Test to make sure that the abort command interrupts a resharding operation that has not yet + * persisted a decision. + * + * @tags: [requires_fcv_49, uses_atclustertime] + */ +(function() { +"use strict"; +load("jstests/libs/discover_topology.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); +load("jstests/sharding/libs/resharding_test_util.js"); + +const originalCollectionNs = "reshardingDb.coll"; +const enterAbortFailpointName = "reshardingPauseCoordinatorBeforeStartingErrorFlow"; + +const nodeTypeEnum = { + COORDINATOR: 1, + DONOR: 2, + RECIPIENT: 3, + NO_EXTRA_FAILPOINTS_SENTINEL: 4 +}; + +const abortLocationEnum = { + BEFORE_STEADY_STATE: 1, + BEFORE_DECISION_PERSISTED: 2, + AFTER_DECISION_PERSISTED: 3 +}; + +let getConnStringsFromNodeType = (nodeType, reshardingTest, topology) => { + let connStrings = []; + if (nodeType == nodeTypeEnum.COORDINATOR) { + connStrings.push(topology.configsvr.nodes[0]); + } else if (nodeType == nodeTypeEnum.DONOR) { + for (let donor of reshardingTest.donorShardNames) { + connStrings.push(topology.shards[donor].primary); + } + } else if (nodeType == nodeTypeEnum.RECIPIENT) { + for (let recipient of reshardingTest.recipientShardNames) { + connStrings.push(topology.shards[recipient].primary); + } + } else if (nodeType == nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) { + } else { + throw 'unsupported node type in resharding abort test'; + } + + return connStrings; +}; + +let getMongosFromConnStrings = (connStrings) => { + let mongos = []; + for (let conn of connStrings) { + mongos.push(new Mongo(conn)); + } + return mongos; +}; + +let generateFailpoints = + (failpointName, failpointNodeType, reshardingTest, toplogy, failpointMode = "alwaysOn") => { + const failpointTargetConnStrings = + getConnStringsFromNodeType(failpointNodeType, reshardingTest, toplogy); + const failpointHosts = getMongosFromConnStrings(failpointTargetConnStrings); + + let failpoints = []; + for (let host of failpointHosts) { + failpoints.push(configureFailPoint(host, failpointName)); + } + + return failpoints; + }; + +let generateAbortThread = (mongosConnString, ns, expectedErrorCodes) => { + return new Thread((mongosConnString, ns, expectedErrorCodes) => { + const mongos = new Mongo(mongosConnString); + if (expectedErrorCodes == ErrorCodes.OK) { + assert.commandWorked(mongos.adminCommand({abortReshardCollection: ns})); + } else { + assert.commandFailedWithCode(mongos.adminCommand({abortReshardCollection: ns}), + expectedErrorCodes); + } + }, mongosConnString, ns, expectedErrorCodes); +}; + +let triggerAbortAndCoordinateFailpoints = (failpointName, + failpointNodeType, + reshardingTest, + topology, + mongos, + configsvr, + abortThread, + failpoints, + executeBeforeWaitingOnFailpointsFn, + executeAfterWaitingOnFailpointsFn, + executeAfterAbortingFn) => { + if (executeBeforeWaitingOnFailpointsFn) { + jsTestLog(`Executing the before-waiting-on-failpoint function`); + executeBeforeWaitingOnFailpointsFn(mongos, originalCollectionNs); + } + + if (failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) { + jsTestLog(`Wait for the failpoint ${failpointName} to be 
reached on all applicable nodes`); + for (let failpoint of failpoints) { + failpoint.wait(); + } + } + + if (executeAfterWaitingOnFailpointsFn) { + jsTestLog(`Executing the after-waiting-on-failpoint function`); + executeAfterWaitingOnFailpointsFn(mongos, originalCollectionNs); + } + + jsTestLog(`Wait for the coordinator to recognize that it's been aborted`); + + const enterAbortFailpoint = configureFailPoint(configsvr, enterAbortFailpointName); + abortThread.start(); + enterAbortFailpoint.wait(); + + if (executeAfterAbortingFn) { + jsTestLog(`Executing the after-aborting function`); + executeAfterAbortingFn(mongos, originalCollectionNs); + } + + enterAbortFailpoint.off(); + + if (failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) { + jsTestLog(`Turn off the failpoint ${ + failpointName} to allow both the abort and the resharding operation to complete`); + for (let failpoint of failpoints) { + failpoint.off(); + } + } +}; + +let triggerPostDecisionPersistedAbort = (mongos, abortThread) => { + assert.soon(() => { + // It's possible that after the decision has been persisted, the + // coordinator document could be in either of the two specified states, + // or will have been completely deleted. Await any of these conditions + // in order to test the abort command's inability to abort after a + // persisted decision. + const coordinatorDoc = + mongos.getCollection('config.reshardingOperations').findOne({ns: originalCollectionNs}); + return coordinatorDoc == null || + (coordinatorDoc.state === "decision-persisted" || coordinatorDoc.state === "done"); + }); + + abortThread.start(); +}; + +const runAbortWithFailpoint = (failpointName, failpointNodeType, abortLocation, { + executeAtStartOfReshardingFn = null, + executeBeforeWaitingOnFailpointsFn = null, + executeAfterWaitingOnFailpointsFn = null, + executeAfterAbortingFn = null, +} = {}) => { + const reshardingTest = + new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true}); + reshardingTest.setup(); + + const donorShardNames = reshardingTest.donorShardNames; + const recipientShardNames = reshardingTest.recipientShardNames; + + const sourceCollection = reshardingTest.createShardedCollection({ + ns: "reshardingDb.coll", + shardKeyPattern: {oldKey: 1}, + chunks: [ + {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]}, + {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}, + ], + }); + + const mongos = sourceCollection.getMongo(); + const topology = DiscoverTopology.findConnectedNodes(mongos); + const configsvr = new Mongo(topology.configsvr.nodes[0]); + + let expectedAbortErrorCodes = ErrorCodes.OK; + let expectedReshardingErrorCode = ErrorCodes.ReshardCollectionAborted; + + // If the abort is going to happen after the decision is persisted, it's expected that the + // resharding operation will have finished without error, and that the abort itself will + // error. 
+ if (abortLocation == abortLocationEnum.AFTER_DECISION_PERSISTED) { + expectedAbortErrorCodes = + [ErrorCodes.ReshardCollectionCommitted, ErrorCodes.NoSuchReshardCollection]; + expectedReshardingErrorCode = ErrorCodes.OK; + } + + const abortThread = generateAbortThread( + topology.mongos.nodes[0], originalCollectionNs, expectedAbortErrorCodes); + + let failpoints = []; + if (failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL) { + failpoints = generateFailpoints(failpointName, failpointNodeType, reshardingTest, topology); + } + + reshardingTest.withReshardingInBackground( + { + newShardKeyPattern: {newKey: 1}, + newChunks: [ + {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, + {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, + ], + }, + () => { + if (executeAtStartOfReshardingFn) { + jsTestLog(`Executing the start-of-resharding fn`); + executeAtStartOfReshardingFn( + reshardingTest, topology, mongos, originalCollectionNs); + } + + if (abortLocation == abortLocationEnum.BEFORE_STEADY_STATE) { + triggerAbortAndCoordinateFailpoints(failpointName, + failpointNodeType, + reshardingTest, + topology, + mongos, + configsvr, + abortThread, + failpoints, + executeBeforeWaitingOnFailpointsFn, + executeAfterWaitingOnFailpointsFn, + executeAfterAbortingFn); + } + }, + { + expectedErrorCode: expectedReshardingErrorCode, + postCheckConsistencyFn: () => { + if (abortLocation == abortLocationEnum.BEFORE_DECISION_PERSISTED) { + triggerAbortAndCoordinateFailpoints(failpointName, + failpointNodeType, + reshardingTest, + topology, + mongos, + configsvr, + abortThread, + failpoints, + executeBeforeWaitingOnFailpointsFn, + executeAfterWaitingOnFailpointsFn, + executeAfterAbortingFn); + } + }, + postDecisionPersistedFn: () => { + if (abortLocation == abortLocationEnum.AFTER_DECISION_PERSISTED) { + triggerPostDecisionPersistedAbort(mongos, abortThread); + } + } + }); + reshardingTest.teardown(); + + abortThread.join(); +}; + +runAbortWithFailpoint("reshardingPauseRecipientBeforeCloning", + nodeTypeEnum.RECIPIENT, + abortLocationEnum.BEFORE_STEADY_STATE); +runAbortWithFailpoint("reshardingPauseRecipientDuringCloning", + nodeTypeEnum.RECIPIENT, + abortLocationEnum.BEFORE_STEADY_STATE); + +runAbortWithFailpoint("reshardingPauseRecipientDuringOplogApplication", + nodeTypeEnum.RECIPIENT, + abortLocationEnum.BEFORE_STEADY_STATE, + { + executeAfterWaitingOnFailpointsFn: (mongos, ns) => { + assert.commandWorked(mongos.getCollection(ns).insert([ + {_id: 0, oldKey: -10, newKey: -10}, + {_id: 1, oldKey: 10, newKey: -10}, + {_id: 2, oldKey: -10, newKey: 10}, + {_id: 3, oldKey: 10, newKey: 10}, + ])); + }, + }); + +// Rely on the resharding_test_fixture's built-in failpoint that hangs before switching to +// the blocking writes state. 
+runAbortWithFailpoint( + null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.BEFORE_STEADY_STATE, { + executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => { + assert.soon(() => { + const coordinatorDoc = + mongos.getCollection('config.reshardingOperations').findOne({ns: ns}); + if (coordinatorDoc == null) { + return false; + } + + jsTestLog(tojson(coordinatorDoc)); + + for (const shardEntry of coordinatorDoc.recipientShards) { + if (shardEntry.mutableState.state !== "steady-state") { + return false; + } + } + + return true; + }); + }, + }); + +runAbortWithFailpoint( + null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.AFTER_DECISION_PERSISTED); + +// TODO SERVER-55506 Uncomment and fix this case after the _flushReshardingStateChange command has +// been emplaced. +/* +runAbortWithFailpoint("reshardingDonorPausesAfterEmplacingCriticalSection", nodeTypeEnum.DONOR, +abortLocationEnum.BEFORE_DECISION_PERSISTED, +{ + executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => { + assert.soon(() => { + const coordinatorDoc = mongos.getCollection('config.reshardingOperations').findOne({ns: +ns}); return coordinatorDoc != null && coordinatorDoc.state === "applying"; + }); + + generateFailpoints("reshardingPauseRecipientDuringOplogApplication", nodeTypeEnum.RECIPIENT, +reshardingTest, topology); + + assert.commandWorked(mongos.getCollection(ns).insert([ + {_id: 0, oldKey: -10, newKey: -10}, + {_id: 1, oldKey: 10, newKey: -10}, + {_id: 2, oldKey: -10, newKey: 10}, + {_id: 3, oldKey: 10, newKey: 10}, + ])); + }, + executeAfterAbortingFn: (reshardingTest, topology, mongos, ns) => { + generateFailpoints("reshardingPauseRecipientDuringOplogApplication", nodeTypeEnum.RECIPIENT, +reshardingTest, topology, "off"); + } +});*/ +})(); diff --git a/jstests/sharding/resharding_fails_on_nonempty_stash.js b/jstests/sharding/resharding_fails_on_nonempty_stash.js index 8af6db926f9..32f45168db3 100644 --- a/jstests/sharding/resharding_fails_on_nonempty_stash.js +++ b/jstests/sharding/resharding_fails_on_nonempty_stash.js @@ -60,7 +60,7 @@ reshardingTest.withReshardingInBackground( }, { expectedErrorCode: 5356800, - postAbortDecisionPersistedFn: () => { + postDecisionPersistedFn: () => { ReshardingTestUtil.assertRecipientAbortsLocally( recipient1Conn, recipient1Conn.shardName, "reshardingDb.coll", 5356800); } diff --git a/jstests/sharding/resharding_recipient_broadcasts_abortReason.js b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js index b1631b1a62e..4c5b8298665 100644 --- a/jstests/sharding/resharding_recipient_broadcasts_abortReason.js +++ b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js @@ -61,7 +61,7 @@ reshardingTest.withReshardingInBackground( }, { expectedErrorCode: ErrorCodes.InternalError, - postAbortDecisionPersistedFn: () => { + postDecisionPersistedFn: () => { ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator( configsvr, inputCollection.getFullName(), ErrorCodes.InternalError); } diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js index deb9f8cb965..5851502f5a2 100644 --- a/jstests/sharding/safe_secondary_reads_drop_recreate.js +++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js @@ -71,6 +71,7 @@ let testCases = { _recvChunkStart: {skip: "primary only"}, _recvChunkStatus: {skip: "primary only"}, _transferMods: {skip: "primary only"}, + abortReshardCollection: {skip: "primary only"}, abortTransaction: {skip: 
"primary only"}, addShard: {skip: "primary only"}, addShardToZone: {skip: "primary only"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js index 22953b46ecc..c6a41a6d463 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js @@ -83,6 +83,7 @@ let testCases = { _recvChunkStart: {skip: "primary only"}, _recvChunkStatus: {skip: "primary only"}, _transferMods: {skip: "primary only"}, + abortReshardCollection: {skip: "primary only"}, abortTransaction: {skip: "primary only"}, addShard: {skip: "primary only"}, addShardToZone: {skip: "primary only"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js index 3ed86ba2893..aea60e6afc0 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js @@ -74,6 +74,7 @@ let testCases = { _recvChunkStart: {skip: "primary only"}, _recvChunkStatus: {skip: "primary only"}, _transferMods: {skip: "primary only"}, + abortReshardCollection: {skip: "primary only"}, abortTransaction: {skip: "primary only"}, addShard: {skip: "primary only"}, addShardToZone: {skip: "primary only"}, diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 63f44f460c1..ade1b613d3f 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -420,6 +420,10 @@ error_codes: - {code: 338, name: ReshardCollectionInProgress} + - {code: 339, name: NoSuchReshardCollection} + - {code: 340, name: ReshardCollectionCommitted} + - {code: 341, name: ReshardCollectionAborted} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 6667851b1d0..3987a737a7b 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -310,6 +310,7 @@ env.Library( 'cleanup_orphaned_cmd.cpp', 'clone_catalog_data_command.cpp', 'clone_collection_options_from_primary_shard_cmd.cpp', + 'config/configsvr_abort_reshard_collection_command.cpp', 'config/configsvr_add_shard_command.cpp', 'config/configsvr_add_shard_to_zone_command.cpp', 'config/configsvr_balancer_collection_status_command.cpp', diff --git a/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp new file mode 100644 index 00000000000..00f85ce51e6 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp @@ -0,0 +1,175 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/resharding/resharding_coordinator_service.h" +#include "mongo/db/s/resharding/resharding_donor_recipient_common.h" +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/request_types/abort_reshard_collection_gen.h" + +namespace mongo { +namespace { + +UUID retrieveReshardingUUID(OperationContext* opCtx, const NamespaceString& ns) { + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + const auto catalogClient = Grid::get(opCtx)->catalogClient(); + const auto collEntry = catalogClient->getCollection(opCtx, ns); + + uassert(ErrorCodes::NoSuchReshardCollection, + "Could not find resharding-related metadata that matches the given namespace", + collEntry.getReshardingFields()); + + return collEntry.getReshardingFields()->getReshardingUUID(); +} + +void assertExistsReshardingDocument(OperationContext* opCtx, UUID reshardingUUID) { + PersistentTaskStore<ReshardingCoordinatorDocument> store( + NamespaceString::kConfigReshardingOperationsNamespace); + + boost::optional<ReshardingCoordinatorDocument> docOptional; + store.forEach(opCtx, + QUERY(ReshardingCoordinatorDocument::kReshardingUUIDFieldName << reshardingUUID), + [&](const ReshardingCoordinatorDocument& doc) { + docOptional.emplace(doc); + return false; + }); + uassert(ErrorCodes::NoSuchReshardCollection, + "Could not find resharding document to abort resharding operation", + !!docOptional); +} + +auto assertGetReshardingMachine(OperationContext* opCtx, UUID reshardingUUID) { + auto machine = resharding::tryGetReshardingStateMachine< + ReshardingCoordinatorService, + ReshardingCoordinatorService::ReshardingCoordinator, + ReshardingCoordinatorDocument>(opCtx, reshardingUUID); + + uassert(ErrorCodes::NoSuchReshardCollection, + "Could not find in-progress resharding operation to abort", + machine); + return *machine; +} + +class ConfigsvrAbortReshardCollectionCommand final + : public TypedCommand<ConfigsvrAbortReshardCollectionCommand> { +public: + using Request = ConfigsvrAbortReshardCollection; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + + opCtx->setAlwaysInterruptAtStepDownOrUp(); + + uassert(ErrorCodes::IllegalOperation, + "_configsvrAbortReshardCollection can only be run on config servers", + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + 
uassert(ErrorCodes::InvalidOptions, + "_configsvrAbortReshardCollection must be called with majority writeConcern", + opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority); + + const auto reshardingUUID = retrieveReshardingUUID(opCtx, ns()); + + LOGV2(5403501, + "Aborting resharding operation", + "namespace"_attr = ns(), + "reshardingUUID"_attr = reshardingUUID); + + assertExistsReshardingDocument(opCtx, reshardingUUID); + + auto machine = assertGetReshardingMachine(opCtx, reshardingUUID); + auto future = machine->getCompletionFuture(); + machine->abort(); + + auto completionStatus = future.getNoThrow(opCtx); + + // Receiving this error from the state machine indicates that resharding was + // successfully aborted, and that the abort command should return OK. + if (completionStatus == ErrorCodes::ReshardCollectionAborted) { + return; + } + + // Receiving an OK status from the machine after attempting to abort the resharding + // operation indicates that the resharding operation ignored the abort attempt. The + // resharding operation only ignores the abort attempt if the decision was already + // persisted, implying that the resharding operation was in an unabortable state. + uassert(ErrorCodes::ReshardCollectionCommitted, + "Can't abort resharding operation after the decision has been persisted", + completionStatus != Status::OK()); + + // Return any other status to the client. + uassertStatusOK(completionStatus); + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + + std::string help() const override { + return "Internal command, which is exported by the sharding config server. Do not call " + "directly. 
Aborts any in-progress resharding operations for this collection."; + } + + bool adminOnly() const override { + return true; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } +} configsvrAbortReshardCollectionCmd; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 8e8829fcb38..2a7063b18eb 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -143,7 +143,8 @@ public: coordinatorDoc.setNumInitialChunks(request().getNumInitialChunks()); auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); - auto service = registry->lookupServiceByName(kReshardingCoordinatorServiceName); + auto service = + registry->lookupServiceByName(ReshardingCoordinatorService::kServiceName); auto instance = ReshardingCoordinatorService::ReshardingCoordinator::getOrCreate( opCtx, service, coordinatorDoc.toBSON()); diff --git a/src/mongo/db/s/resharding/resharding_agg_test.cpp b/src/mongo/db/s/resharding/resharding_agg_test.cpp index 7a4e7720bca..3196dacd64e 100644 --- a/src/mongo/db/s/resharding/resharding_agg_test.cpp +++ b/src/mongo/db/s/resharding/resharding_agg_test.cpp @@ -40,6 +40,8 @@ namespace mongo { namespace { +using namespace fmt::literals; + /** * Mock interface to allow specifiying mock results for the lookup pipeline. */ @@ -279,7 +281,8 @@ protected: } const NamespaceString _remoteOplogNss{"local.oplog.rs"}; - const NamespaceString _localOplogBufferNss{"config.localReshardingOplogBuffer.xxx.yyy"}; + const NamespaceString _localOplogBufferNss{"{}.{}xxx.yyy"_format( + NamespaceString::kConfigDb, NamespaceString::kReshardingLocalOplogBufferPrefix)}; const NamespaceString _crudNss{"test.foo"}; // Use a constant value so unittests can store oplog entries as extended json strings in code. 
const UUID _reshardingCollUUID = diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 3ef29568df1..e76a0f93d03 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -66,7 +66,9 @@ using namespace fmt::literals; MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCloning); MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState); MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeDecisionPersisted); -MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion) +MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion); +MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeStartingErrorFlow); +MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforePersistingStateTransition); const std::string kReshardingCoordinatorActiveIndexName = "ReshardingCoordinatorActiveIndex"; const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); @@ -808,7 +810,7 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingCoordinatorService::constructInstance( BSONObj initialState) const { - return std::make_shared<ReshardingCoordinator>(std::move(initialState)); + return std::make_shared<ReshardingCoordinator>(this, std::move(initialState)); } ExecutorFuture<void> ReshardingCoordinatorService::_rebuildService( @@ -836,9 +838,11 @@ ExecutorFuture<void> ReshardingCoordinatorService::_rebuildService( .on(**executor, CancellationToken::uncancelable()); } -ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const BSONObj& state) +ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator( + const ReshardingCoordinatorService* coordinatorService, const BSONObj& state) : PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(), _id(state["_id"].wrap().getOwned()), + _coordinatorService(coordinatorService), _coordinatorDoc(ReshardingCoordinatorDocument::parse( IDLParserErrorContext("ReshardingCoordinatorStateDoc"), state)) { _reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>(); @@ -914,9 +918,9 @@ BSONObj createFlushReshardingStateChangeCommand(const NamespaceString& nss) { BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); } -SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( - std::shared_ptr<executor::ScopedTaskExecutor> executor, - const CancellationToken& token) noexcept { +ExecutorFuture<ReshardingCoordinatorDocument> +ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDecision( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept { return ExecutorFuture<void>(**executor) .then([this, executor] { _insertCoordDocAndChangeOrigCollEntry(); }) .then([this, executor] { _calculateParticipantsAndChunksThenWriteToDisk(); }) @@ -926,18 +930,45 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); }) .then([this, executor] { _tellAllDonorsToRefresh(executor); }) - .then([this, executor, token] { + .then([this, executor] { // TODO SERVER-53916 to verify that the following runs only after the last recipient // shard reports to the coordinator that it has entered "steady-state". 
- return waitForMinimumOperationDuration(**executor, token); + return waitForMinimumOperationDuration(**executor, _ctHolder->getAbortToken()); }) .then([this, executor] { return _awaitAllRecipientsFinishedApplying(executor); }) .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); }) - .then([this](const ReshardingCoordinatorDocument& updatedCoordinatorDoc) { + .onError([this, self = shared_from_this(), executor]( + Status status) -> StatusWith<ReshardingCoordinatorDocument> { + { + auto opCtx = cc().makeOperationContext(); + reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get()); + } + + if (_ctHolder->isSteppingOrShuttingDown()) { + return status; + } + + // If the abort cancellation token was triggered, implying that a user ran the abort + // command, override with the abort error code. + if (_ctHolder->isAborted()) { + status = {ErrorCodes::ReshardCollectionAborted, status.reason()}; + } + + _onAbort(executor, status); + return status; + }); +} + +ExecutorFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishReshardOperation( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept { + return ExecutorFuture<void>(**executor) + .then([this, self = shared_from_this(), executor, updatedCoordinatorDoc] { return _persistDecision(updatedCoordinatorDoc); }) - .then([this, executor] { + .then([this, self = shared_from_this(), executor] { _tellAllParticipantsToRefresh(_coordinatorDoc.getSourceNss(), executor); }) .then([this, self = shared_from_this(), executor] { @@ -946,39 +977,29 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( // keep 'this' pointer alive for the remaining callbacks. return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor); }) - .onError([this, self = shared_from_this(), token, executor](Status status) { + .onError([this, self = shared_from_this(), executor](Status status) { { - stdx::lock_guard<Latch> lg(_mutex); - if (_completionPromise.getFuture().isReady()) { - // interrupt() was called before we got here. - return status; - } + auto opCtx = cc().makeOperationContext(); + reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get()); } - auto nss = _coordinatorDoc.getSourceNss(); - - LOGV2(4956902, - "Resharding failed", - "namespace"_attr = nss.ns(), - "newShardKeyPattern"_attr = _coordinatorDoc.getReshardingKey(), - "error"_attr = status); - - if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) { + if (_ctHolder->isSteppingOrShuttingDown()) { return status; } - _updateCoordinatorDocStateAndCatalogEntries( - CoordinatorStateEnum::kError, _coordinatorDoc, boost::none, boost::none, status); - - _tellAllParticipantsToRefresh(nss, executor); - - // Wait for all participants to acknowledge the operation reached an unrecoverable - // error. 
- future_util::withCancellation( - _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(), token) - .get(); - - return status; + LOGV2_FATAL(5277000, + "Unrecoverable error past the point resharding was guaranteed to succeed", + "error"_attr = redact(status)); + }); +} +SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& stepdownToken) noexcept { + _ctHolder = std::make_unique<CoordinatorCancellationTokenHolder>(stepdownToken); + return _runUntilReadyToPersistDecision(executor) + .then([this, self = shared_from_this(), executor]( + const ReshardingCoordinatorDocument& updatedCoordinatorDoc) { + return _persistDecisionAndFinishReshardOperation(executor, updatedCoordinatorDoc); }) .onCompletion([this, self = shared_from_this()](Status status) { // TODO SERVER-53914 depending on where we load metrics at the start of the operation, @@ -990,31 +1011,64 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( } auto opCtx = cc().makeOperationContext(); - reshardingPauseCoordinatorBeforeCompletion.pauseWhileSet(opCtx.get()); + reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled( + opCtx.get(), _ctHolder->getStepdownToken()); stdx::lock_guard<Latch> lg(_mutex); - if (_completionPromise.getFuture().isReady()) { - // interrupt() was called before we got here. - return; - } - if (status.isOK()) { _completionPromise.emplaceValue(); } else { _completionPromise.setError(status); } + + return status; + }) + .thenRunOn(_coordinatorService->getInstanceCleanupExecutor()) + .onCompletion([this, self = shared_from_this()](Status status) { + // On stepdown or shutdown, the _scopedExecutor may have already been shut down. + // Schedule cleanup work on the parent executor. + if (!status.isOK()) { + { + stdx::lock_guard<Latch> lg(_mutex); + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.setError(status); + } + } + _reshardingCoordinatorObserver->interrupt(status); + } }) .semi(); } -void ReshardingCoordinatorService::ReshardingCoordinator::interrupt(Status status) { - // Resolve any unresolved promises to avoid hanging. - _reshardingCoordinatorObserver->interrupt(status); +void ReshardingCoordinatorService::ReshardingCoordinator::_onAbort( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const Status& status) { + auto nss = _coordinatorDoc.getSourceNss(); - stdx::lock_guard<Latch> lg(_mutex); - if (!_completionPromise.getFuture().isReady()) { - _completionPromise.setError(status); + LOGV2(4956902, + "Resharding failed", + "namespace"_attr = nss.ns(), + "newShardKeyPattern"_attr = _coordinatorDoc.getReshardingKey(), + "error"_attr = status); + + if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) { + return; } + + _updateCoordinatorDocStateAndCatalogEntries( + CoordinatorStateEnum::kError, _coordinatorDoc, boost::none, boost::none, status); + + _tellAllParticipantsToRefresh(nss, executor); + + // Wait for all participants to acknowledge the operation reached an unrecoverable + // error. 
+ future_util::withCancellation( + _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(), + _ctHolder->getStepdownToken()) + .get(); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::abort() { + _ctHolder->abort(); } boost::optional<BSONObj> ReshardingCoordinatorService::ReshardingCoordinator::reportForCurrentOp( @@ -1140,12 +1194,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat return ExecutorFuture<void>(**executor, Status::OK()); } - return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() + return future_util::withCancellation( + _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate(), + _ctHolder->getAbortToken()) .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { { auto opCtx = cc().makeOperationContext(); - reshardingPauseCoordinatorBeforeCloning.pauseWhileSet(opCtx.get()); + reshardingPauseCoordinatorBeforeCloning.pauseWhileSetAndNotCanceled( + opCtx.get(), _ctHolder->getAbortToken()); } auto highestMinFetchTimestamp = @@ -1165,7 +1222,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished return ExecutorFuture<void>(**executor, Status::OK()); } - return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning() + return future_util::withCancellation( + _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning(), + _ctHolder->getAbortToken()) .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying, @@ -1180,12 +1239,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished return ExecutorFuture<void>(**executor, Status::OK()); } - return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying() + return future_util::withCancellation( + _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying(), + _ctHolder->getAbortToken()) .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { { auto opCtx = cc().makeOperationContext(); - reshardingPauseCoordinatorInSteadyState.pauseWhileSet(opCtx.get()); + reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled( + opCtx.get(), _ctHolder->getAbortToken()); } this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kBlockingWrites, @@ -1193,15 +1255,18 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished }); } -SharedSemiFuture<ReshardingCoordinatorDocument> +ExecutorFuture<ReshardingCoordinatorDocument> ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrictConsistency( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { if (_coordinatorDoc.getState() > CoordinatorStateEnum::kBlockingWrites) { // If in recovery, just return the existing _stateDoc. 
- return _coordinatorDoc; + return ExecutorFuture<ReshardingCoordinatorDocument>(**executor, _coordinatorDoc); } - return _reshardingCoordinatorObserver->awaitAllRecipientsInStrictConsistency(); + return future_util::withCancellation( + _reshardingCoordinatorObserver->awaitAllRecipientsInStrictConsistency(), + _ctHolder->getAbortToken()) + .thenRunOn(**executor); } Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecision( @@ -1214,7 +1279,8 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDecisionPersisted); auto opCtx = cc().makeOperationContext(); - reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSet(opCtx.get()); + reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSetAndNotCanceled( + opCtx.get(), _ctHolder->getAbortToken()); // The new epoch and timestamp to use for the resharded collection to indicate that the // collection is a new incarnation of the namespace @@ -1250,9 +1316,12 @@ ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator:: _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection().thenRunOn( **executor)); - return whenAllSucceed(std::move(futures)) + // We only allow the stepdown token to cancel operations after progressing past + // kDecisionPersisted. + return future_util::withCancellation(whenAllSucceed(std::move(futures)), + _ctHolder->getStepdownToken()) .thenRunOn(**executor) - .then([executor](const auto& coordinatorDocsChangedOnDisk) { + .then([this, executor](const auto& coordinatorDocsChangedOnDisk) { auto opCtx = cc().makeOperationContext(); resharding::removeCoordinatorDocAndReshardingFields(opCtx.get(), coordinatorDocsChangedOnDisk[1]); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index 5d939ebdcee..b99accd3050 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -96,10 +96,72 @@ std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntri class ServiceContext; class OperationContext; -constexpr StringData kReshardingCoordinatorServiceName = "ReshardingCoordinatorService"_sd; +/** + * Construct to encapsulate cancellation tokens and related semantics on the ReshardingCoordinator. + */ +class CoordinatorCancellationTokenHolder { +public: + CoordinatorCancellationTokenHolder(CancellationToken stepdownToken) + : _stepdownToken(stepdownToken), + _abortSource(CancellationSource(stepdownToken)), + _abortToken(_abortSource.token()) {} + + /** + * Returns whether the any token has been canceled. + */ + bool isCanceled() { + return _stepdownToken.isCanceled() || _abortToken.isCanceled(); + } + + /** + * Returns whether the abort token has been canceled, indicating that the resharding operation + * was explicitly aborted by an external user. + */ + bool isAborted() { + return !_stepdownToken.isCanceled() && _abortToken.isCanceled(); + } + + /** + * Returns whether the stepdownToken has been canceled, indicating that the shard's underlying + * replica set node is stepping down or shutting down. + */ + bool isSteppingOrShuttingDown() { + return _stepdownToken.isCanceled(); + } + + /** + * Cancels the source created by this class, in order to indicate to holders of the abortToken + * that the resharding operation has been aborted. 
+ */ + void abort() { + _abortSource.cancel(); + } + + const CancellationToken& getStepdownToken() { + return _stepdownToken; + } + + const CancellationToken& getAbortToken() { + return _abortToken; + } + +private: + // The token passed in by the PrimaryOnlyService runner that is canceled when this shard's + // underlying replica set node is stepping down or shutting down. + CancellationToken _stepdownToken; + + // The source created by inheriting from the stepdown token. + CancellationSource _abortSource; + + // The token to wait on in cases where a user wants to wait on either a resharding operation + // being aborted or the replica set node stepping/shutting down. + CancellationToken _abortToken; +}; class ReshardingCoordinatorService final : public repl::PrimaryOnlyService { public: + static constexpr StringData kServiceName = "ReshardingCoordinatorService"_sd; + explicit ReshardingCoordinatorService(ServiceContext* serviceContext) : PrimaryOnlyService(serviceContext) {} ~ReshardingCoordinatorService() = default; @@ -107,7 +169,7 @@ public: class ReshardingCoordinator; StringData getServiceName() const override { - return kReshardingCoordinatorServiceName; + return kServiceName; } NamespaceString getStateDocumentsNS() const override { @@ -130,13 +192,19 @@ private: class ReshardingCoordinatorService::ReshardingCoordinator final : public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> { public: - explicit ReshardingCoordinator(const BSONObj& state); + explicit ReshardingCoordinator(const ReshardingCoordinatorService* coordinatorService, + const BSONObj& state); ~ReshardingCoordinator(); SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override; - void interrupt(Status status) override; + void interrupt(Status status) override {} + + /** + * Attempts to cancel the underlying resharding operation using the abort token. + */ + void abort(); /** * Replace in-memory representation of the CoordinatorDoc @@ -164,6 +232,25 @@ private: }; /** + * Runs resharding up through preparing to persist the decision. + */ + ExecutorFuture<ReshardingCoordinatorDocument> _runUntilReadyToPersistDecision( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept; + + /** + * Runs resharding through persisting the decision until cleanup. + */ + ExecutorFuture<void> _persistDecisionAndFinishReshardOperation( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept; + + /** + * Runs cleanup logic that only applies to abort. + */ + void _onAbort(const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const Status& status); + + /** * Does the following writes: * 1. Inserts the coordinator document into config.reshardingOperations * 2. Adds reshardingFields to the config.collections entry for the original collection @@ -210,7 +297,7 @@ private: * Waits on _reshardingCoordinatorObserver to notify that all recipients have entered * strict-consistency. */ - SharedSemiFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency( + ExecutorFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency( const std::shared_ptr<executor::ScopedTaskExecutor>& executor); /** @@ -271,6 +358,9 @@ private: // collection. The object looks like: {_id: 'reshardingUUID'} const InstanceID _id; + // The primary-only service instance corresponding to the coordinator instance. Not owned. 
+ const ReshardingCoordinatorService* const _coordinatorService; + // Observes writes that indicate state changes for this resharding operation and notifies // 'this' when all donors/recipients have entered some state so that 'this' can transition // states. @@ -279,6 +369,9 @@ private: // The updated coordinator state document. ReshardingCoordinatorDocument _coordinatorDoc; + // Holds the cancellation tokens relevant to the ReshardingCoordinator. + std::unique_ptr<CoordinatorCancellationTokenHolder> _ctHolder; + // Protects promises below. mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex"); diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index 341bbbb92f6..fb5a5958c13 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -46,6 +46,8 @@ namespace mongo { namespace { +using namespace fmt::literals; + const ReshardingDonorOplogId kResumeFromBeginning{Timestamp::min(), Timestamp::min()}; repl::MutableOplogEntry makeOplog(const NamespaceString& nss, @@ -191,7 +193,8 @@ public: } private: - const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"}; + const NamespaceString _oplogNss{"{}.{}xxx.yyy"_format( + NamespaceString::kConfigDb, NamespaceString::kReshardingLocalOplogBufferPrefix)}; const NamespaceString _crudNss{"test.foo"}; const UUID _uuid{UUID::gen()}; diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp index fae4b4d4eac..2b6e9da1ded 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp @@ -47,7 +47,7 @@ namespace { std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver( OperationContext* opCtx, const BSONObj& reshardingId) { auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); - auto service = registry->lookupServiceByName(kReshardingCoordinatorServiceName); + auto service = registry->lookupServiceByName(ReshardingCoordinatorService::kServiceName); auto instance = ReshardingCoordinatorService::ReshardingCoordinator::lookup(opCtx, service, reshardingId); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 29d02d18c2f..0e8cde51878 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -65,6 +65,9 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(removeRecipientDocFailpoint); +MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientBeforeCloning); +MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringCloning); +MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringOplogApplication); namespace { @@ -471,6 +474,11 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin auto* serviceContext = Client::getCurrent()->getServiceContext(); auto fetchTimestamp = *_fetchTimestamp; + { + auto opCtx = cc().makeOperationContext(); + reshardingPauseRecipientBeforeCloning.pauseWhileSet(opCtx.get()); + } + _collectionCloner = std::make_unique<ReshardingCollectionCloner>( std::make_unique<ReshardingCollectionCloner::Env>(_metrics()), ShardKeyPattern{_metadata.getReshardingKey()}, @@ -526,35 +534,42 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin 
})); } - return whenAllSucceed( - _collectionCloner - ->run(**executor, - abortToken, - CancelableOperationContextFactory(abortToken, _markKilledExecutor)) - .thenRunOn(**executor), - (*executor) - ->sleepFor(_minimumOperationDuration, abortToken) - .then([this, executor, abortToken] { - if (_txnCloners.empty()) { - return SemiFuture<void>::makeReady(); - } - - auto serviceContext = Client::getCurrent()->getServiceContext(); - - std::vector<ExecutorFuture<void>> txnClonerFutures; - for (auto&& txnCloner : _txnCloners) { - txnClonerFutures.push_back( - txnCloner->run(serviceContext, **executor, abortToken)); - } - - return whenAllSucceed(std::move(txnClonerFutures)); - })) - .thenRunOn(**executor) - .then([this] { - // ReshardingTxnCloners must complete before the recipient transitions to kApplying to - // avoid errors caused by donor shards unpinning the fetchTimestamp. - _transitionState(RecipientStateEnum::kApplying); - }); + auto cloneFinishFuture = + whenAllSucceed(_collectionCloner + ->run(**executor, + abortToken, + CancelableOperationContextFactory(abortToken, _markKilledExecutor)) + .thenRunOn(**executor), + (*executor) + ->sleepFor(_minimumOperationDuration, abortToken) + .then([this, executor, abortToken] { + if (_txnCloners.empty()) { + return SemiFuture<void>::makeReady(); + } + + auto serviceContext = Client::getCurrent()->getServiceContext(); + + std::vector<ExecutorFuture<void>> txnClonerFutures; + for (auto&& txnCloner : _txnCloners) { + txnClonerFutures.push_back( + txnCloner->run(serviceContext, **executor, abortToken)); + } + + return whenAllSucceed(std::move(txnClonerFutures)); + })) + .thenRunOn(**executor) + .then([this] { + // ReshardingTxnCloners must complete before the recipient transitions to kApplying + // to avoid errors caused by donor shards unpinning the fetchTimestamp. 
+ _transitionState(RecipientStateEnum::kApplying); + }); + + { + auto opCtx = cc().makeOperationContext(); + reshardingPauseRecipientDuringCloning.pauseWhileSet(opCtx.get()); + } + + return cloneFinishFuture; } ExecutorFuture<void> @@ -660,6 +675,11 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: })); } + { + auto opCtx = cc().makeOperationContext(); + reshardingPauseRecipientDuringOplogApplication.pauseWhileSet(opCtx.get()); + } + return whenAllSucceed(std::move(futuresToWaitOn)) .thenRunOn(**executor) .then([stashCollections] { diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 24e58751dfd..3d0ac9e264e 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -146,6 +146,7 @@ env.Library( 'database_version.cpp', 'database_version.idl', 'mongod_and_mongos_server_parameters.idl', + 'request_types/abort_reshard_collection.idl', 'request_types/add_shard_request_type.cpp', 'request_types/add_shard_to_zone_request_type.cpp', 'request_types/balance_chunk_request_type.cpp', diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 27f57770b88..4028d7648c0 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -29,6 +29,7 @@ env.Library( 'cluster_available_query_options_cmd.cpp', 'cluster_balancer_collection_status_cmd.cpp', 'cluster_build_info.cpp', + 'cluster_abort_reshard_collection_cmd.cpp', 'cluster_clear_jumbo_flag_cmd.cpp', 'cluster_coll_stats_cmd.cpp', 'cluster_collection_mod_cmd.cpp', diff --git a/src/mongo/s/commands/cluster_abort_reshard_collection_cmd.cpp b/src/mongo/s/commands/cluster_abort_reshard_collection_cmd.cpp new file mode 100644 index 00000000000..cfe23077441 --- /dev/null +++ b/src/mongo/s/commands/cluster_abort_reshard_collection_cmd.cpp @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/abort_reshard_collection_gen.h"
+
+namespace mongo {
+namespace {
+
+class AbortReshardCollectionCommand : public TypedCommand<AbortReshardCollectionCommand> {
+public:
+    using Request = AbortReshardCollection;
+
+    class Invocation final : public InvocationBase {
+    public:
+        using InvocationBase::InvocationBase;
+
+        void typedRun(OperationContext* opCtx) {
+            const NamespaceString& nss = ns();
+
+            LOGV2(5403500, "Beginning reshard abort operation", "namespace"_attr = ns());
+
+            ConfigsvrAbortReshardCollection configsvrAbortReshardCollection(nss);
+            configsvrAbortReshardCollection.setDbName(request().getDbName());
+
+            auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+            auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+                opCtx,
+                ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+                "admin",
+                CommandHelpers::appendMajorityWriteConcern(
+                    configsvrAbortReshardCollection.toBSON({}), opCtx->getWriteConcern()),
+                Shard::RetryPolicy::kIdempotent));
+            uassertStatusOK(cmdResponse.commandStatus);
+            uassertStatusOK(cmdResponse.writeConcernStatus);
+        }
+
+    private:
+        NamespaceString ns() const override {
+            return request().getCommandParameter();
+        }
+
+        bool supportsWriteConcern() const override {
+            return false;
+        }
+
+        void doCheckAuthorization(OperationContext* opCtx) const override {
+            uassert(ErrorCodes::Unauthorized,
+                    "Unauthorized",
+                    AuthorizationSession::get(opCtx->getClient())
+                        ->isAuthorizedForActionsOnResource(ResourcePattern::forExactNamespace(ns()),
+                                                           ActionType::reshardCollection));
+        }
+    };
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    std::string help() const override {
+        return "Abort any in-progress resharding operations for this collection.";
+    }
+};
+
+MONGO_REGISTER_TEST_COMMAND(AbortReshardCollectionCommand);
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/request_types/abort_reshard_collection.idl b/src/mongo/s/request_types/abort_reshard_collection.idl
new file mode 100644
index 00000000000..301a3e27cc6
--- /dev/null
+++ b/src/mongo/s/request_types/abort_reshard_collection.idl
@@ -0,0 +1,53 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# abortReshardCollection IDL file
+
+global:
+    cpp_namespace: "mongo"
+
+imports:
+    - "mongo/idl/basic_types.idl"
+
+commands:
+    abortReshardCollection:
+        description: "The public abortReshardCollection command on mongos."
+        command_name: abortReshardCollection
+        strict: false
+        namespace: type
+        api_version: ""
+        type: namespacestring
+
+    _configsvrAbortReshardCollection:
+        command_name: _configsvrAbortReshardCollection
+        cpp_name: ConfigsvrAbortReshardCollection
+        description: "The internal abortReshardCollection command on the config server."
+        strict: false
+        namespace: type
+        api_version: ""
+        type: namespacestring
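
A minimal usage sketch of the new user-facing command, in mongo shell / jstest style; it is not part of this commit. It assumes a sharded cluster started with test commands enabled (the mongos command above is registered via MONGO_REGISTER_TEST_COMMAND) and an in-progress resharding operation on the illustrative namespace "test.x".

    // Issue the abort against the admin database on a mongos. The target namespace is
    // passed as the command value, matching the abortReshardCollection IDL definition above.
    const res = db.getSiblingDB("admin").runCommand({abortReshardCollection: "test.x"});

    // mongos forwards _configsvrAbortReshardCollection to the config server primary with
    // majority write concern (see cluster_abort_reshard_collection_cmd.cpp) and surfaces
    // whatever status the config server returns; print it for inspection.
    printjson(res);

The sketch deliberately does not assert a particular error code for the case where no resharding operation is active for the collection; that behavior is decided by the config server command, which is outside this sketch.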