diff options
Diffstat (limited to 'jstests/sharding/libs/resharding_test_fixture.js')
-rw-r--r-- | jstests/sharding/libs/resharding_test_fixture.js | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js index 13d39674f0f..618a12a9ba5 100644 --- a/jstests/sharding/libs/resharding_test_fixture.js +++ b/jstests/sharding/libs/resharding_test_fixture.js @@ -85,11 +85,11 @@ var ReshardingTest = class { /** @private */ this._newShardKey = undefined; /** @private */ - this._pauseCoordinatorBeforeBlockingWrites = undefined; + this._pauseCoordinatorBeforeBlockingWritesFailpoints = []; /** @private */ - this._pauseCoordinatorBeforeDecisionPersistedFailpoint = undefined; + this._pauseCoordinatorBeforeDecisionPersistedFailpoints = []; /** @private */ - this._pauseCoordinatorBeforeCompletionFailpoint = undefined; + this._pauseCoordinatorBeforeCompletionFailpoints = []; /** @private */ this._reshardingThread = undefined; /** @private */ @@ -316,13 +316,17 @@ var ReshardingTest = class { this._newShardKey = Object.assign({}, newShardKeyPattern); - const configPrimary = this._st.configRS.getPrimary(); - this._pauseCoordinatorBeforeBlockingWrites = - configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeBlockingWrites"); - this._pauseCoordinatorBeforeDecisionPersistedFailpoint = - configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted"); - this._pauseCoordinatorBeforeCompletionFailpoint = configureFailPoint( - configPrimary, "reshardingPauseCoordinatorBeforeCompletion", {}, {times: 1}); + this._pauseCoordinatorBeforeBlockingWritesFailpoints = []; + this._pauseCoordinatorBeforeDecisionPersistedFailpoints = []; + this._pauseCoordinatorBeforeCompletionFailpoints = []; + this._st.forEachConfigServer((configServer) => { + this._pauseCoordinatorBeforeBlockingWritesFailpoints.push( + configureFailPoint(configServer, "reshardingPauseCoordinatorBeforeBlockingWrites")); + this._pauseCoordinatorBeforeDecisionPersistedFailpoints.push(configureFailPoint( + configServer, "reshardingPauseCoordinatorBeforeDecisionPersisted")); + this._pauseCoordinatorBeforeCompletionFailpoints.push(configureFailPoint( + configServer, "reshardingPauseCoordinatorBeforeCompletion", {}, {times: 1})); + }); this._commandDoneSignal = new CountDownLatch(1); @@ -452,9 +456,9 @@ var ReshardingTest = class { try { fn(); } catch (duringReshardingError) { - for (const fp of [this._pauseCoordinatorBeforeBlockingWrites, - this._pauseCoordinatorBeforeDecisionPersistedFailpoint, - this._pauseCoordinatorBeforeCompletionFailpoint]) { + for (const fp of [...this._pauseCoordinatorBeforeBlockingWritesFailpoints, + ...this._pauseCoordinatorBeforeDecisionPersistedFailpoints, + ...this._pauseCoordinatorBeforeCompletionFailpoints]) { try { fp.off(); } catch (disableFailpointError) { @@ -529,6 +533,19 @@ var ReshardingTest = class { postCheckConsistencyFn = () => {}, postDecisionPersistedFn = () => {}, afterReshardingFn = () => {}) { + // The CSRS primary may have changed as a result of running the duringReshardingFn() + // callback function. The failpoints will only be triggered on the new CSRS primary so we + // detect which node that is here. + const configPrimary = this._st.configRS.getPrimary(); + const primaryIdx = this._pauseCoordinatorBeforeBlockingWritesFailpoints.findIndex( + fp => fp.conn.host === configPrimary.host); + // The CSRS secondaries may be going through replication rollback which closes their + // connections to the test client. We wait for any replication rollbacks to complete and for + // the test client to have reconnected so the failpoints can be turned off on all of the + // nodes later on. + this._st.configRS.awaitSecondaryNodes(); + this._st.configRS.awaitReplication(); + let performCorrectnessChecks = true; if (expectedErrorCode === ErrorCodes.OK) { this._callFunctionSafely(() => { @@ -539,17 +556,18 @@ var ReshardingTest = class { // reshardingPauseCoordinatorBeforeDecisionPersisted failpoint to wait for all of // the recipient shards to have applied through all of the oplog entries from all of // the donor shards. - if (!this._waitForFailPoint(this._pauseCoordinatorBeforeBlockingWrites)) { + if (!this._waitForFailPoint( + this._pauseCoordinatorBeforeBlockingWritesFailpoints[primaryIdx])) { performCorrectnessChecks = false; } - this._pauseCoordinatorBeforeBlockingWrites.off(); + this._pauseCoordinatorBeforeBlockingWritesFailpoints.forEach(fp => fp.off()); // A resharding command that returned a failure will not hit the "Decision // Persisted" failpoint. If the command has returned, don't require that the // failpoint was entered. This ensures that following up by joining the // `_reshardingThread` will succeed. if (!this._waitForFailPoint( - this._pauseCoordinatorBeforeDecisionPersistedFailpoint)) { + this._pauseCoordinatorBeforeDecisionPersistedFailpoints[primaryIdx])) { performCorrectnessChecks = false; } @@ -562,22 +580,24 @@ var ReshardingTest = class { postCheckConsistencyFn(); } - this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off(); + this._pauseCoordinatorBeforeDecisionPersistedFailpoints.forEach(fp => fp.off()); postDecisionPersistedFn(); - this._pauseCoordinatorBeforeCompletionFailpoint.off(); + this._pauseCoordinatorBeforeCompletionFailpoints.forEach(fp => fp.off()); }); } else { this._callFunctionSafely(() => { this.retryOnceOnNetworkError( // - () => this._pauseCoordinatorBeforeBlockingWrites.off()); + () => this._pauseCoordinatorBeforeBlockingWritesFailpoints.forEach( + fp => fp.off())); postCheckConsistencyFn(); this.retryOnceOnNetworkError( - () => this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off()); + () => this._pauseCoordinatorBeforeDecisionPersistedFailpoints.forEach( + fp => fp.off())); postDecisionPersistedFn(); this.retryOnceOnNetworkError( - () => this._pauseCoordinatorBeforeCompletionFailpoint.off()); + () => this._pauseCoordinatorBeforeCompletionFailpoints.forEach(fp => fp.off())); }); } @@ -618,7 +638,8 @@ var ReshardingTest = class { docsExtraAfterResharding: [], docsMissingAfterResharding: [], }, - "existing sharded collection and temporary resharding collection had different" + + "existing sharded collection " + this._ns + + " and temporary resharding collection " + this._tempNs + " had different" + " contents"); } |