author | Haley Connelly <haley.connelly@mongodb.com> | 2021-02-19 20:12:01 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-23 22:46:57 +0000
commit | 6abf5d917e5f5aac880c3201c17bf6749ddd2f55
tree | 5d9dd5ea2565686cb5269b34a3b15e2b86915efc
parent | bd5979b18572c26ac44b36c8e5e32972c12a82d5
SERVER-54000 Make errors propagate from the ReshardingCoordinator to participants
SERVER-54457 Resharding state transition log messages use correct 'oldState'
18 files changed, 683 insertions, 321 deletions
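Before the per-file hunks, a sketch of the mechanism this commit introduces may help. The ReshardingCoordinatorObserver below gains a SharedPromise<void> ('_allParticipantsDoneAborting') that is fulfilled only once every donor and recipient has written state kDone together with an abortReason to the coordinator document. The following is a minimal, self-contained sketch of that completion check; std::promise stands in for MongoDB's SharedPromise, and the State enum and ParticipantEntry struct are simplified stand-ins for DonorShardEntry/RecipientShardEntry:

```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Simplified stand-in for the donor/recipient state enums in the diff.
enum class State { kCloning, kError, kDone };

struct ParticipantEntry {
    std::string shardId;
    State state;
    std::optional<std::string> abortReason;  // set once the shard has recorded why it aborted
};

// Mirrors the diff's allParticipantsDoneWithAbortReason(): every shard must be in
// kDone *and* carry an abortReason before the abort is considered fully acknowledged.
bool allParticipantsDoneWithAbortReason(const std::vector<ParticipantEntry>& shards) {
    for (const auto& shard : shards) {
        if (!(shard.state == State::kDone && shard.abortReason.has_value())) {
            return false;
        }
    }
    return true;
}

int main() {
    // Stand-in for the observer's SharedPromise<void> _allParticipantsDoneAborting.
    std::promise<void> allParticipantsDoneAborting;
    auto fut = allParticipantsDoneAborting.get_future();

    std::vector<ParticipantEntry> donors{{"s1", State::kDone, "DuplicateKey"},
                                         {"s2", State::kError, "DuplicateKey"}};
    std::vector<ParticipantEntry> recipients{{"s1", State::kDone, "DuplicateKey"}};

    auto checkAllParticipantsAborted = [&] {
        if (allParticipantsDoneWithAbortReason(donors) &&
            allParticipantsDoneWithAbortReason(recipients)) {
            allParticipantsDoneAborting.set_value();
        }
    };

    // Donor s2 is still in kError, so the first "state document update" leaves the
    // promise unfulfilled.
    checkAllParticipantsAborted();
    std::cout << std::boolalpha << "ready after first update: "
              << (fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) << '\n';

    // s2 finishes its local cleanup and reports kDone; now the promise is fulfilled.
    donors[1].state = State::kDone;
    checkAllParticipantsAborted();
    std::cout << "ready after second update: "
              << (fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) << '\n';
}
```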
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js index dd50412379f..037346b66a7 100644 --- a/jstests/sharding/libs/resharding_test_fixture.js +++ b/jstests/sharding/libs/resharding_test_fixture.js @@ -64,6 +64,8 @@ var ReshardingTest = class { /** @private */ this._pauseCoordinatorBeforeDecisionPersistedFailpoint = undefined; /** @private */ + this._pauseCoordinatorBeforeCompletionFailpoint = undefined; + /** @private */ this._reshardingThread = undefined; /** @private */ this._isReshardingActive = false; @@ -184,6 +186,8 @@ var ReshardingTest = class { configureFailPoint(configPrimary, "reshardingPauseCoordinatorInSteadyState"); this._pauseCoordinatorBeforeDecisionPersistedFailpoint = configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted"); + this._pauseCoordinatorBeforeCompletionFailpoint = + configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeCompletion"); const commandDoneSignal = new CountDownLatch(1); @@ -234,10 +238,13 @@ var ReshardingTest = class { * Callers of interruptReshardingThread() will want to set this to ErrorCodes.Interrupted, for * example. */ - withReshardingInBackground( - {newShardKeyPattern, newChunks}, - duringReshardingFn = (tempNs) => {}, - {expectedErrorCode = ErrorCodes.OK, postCheckConsistencyFn = (tempNs) => {}} = {}) { + withReshardingInBackground({newShardKeyPattern, newChunks}, + duringReshardingFn = (tempNs) => {}, + { + expectedErrorCode = ErrorCodes.OK, + postCheckConsistencyFn = (tempNs) => {}, + postAbortDecisionPersistedFn = () => {} + } = {}) { const commandDoneSignal = this._startReshardingInBackgroundAndAllowCommandFailure( {newShardKeyPattern, newChunks}, expectedErrorCode); @@ -249,7 +256,8 @@ var ReshardingTest = class { this._callFunctionSafely(() => duringReshardingFn(this._tempNs)); this._checkConsistencyAndPostState(expectedErrorCode, - () => postCheckConsistencyFn(this._tempNs)); + () => postCheckConsistencyFn(this._tempNs), + () => postAbortDecisionPersistedFn()); } /** @private */ @@ -279,7 +287,8 @@ var ReshardingTest = class { fn(); } catch (duringReshardingError) { for (const fp of [this._pauseCoordinatorInSteadyStateFailpoint, - this._pauseCoordinatorBeforeDecisionPersistedFailpoint]) { + this._pauseCoordinatorBeforeDecisionPersistedFailpoint, + this._pauseCoordinatorBeforeCompletionFailpoint]) { try { fp.off(); } catch (disableFailpointError) { @@ -321,7 +330,9 @@ var ReshardingTest = class { } /** @private */ - _checkConsistencyAndPostState(expectedErrorCode, postCheckConsistencyFn = () => {}) { + _checkConsistencyAndPostState(expectedErrorCode, + postCheckConsistencyFn = () => {}, + postAbortDecisionPersistedFn = () => {}) { if (expectedErrorCode === ErrorCodes.OK) { this._callFunctionSafely(() => { // We use the reshardingPauseCoordinatorInSteadyState failpoint so that any @@ -340,11 +351,14 @@ var ReshardingTest = class { postCheckConsistencyFn(); this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off(); + this._pauseCoordinatorBeforeCompletionFailpoint.off(); }); } else { this._callFunctionSafely(() => { this._pauseCoordinatorInSteadyStateFailpoint.off(); this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off(); + postAbortDecisionPersistedFn(); + this._pauseCoordinatorBeforeCompletionFailpoint.off(); }); } diff --git a/jstests/sharding/libs/resharding_test_util.js b/jstests/sharding/libs/resharding_test_util.js new file mode 100644 index 00000000000..265613373e8 --- /dev/null +++ 
b/jstests/sharding/libs/resharding_test_util.js @@ -0,0 +1,84 @@ +/* + * Utilities for reshardCollection testing. + */ +var ReshardingTestUtil = (function() { + /** + * Returns whether the shard is 'done' aborting the resharding operation with + * 'abortReason.code'. + */ + const shardDoneAbortingWithCode = function(shardEntry, errorCode) { + return shardEntry["abortReason"] && shardEntry["abortReason"]["code"] && + shardEntry["abortReason"]["code"] === errorCode && shardEntry["state"] === "done"; + }; + + /** + * Confirms all resharding participants eventually write their abortReason with state 'done' to + * the main CoordinatorDocument on the configsvr. + * + * Each participant shard should eventually report its abortReason to + * its corresponding entry on the CoordinatorDocument - + * config.reshardingOperations.recipientShards[shardName] for recipients and + * config.reshardingOperations.donorShards[shardName] for donors. + */ + const assertAllParticipantsReportAbortToCoordinator = function(configsvr, nss, errCode) { + const reshardingOperationsCollection = + configsvr.getCollection("config.reshardingOperations"); + assert.soon( + () => { + const coordinatorDoc = reshardingOperationsCollection.findOne({nss}); + assert(coordinatorDoc); + // Iterate over both the recipientShards and donorShards and check that every shard + // entry is in state 'done' and contains an abortReason with the errCode. + for (const shardEntry + of [...coordinatorDoc.recipientShards, ...coordinatorDoc.donorShards]) { + if (!shardDoneAbortingWithCode(shardEntry, errCode)) { + return false; + } + } + return true; + }, + () => { + return "Not all participants reported to the configsvr's CoordinatorDocument" + + " they are done aborting with code " + errCode + ". Got CoordinatorDocument: " + + tojson(reshardingOperationsCollection.find().toArray()); + }); + }; + + /** + * Asserts that the participant eventually reports its abortReason with state 'done' locally + * to its ReshardingDonorDocument or ReshardingRecipientDocument, depending on the participantType. + * + * Not exposed to users of ReshardingTestUtil; users should call assertRecipientAbortsLocally or + * assertDonorAbortsLocally instead.
+ */ + const assertParticipantAbortsLocally = function( + shardConn, shardName, nss, abortReason, participantType) { + const localOpsCollection = + shardConn.getCollection(`config.localReshardingOperations.${participantType}`); + + assert.soon( + () => { + return localOpsCollection.findOne( + {nss, state: "done", "abortReason.code": abortReason}) !== null; + }, + () => { + return participantType + " shard " + shardName + + " never transitioned to a done state with abortReason " + abortReason + ": " + + tojson(localOpsCollection.findOne()); + }); + }; + + const assertRecipientAbortsLocally = function(shardConn, shardName, nss, abortReason) { + return assertParticipantAbortsLocally(shardConn, shardName, nss, abortReason, "recipient"); + }; + + const assertDonorAbortsLocally = function(shardConn, shardName, nss, abortReason) { + return assertParticipantAbortsLocally(shardConn, shardName, nss, abortReason, "donor"); + }; + + return { + assertAllParticipantsReportAbortToCoordinator, + assertRecipientAbortsLocally, + assertDonorAbortsLocally + }; +})(); diff --git a/jstests/sharding/resharding_clones_duplicate_key.js b/jstests/sharding/resharding_clones_duplicate_key.js index f127e7ac1fb..ad3ad2baa55 100644 --- a/jstests/sharding/resharding_clones_duplicate_key.js +++ b/jstests/sharding/resharding_clones_duplicate_key.js @@ -13,6 +13,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/discover_topology.js"); load("jstests/sharding/libs/resharding_test_fixture.js"); +load("jstests/sharding/libs/resharding_test_util.js"); const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 1}); reshardingTest.setup(); @@ -47,48 +48,6 @@ assert.commandWorked(inputCollection.insert([ {_id: 21, info: `moves from ${donorShardNames[1]}`, oldKey: 10, newKey: 10, pad: largeStr}, ])); -/** - * Confirms the shard's abortReason and state are written locally in - * config.localReshardingOperations.recipient, the shard's local ReshardingRecipientDocument. - */ -function assertEventuallyErrorsLocally(shardConn, shardName) { - const localRecipientOpsCollection = - shardConn.getCollection("config.localReshardingOperations.recipient"); - - assert.soon( - () => { - return localRecipientOpsCollection.findOne( - {state: "error", "abortReason.code": ErrorCodes.DuplicateKey}) !== null; - }, - () => { - return "recipient shard " + shardName + " never transitioned to the error state: " + - tojson(localRecipientOpsCollection.find().toArray()); - }); -} - -/** - * Confirms the shard's abortReason and state are written in - * config.reshardingOperations.recipientShards[shardName], the main CoordinatorDocument on the - * configsvr.
- */ -function assertEventuallyErrorsInRecipientList(configsvrConn, shardName, nss) { - const reshardingOperationsCollection = - configsvrConn.getCollection("config.reshardingOperations"); - assert.soon( - () => { - return reshardingOperationsCollection.findOne({ - nss, - recipientShards: - {$elemMatch: {id: shardName, "abortReason.code": ErrorCodes.DuplicateKey}} - }) !== null; - }, - () => { - return "recipient shard " + shardName + " never updated its entry in the coordinator" + - " document to state kError and abortReason DuplicateKey: " + - tojson(reshardingOperationsCollection.find().toArray()); - }); -} - const mongos = inputCollection.getMongo(); const recipientShardNames = reshardingTest.recipientShardNames; @@ -98,29 +57,22 @@ const configsvr = new Mongo(topology.configsvr.nodes[0]); const fp = configureFailPoint(recipient0, "removeRecipientDocFailpoint"); -// In the current implementation, the reshardCollection command won't ever complete if one of the -// recipients encounter an unrecoverable error while cloning. To work around this limitation, we -// verify the recipient shard transitioned itself into the "error" state as a result of the -// duplicate key error during resharding's collection cloning. -// -// TODO SERVER-53792: Investigate removing interruptReshardingThread() from this test and instead -// directly asserting that the reshardCollection command fails with an error without losing -// intermediate checks regarding where the recipient communicates its error. -reshardingTest.withReshardingInBackground( // +reshardingTest.withReshardingInBackground( { newShardKeyPattern: {newKey: 1}, newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}], }, () => { // TODO SERVER-51696: Review if these checks can be made in a cpp unittest instead. - assertEventuallyErrorsLocally(recipient0, recipientShardNames[0]); - assertEventuallyErrorsInRecipientList( - configsvr, recipientShardNames[0], inputCollection.getFullName()); - reshardingTest.interruptReshardingThread(); + ReshardingTestUtil.assertRecipientAbortsLocally(recipient0, + recipientShardNames[0], + inputCollection.getFullName(), + ErrorCodes.DuplicateKey); + fp.off(); + ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator( + configsvr, inputCollection.getFullName(), ErrorCodes.DuplicateKey); }, - {expectedErrorCode: ErrorCodes.Interrupted}); - -fp.off(); + {expectedErrorCode: ErrorCodes.DuplicateKey}); const idleCursors = mongos.getDB("admin") .aggregate([ diff --git a/jstests/sharding/resharding_indexBuilds.js b/jstests/sharding/resharding_indexBuilds.js index 7de174326ca..9e4f4ae982c 100644 --- a/jstests/sharding/resharding_indexBuilds.js +++ b/jstests/sharding/resharding_indexBuilds.js @@ -12,6 +12,7 @@ load("jstests/libs/discover_topology.js"); load("jstests/sharding/libs/resharding_test_fixture.js"); +load("jstests/sharding/libs/resharding_test_util.js"); load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); @@ -47,73 +48,27 @@ let createIndexFailpoint = configureFailPoint(donor0, "hangIndexBuildBeforeCommi createIndexThread.start(); createIndexFailpoint.wait(); -/** - * Confirms the shard's abortReason and state are written locally in - * config.localReshardingOperations.donor, the shard's local ReshardingDonorDocument. 
- */ -function assertEventuallyErrorsLocally(shardConn, shardName) { - const localDonorOpsCollection = - shardConn.getCollection("config.localReshardingOperations.donor"); - - assert.soon( - () => { - return localDonorOpsCollection.findOne({ - state: "error", - "abortReason.code": ErrorCodes.BackgroundOperationInProgressForNamespace - }) !== null; - }, - () => { - return "donor shard " + shardName + " never transitioned to the error state: " + - tojson(localDonorOpsCollection.findOne()); - }); -} - -/** - * Confirms the shard's abortReason and state are written in - * config.reshardingOperations.donorShards[shardName], the main CoordinatorDocument on the - * configsvr. - */ -function assertEventuallyErrorsInDonorList(configsvrConn, shardName, nss) { - const reshardingOperationsCollection = - configsvrConn.getCollection("config.reshardingOperations"); - assert.soon( - () => { - return reshardingOperationsCollection.findOne({ - nss, - donorShards: { - $elemMatch: { - id: shardName, - "abortReason.code": ErrorCodes.BackgroundOperationInProgressForNamespace - } - } - }) !== null; - }, - () => { - return "donor shard " + shardName + " never updated its entry in the coordinator" + - " document to state kError and abortReason BackgroundOperationInProgressForNamespace: " + - tojson(reshardingOperationsCollection.find().toArray()); - }); -} - -// In the current implementation, the reshardCollection command won't propagate the error to the -// caller if the recipient/donor shard encounters an unrecoverable error. To work around this -// limitation, we verify the recipient shard transitioned itself into the "error" state as a result -// of the duplicate key error during resharding's collection cloning. -// -// TODO SERVER-53792: Ensure the error propagated to the user indicates -// ErrorCodes.BackgroundOperationInProgressForNamespace. -reshardingTest.withReshardingInBackground( // +reshardingTest.withReshardingInBackground( { newShardKeyPattern: {newKey: 1}, newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}], }, () => { // TODO SERVER-51696: Review if these checks can be made in a cpp unittest instead. - assertEventuallyErrorsLocally(donor0, donorShardNames[0]); - assertEventuallyErrorsInDonorList( - configsvr, donorShardNames[0], inputCollection.getFullName()); + ReshardingTestUtil.assertDonorAbortsLocally( + donor0, + donorShardNames[0], + inputCollection.getFullName(), + ErrorCodes.BackgroundOperationInProgressForNamespace); + + // Note: even though the recipient state machine does not exist by the time the donor + // errors, recipients should still acknowledge they saw the coordinator's abort. + ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator( + configsvr, + inputCollection.getFullName(), + ErrorCodes.BackgroundOperationInProgressForNamespace); }, - {expectedErrorCode: ErrorCodes.InternalError}); + {expectedErrorCode: ErrorCodes.BackgroundOperationInProgressForNamespace}); // Resume index build. createIndexFailpoint.off(); diff --git a/jstests/sharding/resharding_recipient_broadcasts_abortReason.js b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js new file mode 100644 index 00000000000..db76cf0bab6 --- /dev/null +++ b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js @@ -0,0 +1,76 @@ +/** + * Tests that when a donor shard encounters an unrecoverable error, the error gets propagated from + * the failing donor shard, to the coordinator, and then to the remaining participant shards. 
+ * + * @tags: [ + * requires_fcv_49, + * uses_atclustertime, + * __TEMPORARILY_DISABLED__ + * ] + * + * TODO SERVER-54474: Re-enable this test once donors engaged in the critical section are able to + * abort resharding locally after the coordinator transitions to an error state. + */ +(function() { +"use strict"; + +load("jstests/libs/discover_topology.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); +load("jstests/sharding/libs/resharding_test_util.js"); + +const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2}); + +reshardingTest.setup(); + +const donorShardNames = reshardingTest.donorShardNames; +const inputCollection = reshardingTest.createShardedCollection({ + ns: "reshardingDb.coll", + shardKeyPattern: {oldKey: 1}, + chunks: [ + {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]}, + {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}, + ], +}); + +assert.commandWorked(inputCollection.insert([ + {_id: "stays on shard0", oldKey: -10, newKey: -10}, + {_id: "moves to shard0", oldKey: 10, newKey: -10}, + {_id: "moves to shard1", oldKey: -10, newKey: 10}, + {_id: "stays on shard1", oldKey: 10, newKey: 10}, +])); + +const mongos = inputCollection.getMongo(); +const recipientShardNames = reshardingTest.recipientShardNames; + +const topology = DiscoverTopology.findConnectedNodes(mongos); +const donor_host = topology.shards[donorShardNames[0]].primary; +const donor0 = new Mongo(donor_host); +const configsvr = new Mongo(topology.configsvr.nodes[0]); + +const fp = configureFailPoint(donor0, "reshardingDonorFailsBeforePreparingToMirror"); + +reshardingTest.withReshardingInBackground( + { + newShardKeyPattern: {newKey: 1}, + newChunks: [ + {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, + {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, + ], + }, + (tempNs) => { + // TODO SERVER-51696: Review if these checks can be made in a cpp unittest instead. + // First, wait for the shard to encounter an unrecoverable error and persist it locally. + ReshardingTestUtil.assertDonorAbortsLocally( + donor0, donorShardNames[0], inputCollection.getFullName(), ErrorCodes.InternalError); + }, + { + expectedErrorCode: ErrorCodes.InternalError, + postAbortDecisionPersistedFn: () => { + ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator( + configsvr, inputCollection.getFullName(), ErrorCodes.InternalError); + } + }); + +fp.off(); +reshardingTest.teardown(); +})(); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp index 5c8b72bfefe..7329a5cea7d 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp @@ -34,6 +34,7 @@ #include <fmt/format.h> #include "mongo/db/s/resharding/coordinator_document_gen.h" +#include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_chunk.h" @@ -59,87 +60,116 @@ const std::vector<RecipientShardEntry>& getParticipants( } /** - * Generates error an response indicating which participant was found to be in state kError. 
- */ -Status generateErrorStatus(WithLock, const DonorShardEntry& donor) { - return {ErrorCodes::InternalError, - "Donor shard {} is in an error state"_format(donor.getId().toString())}; -} -Status generateErrorStatus(WithLock, const RecipientShardEntry& recipient) { - return {ErrorCodes::InternalError, - "Recipient shard {} is in an error state"_format(recipient.getId().toString())}; -} - -/** - * Returns true if the participants all have a (non-error) state greater than or equal to - * expectedState, false otherwise. - * - * If any participant is in state kError, returns an error status. + * Returns true if all participants are in a state greater than or equal to the expectedState. */ template <class TState, class TParticipant> -StatusWith<bool> allParticipantsInStateGTE(WithLock lk, - TState expectedState, - const std::vector<TParticipant>& participants) { - bool allInStateGTE = true; +bool allParticipantsInStateGTE(WithLock lk, + TState expectedState, + const std::vector<TParticipant>& participants) { for (const auto& shard : participants) { - auto state = shard.getState(); - if (state != expectedState) { - if (state == TState::kError) { - return generateErrorStatus(lk, shard); - } - - if (allInStateGTE) { - // Ensure allInStateGTE never goes from false -> true. It's possible that while one - // participant has not yet reached the expectedState, another participant is in a - // state greater than the expectedState. - // - // Instead of early returning, continue loop in case another participant reported an - // error. - allInStateGTE = state > expectedState; - } + if (shard.getState() < expectedState) { + return false; } } - return allInStateGTE; + return true; } /** - * Returns true if the state transition is incomplete and the promise cannot yet be fulfilled. This - * includes if one or more of the participants report state kError. In the error case, an error is - * set on the promise. - * - * Otherwise returns false and fulfills the promise if it is not already. + * Returns whether or not all relevant shards have completed their transitions into the + * expectedState. If they have, ensures the promise is fulfilled. */ -template <class T> -bool stateTransitionIncomplete(WithLock lk, +template <class TState> +bool stateTransistionsComplete(WithLock lk, SharedPromise<ReshardingCoordinatorDocument>& sp, - T expectedState, + TState expectedState, const ReshardingCoordinatorDocument& updatedStateDoc) { if (sp.getFuture().isReady()) { // Ensure promise is not fulfilled twice. - return false; + return true; } auto participants = getParticipants(lk, expectedState, updatedStateDoc); - auto swAllParticipantsGTE = allParticipantsInStateGTE(lk, expectedState, participants); - if (!swAllParticipantsGTE.isOK()) { - // By returning true, onReshardingParticipantTransition will not try to fulfill any more of - // the promises. - // ReshardingCoordinator::run() waits on the promises from the ReshardingCoordinatorObserver - // in the same order they're fulfilled by onReshardingParticipantTransition(). If even one - // promise in the chain errors, ReshardingCoordinator::run() will jump to onError and not - // wait for the remaining promises to be fulfilled. 
- sp.setError(swAllParticipantsGTE.getStatus()); + auto allShardsTransitioned = allParticipantsInStateGTE(lk, expectedState, participants); + if (allShardsTransitioned) { + sp.emplaceValue(updatedStateDoc); return true; } + return false; +} - if (swAllParticipantsGTE.getValue()) { - // All participants are in a state greater than or equal to expected state. - sp.emplaceValue(updatedStateDoc); - return false; +/** + * Appends context regarding the source of the abortReason. + */ +template <class TParticipant> +Status getStatusFromAbortReasonWithShardInfo(const TParticipant& participant, + StringData participantType) { + return getStatusFromAbortReason(participant) + .withContext("{} shard {} reached an unrecoverable error"_format( + participantType, participant.getId().toString())); +} + +/** + * If neither the coordinator nor participants have encountered an unrecoverable error, returns + * boost::none. + * + * Otherwise, returns the abortReason reported by either the coordinator or one of the participants. + */ +boost::optional<Status> getAbortReasonIfExists( + const ReshardingCoordinatorDocument& updatedStateDoc) { + if (updatedStateDoc.getAbortReason()) { + // Note: the absence of context specifying which shard the abortReason originates from + // implies the abortReason originates from the coordinator. + return getStatusFromAbortReason(updatedStateDoc); + } + + for (const auto& donorShard : updatedStateDoc.getDonorShards()) { + if (donorShard.getState() == DonorStateEnum::kError) { + return getStatusFromAbortReasonWithShardInfo(donorShard, "Donor"_sd); + } + } + + for (const auto& recipientShard : updatedStateDoc.getRecipientShards()) { + if (recipientShard.getState() == RecipientStateEnum::kError) { + return getStatusFromAbortReasonWithShardInfo(recipientShard, "Recipient"_sd); + } + } + + return boost::none; +} + +template <class TState, class TParticipant> +bool allParticipantsDoneWithAbortReason(WithLock lk, + TState expectedState, + const std::vector<TParticipant>& participants) { + for (const auto& shard : participants) { + if (!(shard.getState() == expectedState && shard.getAbortReason().is_initialized())) { + return false; + } } return true; } +/** + * Fulfills allParticipantsDoneAbortingSp if all participants have reported to the coordinator that + * they have finished aborting locally. 
+ */ +void checkAllParticipantsAborted(WithLock lk, + SharedPromise<void>& allParticipantsDoneAbortingSp, + const ReshardingCoordinatorDocument& updatedStateDoc) { + if (allParticipantsDoneAbortingSp.getFuture().isReady()) { + return; + } + + bool allDonorsAborted = allParticipantsDoneWithAbortReason( + lk, DonorStateEnum::kDone, updatedStateDoc.getDonorShards()); + bool allRecipientsAborted = allParticipantsDoneWithAbortReason( + lk, RecipientStateEnum::kDone, updatedStateDoc.getRecipientShards()); + + if (allDonorsAborted && allRecipientsAborted) { + allParticipantsDoneAbortingSp.emplaceValue(); + } +} + } // namespace ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default; @@ -156,41 +186,46 @@ ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() { void ReshardingCoordinatorObserver::onReshardingParticipantTransition( const ReshardingCoordinatorDocument& updatedStateDoc) { stdx::lock_guard<Latch> lk(_mutex); - if (stateTransitionIncomplete(lk, - _allDonorsReportedMinFetchTimestamp, - DonorStateEnum::kDonatingInitialData, - updatedStateDoc)) { + if (auto abortReason = getAbortReasonIfExists(updatedStateDoc)) { + _onAbortOrStepdown(lk, abortReason.get()); + checkAllParticipantsAborted(lk, _allParticipantsDoneAborting, updatedStateDoc); return; } - if (stateTransitionIncomplete( + if (!stateTransistionsComplete(lk, + _allDonorsReportedMinFetchTimestamp, + DonorStateEnum::kDonatingInitialData, + updatedStateDoc)) { + return; + } + + if (!stateTransistionsComplete( lk, _allRecipientsFinishedCloning, RecipientStateEnum::kApplying, updatedStateDoc)) { return; } - if (stateTransitionIncomplete(lk, - _allRecipientsFinishedApplying, - RecipientStateEnum::kSteadyState, - updatedStateDoc)) { + if (!stateTransistionsComplete(lk, + _allRecipientsFinishedApplying, + RecipientStateEnum::kSteadyState, + updatedStateDoc)) { return; } - if (stateTransitionIncomplete(lk, - _allRecipientsReportedStrictConsistencyTimestamp, - RecipientStateEnum::kStrictConsistency, - updatedStateDoc)) { + if (!stateTransistionsComplete(lk, + _allRecipientsReportedStrictConsistencyTimestamp, + RecipientStateEnum::kStrictConsistency, + updatedStateDoc)) { return; } - if (stateTransitionIncomplete( + if (!stateTransistionsComplete( lk, _allRecipientsRenamedCollection, RecipientStateEnum::kDone, updatedStateDoc)) { return; } - if (stateTransitionIncomplete( + if (!stateTransistionsComplete( lk, _allDonorsDroppedOriginalCollection, DonorStateEnum::kDone, updatedStateDoc)) { return; } @@ -226,9 +261,20 @@ ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() { return _allRecipientsRenamedCollection.getFuture(); } +SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllParticipantsDoneAborting() { + return _allParticipantsDoneAborting.getFuture(); +} + void ReshardingCoordinatorObserver::interrupt(Status status) { - stdx::lock_guard<Latch> lg(_mutex); + stdx::lock_guard<Latch> lk(_mutex); + _onAbortOrStepdown(lk, status); + + if (!_allParticipantsDoneAborting.getFuture().isReady()) { + _allParticipantsDoneAborting.setError(status); + } +} +void ReshardingCoordinatorObserver::_onAbortOrStepdown(WithLock, Status status) { if (!_allDonorsReportedMinFetchTimestamp.getFuture().isReady()) { _allDonorsReportedMinFetchTimestamp.setError(status); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h index 27a64fa53be..50d4c2fdd5e 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h +++
b/src/mongo/db/s/resharding/resharding_coordinator_observer.h @@ -104,11 +104,23 @@ public: SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllDonorsDroppedOriginalCollection(); /** + * Fulfills the '_allParticipantsDoneAborting' promise when the last recipient or donor writes + * that it is in 'kDone' with an abortReason. + */ + SharedSemiFuture<void> awaitAllParticipantsDoneAborting(); + + /** * Sets errors on any promises that have not yet been fulfilled. */ void interrupt(Status status); private: + /** + * Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors + * (abort resharding). + */ + void _onAbortOrStepdown(WithLock, Status status); + // Protects the state below Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorObserver::_mutex"); @@ -126,6 +138,9 @@ private: * {_allRecipientsReportedStrictConsistencyTimestamp, RecipientStateEnum::kStrictConsistency} * {_allRecipientsRenamedCollection, RecipientStateEnum::kDone} * {_allDonorsDroppedOriginalCollection, DonorStateEnum::kDone} + * {_allParticipantsDoneAborting, + * DonorStateEnum::kDone with abortReason AND + * RecipientStateEnum::kDone with abortReason} */ SharedPromise<ReshardingCoordinatorDocument> _allDonorsReportedMinFetchTimestamp; @@ -139,6 +154,8 @@ private: SharedPromise<ReshardingCoordinatorDocument> _allRecipientsRenamedCollection; SharedPromise<ReshardingCoordinatorDocument> _allDonorsDroppedOriginalCollection; + + SharedPromise<void> _allParticipantsDoneAborting; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp index 1b57c5f02b5..3f6c3c19272 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp @@ -45,25 +45,32 @@ namespace { class ReshardingCoordinatorObserverTest : public unittest::Test { protected: ReshardingCoordinatorDocument makeCoordinatorDocWithRecipientsAndDonors( - std::vector<RecipientShardEntry>& recipients, std::vector<DonorShardEntry>& donors) { + std::vector<RecipientShardEntry>& recipients, + std::vector<DonorShardEntry>& donors, + boost::optional<Status> abortReason = boost::none) { auto coordinatorDoc = ReshardingCoordinatorDocument(); coordinatorDoc.setRecipientShards(std::move(recipients)); coordinatorDoc.setDonorShards(std::move(donors)); + emplaceAbortReasonIfExists(coordinatorDoc, abortReason); return coordinatorDoc; } std::vector<DonorShardEntry> makeMockDonorsInState( - DonorStateEnum donorState, boost::optional<Timestamp> timestamp = boost::none) { - return {makeDonorShard(ShardId{"s1"}, donorState, timestamp), - makeDonorShard(ShardId{"s2"}, donorState, timestamp), - makeDonorShard(ShardId{"s3"}, donorState, timestamp)}; + DonorStateEnum donorState, + boost::optional<Timestamp> timestamp = boost::none, + boost::optional<Status> abortReason = boost::none) { + return {makeDonorShard(ShardId{"s1"}, donorState, timestamp, abortReason), + makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason), + makeDonorShard(ShardId{"s3"}, donorState, timestamp, abortReason)}; } std::vector<RecipientShardEntry> makeMockRecipientsInState( - RecipientStateEnum recipientState, boost::optional<Timestamp> timestamp = boost::none) { - return {makeRecipientShard(ShardId{"s1"}, recipientState, timestamp), - makeRecipientShard(ShardId{"s2"}, recipientState, timestamp), - makeRecipientShard(ShardId{"s3"}, recipientState, timestamp)}; + 
RecipientStateEnum recipientState, + boost::optional<Timestamp> timestamp = boost::none, + boost::optional<Status> abortReason = boost::none) { + return {makeRecipientShard(ShardId{"s1"}, recipientState, timestamp, abortReason), + makeRecipientShard(ShardId{"s2"}, recipientState, timestamp, abortReason), + makeRecipientShard(ShardId{"s3"}, recipientState, timestamp, abortReason)}; } }; @@ -133,12 +140,15 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) { // By default, all donors should be kDonatingInitialData at this stage. auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1)); - std::vector<RecipientShardEntry> recipientShards0{ + std::vector<RecipientShardEntry> recipientShards{ {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, - {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kError)}, + {makeRecipientShard(ShardId{"s2"}, + RecipientStateEnum::kError, + boost::none, // timestamp + Status{ErrorCodes::InternalError, "We gotta abort"})}, {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; - auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards); - reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); + auto coordinatorDoc = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards); + reshardingObserver->onReshardingParticipantTransition(coordinatorDoc); auto resp = fut.getNoThrow(); // If any participant is in state kError, regardless of other participants' states, an error @@ -148,6 +158,46 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) { reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"}); } +TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) { + auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>(); + + auto fut = reshardingObserver->awaitAllParticipantsDoneAborting(); + ASSERT_FALSE(fut.isReady()); + + auto abortReason = Status{ErrorCodes::InternalError, "We gotta abort"}; + + // All participants have an abortReason, but not all are in state kDone yet. + auto donorShards = makeMockDonorsInState(DonorStateEnum::kDone, Timestamp(1, 1), abortReason); + std::vector<RecipientShardEntry> recipientShards0{ + {makeRecipientShard(ShardId{"s1"}, + RecipientStateEnum::kError, + boost::none, // timestamp + abortReason)}, + {makeRecipientShard(ShardId{"s2"}, + RecipientStateEnum::kDone, + boost::none, // timestamp + abortReason)}, + {makeRecipientShard(ShardId{"s3"}, + RecipientStateEnum::kDone, + boost::none, // timestamp + abortReason)}}; + auto coordinatorDoc0 = + makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards, abortReason); + reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); + ASSERT_FALSE(fut.isReady()); + + // All participants in state kDone with abortReason. + auto recipientShards1 = + makeMockRecipientsInState(RecipientStateEnum::kDone, boost::none, abortReason); + auto coordinatorDoc1 = + makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards, abortReason); + reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); + ASSERT_TRUE(fut.isReady()); + + // Since all other promises should be fulfilled with an error, no need to interrupt the observer + // before finishing this test case. +} + /** * Confirms that if one recipient is more than one state behind the other, the promise does not get * fulfilled early. 
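The coordinator-side half of this handshake appears in the resharding_coordinator_service.cpp hunk that follows: after persisting kError and telling participants to refresh, run()'s onError stage now blocks on awaitAllParticipantsDoneAborting() (wrapped in future_util::withCancelation) before surfacing the error. A rough, self-contained approximation of that ordering, using std::thread and std::promise in place of MongoDB's executors and future utilities:

```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int main() {
    // Stand-in for ReshardingCoordinatorObserver::awaitAllParticipantsDoneAborting().
    std::promise<void> allParticipantsDoneAborting;
    std::shared_future<void> doneAborting = allParticipantsDoneAborting.get_future().share();

    // Stand-in for the participant shards: after refreshing and observing the
    // coordinator's abortReason, they finish local cleanup and report kDone.
    std::thread participants([&allParticipantsDoneAborting] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        std::cout << "participants: aborted locally, reported kDone with abortReason\n";
        allParticipantsDoneAborting.set_value();
    });

    // The coordinator's onError stage: persist kError and notify participants
    // (elided here), then block until every participant has acknowledged the
    // abort before returning the original error to the client.
    std::cout << "coordinator: transitioned to kError, waiting for participants\n";
    doneAborting.get();
    std::cout << "coordinator: all participants done aborting, failing reshardCollection\n";

    participants.join();
}
```

In the commit itself the wait is cancelable via the token captured by onError, so a stepdown interrupts the coordinator (interrupt() sets an error on the promise) instead of hanging it.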
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 97ebb4407fe..b31eebd66a0 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -62,6 +62,7 @@ using namespace fmt::literals; MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState); MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeDecisionPersisted); +MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion); void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request, const BSONObj& response, @@ -502,7 +503,7 @@ stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNotifyEnum> notifyForSta {CoordinatorStateEnum::kMirroring, ParticipantsToNotifyEnum::kDonors}, {CoordinatorStateEnum::kDecisionPersisted, ParticipantsToNotifyEnum::kNone}, {CoordinatorStateEnum::kDone, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kNone}, + {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kDonors}, }; /** @@ -787,9 +788,7 @@ void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( // Run updates to config.reshardingOperations and config.collections in a transaction auto nextState = coordinatorDoc.getState(); invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end()); - // TODO SERVER-51800 Remove special casing for kError. - invariant(nextState == CoordinatorStateEnum::kError || - notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone, + invariant(notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone, "failed to write state transition with nextState {}"_format( CoordinatorState_serializer(nextState))); @@ -813,13 +812,6 @@ void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( } }; - // TODO SERVER-51800 Remove special casing for kError. - if (nextState == CoordinatorStateEnum::kError) { - executeStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, std::move(changeMetadataFunc)); - return; - } - bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( opCtx, coordinatorDoc, std::move(changeMetadataFunc)); } @@ -957,7 +949,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( // keep 'this' pointer alive for the remaining callbacks. return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor); }) - .onError([this, self = shared_from_this(), executor](Status status) { + .onError([this, self = shared_from_this(), token, executor](Status status) { stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. @@ -975,17 +967,24 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( _updateCoordinatorDocStateAndCatalogEntries( CoordinatorStateEnum::kError, _coordinatorDoc, boost::none, status); - // TODO wait for donors and recipients to abort the operation and clean up state - _tellAllRecipientsToRefresh(executor); _tellAllParticipantsToRefresh(createFlushRoutingTableCacheUpdatesCommand(nss), executor); + // Wait for all participants to acknowledge the operation reached an unrecoverable + // error.
+ future_util::withCancelation( + _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(), token) + .get(); + return status; }) .onCompletion([this, self = shared_from_this()](Status status) { // Notify `ReshardingMetrics` as the operation is now complete for external observers. markCompleted(status); + auto opCtx = cc().makeOperationContext(); + reshardingPauseCoordinatorBeforeCompletion.pauseWhileSet(opCtx.get()); + stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index f86d51a40c2..6797b08376a 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -775,6 +775,10 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToErrorSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch); + insertChunkAndZoneEntries( + makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}), + makeZones(_originalNss, _oldShardKey)); + // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kError); diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 97d4f8e9bf9..99872599e00 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -27,12 +27,19 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + #include "mongo/platform/basic.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include <fmt/format.h> +#include "mongo/db/s/sharding_state.h" +#include "mongo/logv2/log.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/grid.h" + namespace mongo { namespace resharding { @@ -62,6 +69,76 @@ void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocum StateMachine::getOrCreate(opCtx, service, doc.toBSON()); } +/** + * Acknowledges to the coordinator that all abort work is done on the donor. Since the + * ReshardingDonorService doesn't exist, there isn't any work to do. + * + * TODO SERVER-54704: Remove this method, it should no longer be needed after SERVER-54513 is + * completed. + */ +void processAbortReasonNoDonorMachine(OperationContext* opCtx, + const ReshardingFields& reshardingFields, + const NamespaceString& nss) { + { + Lock::ResourceLock rstl( + opCtx->lockState(), resourceIdReplicationStateTransitionLock, MODE_IX); + auto* const replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->canAcceptWritesFor(opCtx, nss)) { + // no-op when node is not primary. 
+ return; + } + } + + auto abortReason = reshardingFields.getAbortReason(); + auto shardId = ShardingState::get(opCtx)->shardId(); + BSONObjBuilder updateBuilder; + updateBuilder.append("donorShards.$.state", DonorState_serializer(DonorStateEnum::kDone)); + updateBuilder.append("donorShards.$.abortReason", *abortReason); + uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument( + opCtx, + NamespaceString::kConfigReshardingOperationsNamespace, + BSON("_id" << reshardingFields.getUuid() << "donorShards.id" << shardId), + BSON("$set" << updateBuilder.done()), + false /* upsert */, + ShardingCatalogClient::kMajorityWriteConcern)); +} + +/** + * Acknowledges to the coordinator that all abort work is done on the recipient. Since the + * ReshardingRecipientService doesn't exist, there isn't any work to do. + * + * TODO SERVER-54704: Remove this method, it should no longer be needed after SERVER-54513 is + * completed. + */ +void processAbortReasonNoRecipientMachine(OperationContext* opCtx, + const ReshardingFields& reshardingFields, + const NamespaceString& nss) { + + { + Lock::ResourceLock rstl( + opCtx->lockState(), resourceIdReplicationStateTransitionLock, MODE_IX); + auto* const replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->canAcceptWritesFor(opCtx, nss)) { + // no-op when node is not primary. + return; + } + } + + auto abortReason = reshardingFields.getAbortReason(); + auto shardId = ShardingState::get(opCtx)->shardId(); + BSONObjBuilder updateBuilder; + updateBuilder.append("recipientShards.$.state", + RecipientState_serializer(RecipientStateEnum::kDone)); + updateBuilder.append("recipientShards.$.abortReason", *abortReason); + uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument( + opCtx, + NamespaceString::kConfigReshardingOperationsNamespace, + BSON("_id" << reshardingFields.getUuid() << "recipientShards.id" << shardId), + BSON("$set" << updateBuilder.done()), + false /* upsert */, + ShardingCatalogClient::kMajorityWriteConcern)); +} + /* * Either constructs a new ReshardingDonorStateMachine with 'reshardingFields' or passes * 'reshardingFields' to an already-existing ReshardingDonorStateMachine. @@ -78,6 +155,11 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx, return; } + if (reshardingFields.getAbortReason()) { + processAbortReasonNoDonorMachine(opCtx, reshardingFields, nss); + return; + } + // If a resharding operation is BEFORE state kPreparingToDonate, then the config.collections // entry won't have yet been created for the temporary resharding collection. The // DonorStateMachine refreshes the temporary resharding collection immediately after being @@ -109,6 +191,7 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx, * 'reshardingFields' to an already-existing ReshardingRecipientStateMachine. 
*/ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, + const NamespaceString& nss, const CollectionMetadata& metadata, const ReshardingFields& reshardingFields) { if (auto recipientStateMachine = tryGetReshardingStateMachine<ReshardingRecipientService, @@ -119,6 +202,11 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, return; } + if (reshardingFields.getAbortReason()) { + processAbortReasonNoRecipientMachine(opCtx, reshardingFields, nss); + return; + } + // If a resharding operation is past state kCloning but does not currently have a recipient // document in-memory, this means that the document will be recovered by the // ReshardingRecipientService, and at that time the latest instance of 'reshardingFields' @@ -146,6 +234,38 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, ReshardingRecipientDocument>(opCtx, recipientDoc); } +/** + * Checks that presence/absence of 'donorShards' and 'recipientShards' fields in the + * reshardingFields are consistent with the 'state' field. + */ +void verifyValidReshardingFields(const ReshardingFields& reshardingFields) { + auto coordinatorState = reshardingFields.getState(); + + if (coordinatorState < CoordinatorStateEnum::kDecisionPersisted) { + // Prior to the state CoordinatorStateEnum::kDecisionPersisted, only the source + // collection's config.collections entry should have donorFields, and only the + // temporary resharding collection's entry should have recipientFields. + uassert(5274201, + fmt::format("reshardingFields must contain either donorFields or recipientFields " + "(and not both) when the " + "coordinator is in state {}. Got reshardingFields {}", + CoordinatorState_serializer(reshardingFields.getState()), + reshardingFields.toBSON().toString()), + reshardingFields.getDonorFields().is_initialized() ^ + reshardingFields.getRecipientFields().is_initialized()); + } else { + // At and after state CoordinatorStateEnum::kDecisionPersisted, the temporary + // resharding collection's config.collections entry has been removed, and so the + // source collection's entry should have both donorFields and recipientFields. + uassert(5274202, + fmt::format("reshardingFields must contain both donorFields and recipientFields " + "when the coordinator's state is greater than or equal to " + "CoordinatorStateEnum::kDecisionPersisted. Got reshardingFields {}", + reshardingFields.toBSON().toString()), + reshardingFields.getDonorFields() && reshardingFields.getRecipientFields()); + } +} + } // namespace ReshardingDonorDocument constructDonorDocumentFromReshardingFields( @@ -192,41 +312,22 @@ void processReshardingFieldsForCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionMetadata& metadata, const ReshardingFields& reshardingFields) { - auto coordinatorState = reshardingFields.getState(); - if (coordinatorState != CoordinatorStateEnum::kError) { - if (coordinatorState < CoordinatorStateEnum::kDecisionPersisted) { - // Prior to the state CoordinatorStateEnum::kDecisionPersisted, only the source - // collection's config.collections entry should have donorFields, and only the - // temporary resharding collection's entry should have recipientFields. - uassert( - 5274201, - fmt::format("reshardingFields must contain either donorFields or recipientFields " - "(and not both) when the " - "coordinator is in state {}. 
Got reshardingFields {}", - CoordinatorState_serializer(reshardingFields.getState()), - reshardingFields.toBSON().toString()), - reshardingFields.getDonorFields().is_initialized() ^ - reshardingFields.getRecipientFields().is_initialized()); - } else { - // At and after state CoordinatorStateEnum::kDecisionPersisted, the temporary - // resharding collection's config.collections entry has been removed, and so the - // source collection's entry should have both donorFields and recipientFields. - uassert( - 5274202, - fmt::format("reshardingFields must contain both donorFields and recipientFields " - "when the coordinator's state is greater than or equal to " - "CoordinatorStateEnum::kDecisionPersisted. Got reshardingFields {}", - reshardingFields.toBSON().toString()), - reshardingFields.getDonorFields() && reshardingFields.getRecipientFields()); - } + if (reshardingFields.getAbortReason()) { + // The coordinator encountered an unrecoverable error; both donors and recipients should be + // made aware. + processReshardingFieldsForDonorCollection(opCtx, nss, metadata, reshardingFields); + processReshardingFieldsForRecipientCollection(opCtx, nss, metadata, reshardingFields); + return; } + verifyValidReshardingFields(reshardingFields); + if (reshardingFields.getDonorFields()) { processReshardingFieldsForDonorCollection(opCtx, nss, metadata, reshardingFields); } if (reshardingFields.getRecipientFields()) { - processReshardingFieldsForRecipientCollection(opCtx, metadata, reshardingFields); + processReshardingFieldsForRecipientCollection(opCtx, nss, metadata, reshardingFields); } } diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 08de9716f52..816f2084fb4 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -53,6 +53,8 @@ namespace mongo { +MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsBeforePreparingToMirror); + using namespace fmt::literals; namespace { @@ -173,6 +175,11 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( "reshardingId"_attr = _id, "error"_attr = status); _transitionStateAndUpdateCoordinator(DonorStateEnum::kError, boost::none, status); + + // TODO SERVER-52838: Ensure all local collections that may have been created for + // resharding are removed, with the exception of the ReshardingDonorDocument, before + // transitioning to kDone. + _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone, boost::none, status); return status; }) .onCompletion([this, self = shared_from_this()](Status status) { @@ -198,23 +205,8 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { // Resolve any unresolved promises to avoid hanging.
- stdx::lock_guard<Latch> lg(_mutex); - if (!_allRecipientsDoneCloning.getFuture().isReady()) { - _allRecipientsDoneCloning.setError(status); - } - - if (!_allRecipientsDoneApplying.getFuture().isReady()) { - _allRecipientsDoneApplying.setError(status); - } - - if (!_finalOplogEntriesWritten.getFuture().isReady()) { - _finalOplogEntriesWritten.setError(status); - } - - if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) { - _coordinatorHasDecisionPersisted.setError(status); - } - + stdx::lock_guard<Latch> lk(_mutex); + _onAbortOrStepdown(lk, status); if (!_completionPromise.getFuture().isReady()) { _completionPromise.setError(status); } @@ -233,17 +225,14 @@ boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCur void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields) { - auto coordinatorState = reshardingFields.getState(); - if (coordinatorState == CoordinatorStateEnum::kError) { - // TODO SERVER-52838: Investigate if we want to have a special error code so the donor knows - // when it has recieved the error from the coordinator rather than needing to report an - // error to the coordinator. - interrupt({ErrorCodes::InternalError, - "ReshardingDonorService observed CoordinatorStateEnum::kError"}); + stdx::lock_guard<Latch> lk(_mutex); + if (reshardingFields.getAbortReason()) { + auto status = getStatusFromAbortReason(reshardingFields); + _onAbortOrStepdown(lk, status); return; } - stdx::lock_guard<Latch> lk(_mutex); + auto coordinatorState = reshardingFields.getState(); if (coordinatorState >= CoordinatorStateEnum::kApplying) { ensureFulfilledPromise(lk, _allRecipientsDoneCloning); } @@ -322,9 +311,14 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: return ExecutorFuture<void>(**executor, Status::OK()); } - return _allRecipientsDoneCloning.getFuture().thenRunOn(**executor).then([this]() { - _transitionState(DonorStateEnum::kDonatingOplogEntries); - }); + return _allRecipientsDoneCloning.getFuture() + .thenRunOn(**executor) + .then([this]() { _transitionState(DonorStateEnum::kDonatingOplogEntries); }) + .onCompletion([=](Status s) { + if (MONGO_unlikely(reshardingDonorFailsBeforePreparingToMirror.shouldFail())) { + uasserted(ErrorCodes::InternalError, "Failing for test"); + } + }); } ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: @@ -459,13 +453,16 @@ void ReshardingDonorService::DonorStateMachine::_transitionState( emplaceMinFetchTimestampIfExists(replacementDoc, minFetchTimestamp); emplaceAbortReasonIfExists(replacementDoc, abortReason); + // For logging purposes. 
+ auto oldState = _donorDoc.getState(); auto newState = replacementDoc.getState(); + _updateDonorDocument(std::move(replacementDoc)); LOGV2_INFO(5279505, "Transitioned resharding donor state", "newState"_attr = DonorState_serializer(newState), - "oldState"_attr = DonorState_serializer(_donorDoc.getState()), + "oldState"_attr = DonorState_serializer(oldState), "ns"_attr = _donorDoc.getNss(), "collectionUUID"_attr = _donorDoc.getExistingUUID(), "reshardingUUID"_attr = _donorDoc.get_id()); @@ -542,4 +539,22 @@ void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() { _donorDoc = {}; } +void ReshardingDonorService::DonorStateMachine::_onAbortOrStepdown(WithLock, Status status) { + if (!_allRecipientsDoneCloning.getFuture().isReady()) { + _allRecipientsDoneCloning.setError(status); + } + + if (!_allRecipientsDoneApplying.getFuture().isReady()) { + _allRecipientsDoneApplying.setError(status); + } + + if (!_finalOplogEntriesWritten.getFuture().isReady()) { + _finalOplogEntriesWritten.setError(status); + } + + if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) { + _coordinatorHasDecisionPersisted.setError(status); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 34057c336fa..d0d4b8fc626 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -138,6 +138,10 @@ private: // Removes the local donor document from disk and clears the in-memory state. void _removeDonorDocument(); + // Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors + // (abort resharding). + void _onAbortOrStepdown(WithLock lk, Status status); + // The in-memory representation of the underlying document in // config.localReshardingOperations.donor. 
ReshardingDonorDocument _donorDoc; diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp index 05144408e12..d3b1959ef60 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp @@ -49,7 +49,10 @@ std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver( auto service = registry->lookupServiceByName(kReshardingCoordinatorServiceName); auto instance = ReshardingCoordinatorService::ReshardingCoordinator::lookup(opCtx, service, reshardingId); - invariant(instance); + + iassert( + 5400001, "ReshardingCoordinatorService instance does not exist", instance.is_initialized()); + return (*instance)->getObserver(); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 67c5267b149..ad2aac82205 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -234,7 +234,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( return _awaitAllDonorsMirroringThenTransitionToStrictConsistency(executor); }) .then([this, executor] { - return __awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor); + return _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor); }) .then([this] { _renameTemporaryReshardingCollection(); }) .onError([this](Status status) { @@ -245,6 +245,12 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( "error"_attr = status); _transitionState(RecipientStateEnum::kError, boost::none, status); _updateCoordinator(); + + // TODO SERVER-52838: Ensure all local collections that may have been created for + // resharding are removed, with the exception of the ReshardingRecipientDocument, before + // transitioning to kDone. + _transitionState(RecipientStateEnum::kDone, boost::none, status); + _updateCoordinator(); return status; }) .onCompletion([this, self = shared_from_this()](Status status) { @@ -255,13 +261,17 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( return; } - removeRecipientDocFailpoint.pauseWhileSet(); - if (status.isOK()) { // The shared_ptr stored in the PrimaryOnlyService's map for the // ReshardingRecipientService Instance is removed when the recipient state document // tied to the instance is deleted. It is necessary to use shared_from_this() to // extend the lifetime so the code can safely finish executing. + + { + auto opCtx = cc().makeOperationContext(); + removeRecipientDocFailpoint.pauseWhileSet(opCtx.get()); + } + _removeRecipientDocument(); _metrics()->onCompletion(ReshardingMetrics::OperationStatus::kSucceeded); _completionPromise.emplaceValue(); @@ -269,7 +279,6 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( _metrics()->onCompletion(ErrorCodes::isCancelationError(status) ? ReshardingMetrics::OperationStatus::kCanceled : ReshardingMetrics::OperationStatus::kFailed); - // Set error on all promises _completionPromise.setError(status); } }) @@ -278,23 +287,8 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) { // Resolve any unresolved promises to avoid hanging. 
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 67c5267b149..ad2aac82205 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -234,7 +234,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
             return _awaitAllDonorsMirroringThenTransitionToStrictConsistency(executor);
         })
         .then([this, executor] {
-            return __awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor);
+            return _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor);
         })
         .then([this] { _renameTemporaryReshardingCollection(); })
         .onError([this](Status status) {
@@ -245,6 +245,12 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
                       "error"_attr = status);
             _transitionState(RecipientStateEnum::kError, boost::none, status);
             _updateCoordinator();
+
+            // TODO SERVER-52838: Ensure all local collections that may have been created for
+            // resharding are removed, with the exception of the ReshardingRecipientDocument, before
+            // transitioning to kDone.
+            _transitionState(RecipientStateEnum::kDone, boost::none, status);
+            _updateCoordinator();
             return status;
         })
         .onCompletion([this, self = shared_from_this()](Status status) {
@@ -255,13 +261,17 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
                 return;
             }
 
-            removeRecipientDocFailpoint.pauseWhileSet();
-
             if (status.isOK()) {
                 // The shared_ptr stored in the PrimaryOnlyService's map for the
                 // ReshardingRecipientService Instance is removed when the recipient state document
                 // tied to the instance is deleted. It is necessary to use shared_from_this() to
                 // extend the lifetime so the code can safely finish executing.
+
+                {
+                    auto opCtx = cc().makeOperationContext();
+                    removeRecipientDocFailpoint.pauseWhileSet(opCtx.get());
+                }
+
                 _removeRecipientDocument();
                 _metrics()->onCompletion(ReshardingMetrics::OperationStatus::kSucceeded);
                 _completionPromise.emplaceValue();
@@ -269,7 +279,6 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
                 _metrics()->onCompletion(ErrorCodes::isCancelationError(status)
                                              ? ReshardingMetrics::OperationStatus::kCanceled
                                              : ReshardingMetrics::OperationStatus::kFailed);
-                // Set error on all promises
                 _completionPromise.setError(status);
             }
         })
@@ -278,23 +287,8 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
 
 void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) {
     // Resolve any unresolved promises to avoid hanging.
-    stdx::lock_guard<Latch> lg(_mutex);
-
-    if (_oplogFetcherExecutor) {
-        _oplogFetcherExecutor->shutdown();
-    }
-
-    for (auto&& fetcher : _oplogFetchers) {
-        fetcher->interrupt(status);
-    }
-
-    for (auto&& threadPool : _oplogApplierWorkers) {
-        threadPool->shutdown();
-    }
-
-    if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
-        _coordinatorHasDecisionPersisted.setError(status);
-    }
+    stdx::lock_guard<Latch> lk(_mutex);
+    _onAbortOrStepdown(lk, status);
 
     if (!_completionPromise.getFuture().isReady()) {
         _completionPromise.setError(status);
@@ -314,17 +308,14 @@ boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::repo
 
 void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges(
     OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields) {
-    auto coordinatorState = reshardingFields.getState();
-    if (coordinatorState == CoordinatorStateEnum::kError) {
-        // TODO SERVER-52838: Investigate if we want to have a special error code so the recipient
-        // knows when it has recieved the error from the coordinator rather than needing to report
-        // an error to the coordinator.
-        interrupt({ErrorCodes::InternalError,
-                   "ReshardingDonorService observed CoordinatorStateEnum::kError"});
+    stdx::lock_guard<Latch> lk(_mutex);
+    if (reshardingFields.getAbortReason()) {
+        auto status = getStatusFromAbortReason(reshardingFields);
+        _onAbortOrStepdown(lk, status);
         return;
     }
 
-    stdx::lock_guard<Latch> lk(_mutex);
+    auto coordinatorState = reshardingFields.getState();
     if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) {
         ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted);
     }
@@ -601,7 +592,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
 }
 
 ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
-    __awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
+    _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
        const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
     if (_recipientDoc.getState() > RecipientStateEnum::kStrictConsistency) {
         return ExecutorFuture<void>(**executor, Status::OK());
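removeRecipientDocFailpoint.pauseWhileSet() is now given an OperationContext, so a recipient parked on the failpoint can be interrupted (for example at stepdown) instead of sleeping until the failpoint is disabled. A simplified sketch of an interruptible pause loop, with atomics standing in for the failpoint state and the operation's kill flag:

#include <atomic>
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <thread>

std::atomic<bool> failpointEnabled{true};  // stand-in for the failpoint being set
std::atomic<bool> operationKilled{false};  // stand-in for OperationContext interruption

// Analogue of FailPoint::pauseWhileSet(opCtx): poll the kill flag while paused
// instead of blocking unconditionally until the failpoint is turned off.
void pauseWhileSetInterruptible() {
    while (failpointEnabled.load()) {
        if (operationKilled.load()) {
            throw std::runtime_error("operation interrupted while paused at failpoint");
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    std::thread killer([] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        operationKilled.store(true);  // e.g. a stepdown kills the operation
    });

    try {
        pauseWhileSetInterruptible();
    } catch (const std::runtime_error& e) {
        std::cout << e.what() << "\n";  // the pause unwinds instead of hanging
    }
    killer.join();
    return 0;
}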
@@ -653,14 +644,17 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
     emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp));
     emplaceAbortReasonIfExists(replacementDoc, std::move(abortReason));
 
+    // For logging purposes.
+    auto oldState = _recipientDoc.getState();
     auto newState = replacementDoc.getState();
+
     _updateRecipientDocument(std::move(replacementDoc));
     _metrics()->setRecipientState(endState);
 
     LOGV2_INFO(5279506,
                "Transitioned resharding recipient state",
                "newState"_attr = RecipientState_serializer(newState),
-               "oldState"_attr = RecipientState_serializer(_recipientDoc.getState()),
+               "oldState"_attr = RecipientState_serializer(oldState),
                "ns"_attr = _recipientDoc.getNss(),
                "collectionUUID"_attr = _recipientDoc.getExistingUUID(),
                "reshardingUUID"_attr = _recipientDoc.get_id());
@@ -762,4 +756,23 @@ ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics()
     return ReshardingMetrics::get(cc().getServiceContext());
 }
 
+void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithLock,
+                                                                           Status status) {
+    if (_oplogFetcherExecutor) {
+        _oplogFetcherExecutor->shutdown();
+    }
+
+    for (auto&& fetcher : _oplogFetchers) {
+        fetcher->interrupt(status);
+    }
+
+    for (auto&& threadPool : _oplogApplierWorkers) {
+        threadPool->shutdown();
+    }
+
+    if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
+        _coordinatorHasDecisionPersisted.setError(status);
+    }
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 544272b2b27..43cf216f367 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -145,7 +145,7 @@ private:
     ExecutorFuture<void> _awaitAllDonorsMirroringThenTransitionToStrictConsistency(
         const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
 
-    ExecutorFuture<void> __awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
+    ExecutorFuture<void> _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
         const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
 
     void _renameTemporaryReshardingCollection();
@@ -176,6 +176,10 @@ private:
     ReshardingMetrics* _metrics() const;
 
+    // Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors
+    // (abort resharding).
+    void _onAbortOrStepdown(WithLock, Status status);
+
     // The in-memory representation of the underlying document in
     // config.localReshardingOperations.recipient.
     ReshardingRecipientDocument _recipientDoc;
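Donor and recipient shared the same logging bug (SERVER-54457): the in-memory state document was replaced before the log line read it, so 'oldState' reported the new state twice. The fix captures the old value before the update, as in this standalone sketch of the capture-before-mutate pattern:

#include <iostream>
#include <string>
#include <utility>

struct StateDoc {
    std::string state;
};

// The bug: reading doc.state *after* the assignment below logged the new
// state as the old one. Capturing the value first fixes it.
void transition(StateDoc& doc, StateDoc replacement) {
    auto oldState = doc.state;        // capture before the update
    auto newState = replacement.state;

    doc = std::move(replacement);     // after this, doc.state == newState

    std::cout << "transitioned from " << oldState << " to " << newState << "\n";
}

int main() {
    StateDoc doc{"cloning"};
    transition(doc, StateDoc{"applying"});  // prints: transitioned from cloning to applying
    return 0;
}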
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 8aa5d563f76..61035669c3f 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -88,19 +88,23 @@ bool documentBelongsToMe(OperationContext* opCtx,
 
 DonorShardEntry makeDonorShard(ShardId shardId,
                                DonorStateEnum donorState,
-                               boost::optional<Timestamp> minFetchTimestamp) {
+                               boost::optional<Timestamp> minFetchTimestamp,
+                               boost::optional<Status> abortReason) {
     DonorShardEntry entry(shardId);
     entry.setState(donorState);
     emplaceMinFetchTimestampIfExists(entry, minFetchTimestamp);
+    emplaceAbortReasonIfExists(entry, abortReason);
     return entry;
 }
 
 RecipientShardEntry makeRecipientShard(ShardId shardId,
                                        RecipientStateEnum recipientState,
-                                       boost::optional<Timestamp> strictConsistencyTimestamp) {
+                                       boost::optional<Timestamp> strictConsistencyTimestamp,
+                                       boost::optional<Status> abortReason) {
     RecipientShardEntry entry(shardId);
     entry.setState(recipientState);
     emplaceStrictConsistencyTimestampIfExists(entry, strictConsistencyTimestamp);
+    emplaceAbortReasonIfExists(entry, abortReason);
     return entry;
 }
 
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index fec49c02a90..bbc36407ab1 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -144,11 +144,31 @@ void emplaceAbortReasonIfExists(ClassWithAbortReason& c, boost::optional<Status>
 }
 
 /**
+ * Extract the abortReason BSONObj into a status.
+ */
+template <class ClassWithAbortReason>
+Status getStatusFromAbortReason(ClassWithAbortReason& c) {
+    invariant(c.getAbortReason());
+    auto abortReasonObj = c.getAbortReason().get();
+    BSONElement codeElement = abortReasonObj["code"];
+    BSONElement errmsgElement = abortReasonObj["errmsg"];
+    int code = codeElement.numberInt();
+    std::string errmsg;
+    if (errmsgElement.type() == String) {
+        errmsg = errmsgElement.String();
+    } else if (!errmsgElement.eoo()) {
+        errmsg = errmsgElement.toString();
+    }
+    return Status(ErrorCodes::Error(code), errmsg, abortReasonObj);
+}
+
+/**
  * Helper method to construct a DonorShardEntry with the fields specified.
  */
 DonorShardEntry makeDonorShard(ShardId shardId,
                                DonorStateEnum donorState,
-                               boost::optional<Timestamp> minFetchTimestamp = boost::none);
+                               boost::optional<Timestamp> minFetchTimestamp = boost::none,
+                               boost::optional<Status> abortReason = boost::none);
 
 /**
  * Helper method to construct a RecipientShardEntry with the fields specified.
@@ -156,7 +176,8 @@ DonorShardEntry makeDonorShard(ShardId shardId,
 RecipientShardEntry makeRecipientShard(
     ShardId shardId,
     RecipientStateEnum recipientState,
-    boost::optional<Timestamp> strictConsistencyTimestamp = boost::none);
+    boost::optional<Timestamp> strictConsistencyTimestamp = boost::none,
+    boost::optional<Status> abortReason = boost::none);
 
 /**
  * Gets the UUID for 'nss' from the 'cm'
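getStatusFromAbortReason() above is the inverse of emplaceAbortReasonIfExists(): it reads the 'code' and 'errmsg' fields of the persisted abortReason document and rebuilds a Status the participant can act on. A standalone sketch of that extraction, with a plain struct standing in for the BSONObj:

#include <iostream>
#include <optional>
#include <string>

// Stand-in for the persisted abortReason BSONObj ({code: ..., errmsg: ...}).
struct AbortReason {
    int code = 0;
    std::optional<std::string> errmsg;
};

// Stand-in for mongo::Status: an error code plus a human-readable reason.
struct SimpleStatus {
    int code = 0;
    std::string reason;
};

// Analogue of getStatusFromAbortReason(): pull the fields back out and rebuild
// the error the coordinator originally persisted. A missing errmsg degrades to
// an empty reason, much as the real helper tolerates a non-string element.
SimpleStatus statusFromAbortReason(const AbortReason& abortReason) {
    return SimpleStatus{abortReason.code, abortReason.errmsg.value_or("")};
}

int main() {
    AbortReason reason{11601, "operation was interrupted"};
    auto status = statusFromAbortReason(reason);
    std::cout << "code=" << status.code << " errmsg=" << status.reason << "\n";
    return 0;
}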