From 0d5fd57f9e55915550dd7d13340e2944c169c6e2 Mon Sep 17 00:00:00 2001 From: Abdul Qadeer Date: Mon, 18 Jul 2022 11:36:54 +0000 Subject: SERVER-61985 SERVER-67193 Make reshardingPauseCoordinatorBeforeCompletion failpoint pause conditionally --- .../setfcv_aborts_reshard_collection.js | 154 +++++++++++++++++++++ .../setfcv_reshard_collection.js | 129 ----------------- jstests/sharding/libs/resharding_test_fixture.js | 6 +- .../resharding_abort_in_preparing_to_donate.js | 21 ++- .../resharding_nonblocking_coordinator_rebuild.js | 14 +- jstests/sharding/resharding_prohibited_commands.js | 19 ++- .../resharding/resharding_coordinator_service.cpp | 11 +- 7 files changed, 204 insertions(+), 150 deletions(-) create mode 100644 jstests/multiVersion/genericSetFCVUsage/setfcv_aborts_reshard_collection.js delete mode 100644 jstests/multiVersion/genericSetFCVUsage/setfcv_reshard_collection.js diff --git a/jstests/multiVersion/genericSetFCVUsage/setfcv_aborts_reshard_collection.js b/jstests/multiVersion/genericSetFCVUsage/setfcv_aborts_reshard_collection.js new file mode 100644 index 00000000000..82d7d62b8f7 --- /dev/null +++ b/jstests/multiVersion/genericSetFCVUsage/setfcv_aborts_reshard_collection.js @@ -0,0 +1,154 @@ +/** + * Tests that setFeatureCompatibilityVersion command aborts an ongoing reshardCollection command + */ +(function() { +"use strict"; + +load("jstests/libs/parallel_shell_helpers.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); +load('jstests/libs/discover_topology.js'); +load('jstests/libs/fail_point_util.js'); +load('jstests/sharding/libs/sharded_transactions_helpers.js'); + +function runTest(forcePooledConnectionsDropped) { + const reshardingTest = + new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true}); + reshardingTest.setup(); + + const donorShardNames = reshardingTest.donorShardNames; + let inputCollection = reshardingTest.createShardedCollection({ + ns: "reshardingDb.testColl", + shardKeyPattern: {oldKey: 1}, + 
chunks: [ + {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]}, + {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}, + ], + }); + + const sourceNamespace = inputCollection.getFullName(); + + let mongos = inputCollection.getMongo(); + + for (let x = 0; x < 1000; x++) { + assert.commandWorked(inputCollection.insert({oldKey: x, newKey: -1 * x})); + } + + const topology = DiscoverTopology.findConnectedNodes(mongos); + const config = new Mongo(topology.configsvr.primary); + + let pauseBeforeTellDonorToRefresh; + let pauseBeforeCloseCxns; + if (forcePooledConnectionsDropped) { + pauseBeforeTellDonorToRefresh = configureFailPoint(config, "pauseBeforeTellDonorToRefresh"); + pauseBeforeCloseCxns = configureFailPoint(config, "pauseBeforeCloseCxns"); + } + + function checkCoordinatorDoc() { + assert.soon(() => { + const coordinatorDoc = + mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace}); + + return coordinatorDoc === null || coordinatorDoc.state === "aborting"; + }); + } + + const recipientShardNames = reshardingTest.recipientShardNames; + let awaitShell; + reshardingTest.withReshardingInBackground( + { + newShardKeyPattern: {newKey: 1}, + newChunks: [ + {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, + {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, + ], + }, + () => { + // Wait for config server to have started resharding before sending setFCV, otherwise + // there is a possible race where setFCV can be sent to the config before + // configsvrReshard. 
+ assert.soon(() => { + return mongos.getDB('config').reshardingOperations.findOne() != null; + }, "timed out waiting for coordinator doc to be written", 30 * 1000); + + if (forcePooledConnectionsDropped) { + pauseBeforeTellDonorToRefresh.wait(); + } + + let codeToRunInParallelShell = + `{ + assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + }`; + + awaitShell = startParallelShell(codeToRunInParallelShell, mongos.port); + + if (forcePooledConnectionsDropped) { + pauseBeforeCloseCxns.wait(); + + let pauseBeforeMarkKeepOpen = configureFailPoint(config, "pauseBeforeMarkKeepOpen"); + + pauseBeforeTellDonorToRefresh.off(); + + jsTestLog("Wait to hit pauseBeforeMarkKeepOpen failpoint"); + pauseBeforeMarkKeepOpen.wait(); + + jsTestLog("Set hitDropConnections failpoint"); + let hitDropConnections = configureFailPoint(config, "finishedDropConnections"); + pauseBeforeCloseCxns.off(); + + waitForFailpoint("Hit finishedDropConnections", 1); + clearRawMongoProgramOutput(); + + jsTestLog("Turn off hitDropConnections failpoint"); + hitDropConnections.off(); + + jsTestLog("Turn off pause before pauseBeforeMarkKeepOpen failpoint"); + pauseBeforeMarkKeepOpen.off(); + } + checkCoordinatorDoc(); + }, + { + expectedErrorCode: [ + ErrorCodes.ReshardCollectionAborted, + ErrorCodes.Interrupted, + ] + }); + + awaitShell(); + + reshardingTest.withReshardingInBackground( + { + newShardKeyPattern: {newKey: 1}, + newChunks: [ + {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, + {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, + ], + }, + () => { + assert.soon(() => { + return mongos.getDB('config').reshardingOperations.findOne() != null; + }, "timed out waiting for coordinator doc to be written", 30 * 1000); + awaitShell = startParallelShell(funWithArgs(function(latestFCV) { + assert.commandWorked(db.adminCommand( + {setFeatureCompatibilityVersion: latestFCV})); + }, latestFCV), mongos.port); + 
checkCoordinatorDoc(); + }, + { + expectedErrorCode: [ + ErrorCodes.CommandNotSupported, + ErrorCodes.ReshardCollectionAborted, + ErrorCodes.Interrupted, + ] + }); + + awaitShell(); + reshardingTest.teardown(); +} + +// This test case forces the setFCV command to call dropConnections while the coordinator is in +// the process of establishing connections to the participant shards in order to ensure that the +// resharding operation does not stall. +runTest(true); + +runTest(false); +})(); diff --git a/jstests/multiVersion/genericSetFCVUsage/setfcv_reshard_collection.js b/jstests/multiVersion/genericSetFCVUsage/setfcv_reshard_collection.js deleted file mode 100644 index 7fa818f7f99..00000000000 --- a/jstests/multiVersion/genericSetFCVUsage/setfcv_reshard_collection.js +++ /dev/null @@ -1,129 +0,0 @@ -(function() { -"use strict"; - -load("jstests/sharding/libs/resharding_test_fixture.js"); -load('jstests/libs/discover_topology.js'); -load('jstests/libs/fail_point_util.js'); -load('jstests/sharding/libs/sharded_transactions_helpers.js'); - -function runTest(forcePooledConnectionsDropped) { - const reshardingTest = - new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true}); - reshardingTest.setup(); - - const donorShardNames = reshardingTest.donorShardNames; - let inputCollection = reshardingTest.createShardedCollection({ - ns: "reshardingDb.testColl", - shardKeyPattern: {oldKey: 1}, - chunks: [ - {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]}, - {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}, - ], - }); - - let mongos = inputCollection.getMongo(); - - for (let x = 0; x < 1000; x++) { - assert.commandWorked(inputCollection.insert({oldKey: x, newKey: -1 * x})); - } - - const topology = DiscoverTopology.findConnectedNodes(mongos); - const config = new Mongo(topology.configsvr.primary); - - let pauseBeforeTellDonorToRefresh; - let pauseBeforeCloseCxns; - if (forcePooledConnectionsDropped) { - 
pauseBeforeTellDonorToRefresh = configureFailPoint(config, "pauseBeforeTellDonorToRefresh"); - pauseBeforeCloseCxns = configureFailPoint(config, "pauseBeforeCloseCxns"); - } - - const recipientShardNames = reshardingTest.recipientShardNames; - reshardingTest.withReshardingInBackground( - { - newShardKeyPattern: {newKey: 1}, - newChunks: [ - {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, - {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, - ], - }, - () => { - // Wait for config server to have started resharding before sending setFCV, otherwise - // there is a possible race where setFCV can be sent to the config before - // configsvrReshard. - assert.soon(() => { - return mongos.getDB('config').reshardingOperations.findOne() != null; - }, "timed out waiting for coordinator doc to be written", 30 * 1000); - - if (forcePooledConnectionsDropped) { - pauseBeforeTellDonorToRefresh.wait(); - } - - let codeToRunInParallelShell = - `{ - assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); - }`; - - let awaitShell = startParallelShell(codeToRunInParallelShell, mongos.port); - - if (forcePooledConnectionsDropped) { - pauseBeforeCloseCxns.wait(); - - let pauseBeforeMarkKeepOpen = configureFailPoint(config, "pauseBeforeMarkKeepOpen"); - - pauseBeforeTellDonorToRefresh.off(); - - jsTestLog("Wait to hit pauseBeforeMarkKeepOpen failpoint"); - pauseBeforeMarkKeepOpen.wait(); - - jsTestLog("Set hitDropConnections failpoint"); - let hitDropConnections = configureFailPoint(config, "finishedDropConnections"); - pauseBeforeCloseCxns.off(); - - waitForFailpoint("Hit finishedDropConnections", 1); - clearRawMongoProgramOutput(); - - jsTestLog("Turn off hitDropConnections failpoint"); - hitDropConnections.off(); - - jsTestLog("Turn off pause before pauseBeforeMarkKeepOpen failpoint"); - pauseBeforeMarkKeepOpen.off(); - } - - awaitShell(); - }, - { - expectedErrorCode: [ - ErrorCodes.ReshardCollectionAborted, 
- ErrorCodes.Interrupted, - ] - }); - - reshardingTest.withReshardingInBackground( - { - newShardKeyPattern: {newKey: 1}, - newChunks: [ - {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, - {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, - ], - }, - () => { - assert.commandWorked(mongos.adminCommand({setFeatureCompatibilityVersion: latestFCV})); - }, - { - expectedErrorCode: [ - ErrorCodes.CommandNotSupported, - ErrorCodes.ReshardCollectionAborted, - ErrorCodes.Interrupted, - ] - }); - - reshardingTest.teardown(); -} - -// This test case forces the setFCV command to call dropsConnections while the coordinator is in -// the process of establishing connections to the participant shards in order to ensure that the -// resharding operation does not stall. -runTest(true); - -runTest(false); -})(); diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js index 128ea61ebf1..30ed771ab98 100644 --- a/jstests/sharding/libs/resharding_test_fixture.js +++ b/jstests/sharding/libs/resharding_test_fixture.js @@ -321,8 +321,10 @@ var ReshardingTest = class { configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeBlockingWrites"); this._pauseCoordinatorBeforeDecisionPersistedFailpoint = configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted"); - this._pauseCoordinatorBeforeCompletionFailpoint = configureFailPoint( - configPrimary, "reshardingPauseCoordinatorBeforeCompletion", {}, {times: 1}); + this._pauseCoordinatorBeforeCompletionFailpoint = + configureFailPoint(configPrimary, + "reshardingPauseCoordinatorBeforeCompletion", + {"sourceNamespace": this._ns}); this._commandDoneSignal = new CountDownLatch(1); diff --git a/jstests/sharding/resharding_abort_in_preparing_to_donate.js b/jstests/sharding/resharding_abort_in_preparing_to_donate.js index 167dcd3c67a..711dbb71860 100644 --- 
a/jstests/sharding/resharding_abort_in_preparing_to_donate.js +++ b/jstests/sharding/resharding_abort_in_preparing_to_donate.js @@ -11,6 +11,7 @@ "use strict"; load("jstests/libs/discover_topology.js"); load("jstests/sharding/libs/resharding_test_fixture.js"); +load('jstests/libs/parallel_shell_helpers.js'); const originalCollectionNs = "reshardingDb.coll"; @@ -36,6 +37,7 @@ const configsvr = new Mongo(topology.configsvr.nodes[0]); const pauseAfterPreparingToDonateFP = configureFailPoint(configsvr, "reshardingPauseCoordinatorAfterPreparingToDonate"); +let awaitAbort; reshardingTest.withReshardingInBackground( { @@ -47,13 +49,30 @@ reshardingTest.withReshardingInBackground( }, () => { pauseAfterPreparingToDonateFP.wait(); - assert.commandWorked(mongos.adminCommand({abortReshardCollection: originalCollectionNs})); + assert.neq(null, mongos.getCollection("config.reshardingOperations").findOne({ + ns: originalCollectionNs + })); // Signaling abort will cause the // pauseAfterPreparingToDonateFP to throw, implicitly // allowing the coordinator to make progress without // explicitly turning off the failpoint. + awaitAbort = + startParallelShell(funWithArgs(function(sourceNamespace) { + db.adminCommand({abortReshardCollection: sourceNamespace}); + }, originalCollectionNs), mongos.port); + // Wait for the coordinator document to enter the "aborting" state or to be removed from + // config.reshardingOperations once the donors and recipients finish aborting. 
+ assert.soon(() => { + const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ + ns: originalCollectionNs + }); + return coordinatorDoc === null || coordinatorDoc.state === "aborting"; + }); }, {expectedErrorCode: ErrorCodes.ReshardCollectionAborted}); + +awaitAbort(); pauseAfterPreparingToDonateFP.off(); + reshardingTest.teardown(); })(); diff --git a/jstests/sharding/resharding_nonblocking_coordinator_rebuild.js b/jstests/sharding/resharding_nonblocking_coordinator_rebuild.js index dac1afc0014..2ee6c76aaf1 100644 --- a/jstests/sharding/resharding_nonblocking_coordinator_rebuild.js +++ b/jstests/sharding/resharding_nonblocking_coordinator_rebuild.js @@ -109,19 +109,7 @@ reshardingTest.withReshardingInBackground( } }, { - // As a result of the elections intentionally triggered on the config server replica sets, - // the primary shard of the database may retry the _configsvrReshardCollection command. It - // is possible for the resharding operation from the first _configsvrReshardCollection - // command to have entirely finished executing to the point of removing the coordinator - // state document. A retry of the _configsvrReshardCollection command in this situation will - // lead to a second resharding operation to run. The second resharding operation will have - // the duplicate documents cloned by the ReshardingCollectionCloner rather than applied by - // the ReshardingOplogApplier as intended. This results in the reshardCollection command - // failing with a DuplicateKey error rather than the error code for the stash collections - // being non-empty. The recipient must have been able to successfully update its state to - // "applying" in the first resharding operation even when the ReshardingCoordinatorService - // had yet to be rebuilt so we accept DuplicateKey as an error too. 
- expectedErrorCode: [5356800, ErrorCodes.DuplicateKey], + expectedErrorCode: 5356800, }); reshardingTest.teardown(); diff --git a/jstests/sharding/resharding_prohibited_commands.js b/jstests/sharding/resharding_prohibited_commands.js index d06a9561d2a..9f7d73b57c3 100644 --- a/jstests/sharding/resharding_prohibited_commands.js +++ b/jstests/sharding/resharding_prohibited_commands.js @@ -118,6 +118,7 @@ const waitUntilReshardingInitializedOnDonor = () => { * @param {Function} config.setup * @param {AfterReshardingCallback} afterReshardingFn */ + const withReshardingInBackground = (duringReshardingFn, {setup = () => {}, expectedErrorCode, afterReshardingFn = () => {}} = {}) => { @@ -132,22 +133,34 @@ const withReshardingInBackground = }, duringReshardingFn, {expectedErrorCode: expectedErrorCode, afterReshardingFn: afterReshardingFn}); - assertCommandsSucceedAfterReshardingOpFinishes(mongos.getDB(databaseName)); assert.commandWorked(sourceCollection.dropIndex(indexCreatedByTest)); }; // Tests that the prohibited commands work if the resharding operation is aborted. +let awaitAbort; withReshardingInBackground(() => { waitUntilReshardingInitializedOnDonor(); + assert.neq(null, + mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace})); + awaitAbort = startParallelShell(funWithArgs(function(sourceNamespace) { + db.adminCommand({abortReshardCollection: sourceNamespace}); + }, sourceNamespace), mongos.port); + // Wait for the coordinator document to enter the "aborting" state or to be removed from + // config.reshardingOperations once the donors and recipients finish aborting. 
+ assert.soon(() => { + const coordinatorDoc = + mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace}); - assert.commandWorked(mongos.adminCommand({abortReshardCollection: sourceNamespace})); + return coordinatorDoc === null || coordinatorDoc.state === "aborting"; + }); }, { expectedErrorCode: ErrorCodes.ReshardCollectionAborted, }); +awaitAbort(); // Tests that the prohibited commands succeed if the resharding operation succeeds. During the -// operation it makes sures that the prohibited commands are rejected during the resharding +// operation it makes sure that the prohibited commands are rejected during the resharding // operation. withReshardingInBackground(() => { waitUntilReshardingInitializedOnDonor(); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 80635da8379..6586961d4d6 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -1364,8 +1364,15 @@ SemiFuture ReshardingCoordinatorService::ReshardingCoordinator::run( }) .onCompletion([this, executor](Status status) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled( - opCtx.get(), _ctHolder->getStepdownToken()); + reshardingPauseCoordinatorBeforeCompletion.executeIf( + [&](const BSONObj&) { + reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled( + opCtx.get(), _ctHolder->getStepdownToken()); + }, + [&](const BSONObj& data) { + auto ns = data.getStringField("sourceNamespace"); + return ns.empty() ? true : ns.toString() == _coordinatorDoc.getSourceNss().ns(); + }); { auto lg = stdx::lock_guard(_fulfillmentMutex); -- cgit v1.2.1