author    Haley Connelly <haley.connelly@mongodb.com>       2021-02-19 20:12:01 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-02-23 22:46:57 +0000
commit    6abf5d917e5f5aac880c3201c17bf6749ddd2f55 (patch)
tree      5d9dd5ea2565686cb5269b34a3b15e2b86915efc
parent    bd5979b18572c26ac44b36c8e5e32972c12a82d5 (diff)
SERVER-54000 Make errors propagate from the ReshardingCoordinator to participants
SERVER-54457 Resharding state transition log messages use correct 'oldState'
-rw-r--r--  jstests/sharding/libs/resharding_test_fixture.js                    |  28
-rw-r--r--  jstests/sharding/libs/resharding_test_util.js                       |  84
-rw-r--r--  jstests/sharding/resharding_clones_duplicate_key.js                 |  68
-rw-r--r--  jstests/sharding/resharding_indexBuilds.js                          |  75
-rw-r--r--  jstests/sharding/resharding_recipient_broadcasts_abortReason.js     |  76
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_observer.cpp       | 196
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_observer.h         |  17
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp  |  76
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp        |  27
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_test.cpp           |   4
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp     | 157
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service.cpp              |  73
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service.h                |   4
-rw-r--r--  src/mongo/db/s/resharding/resharding_op_observer.cpp                |   5
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp          |  75
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h            |   6
-rw-r--r--  src/mongo/db/s/resharding_util.cpp                                  |   8
-rw-r--r--  src/mongo/db/s/resharding_util.h                                    |  25
18 files changed, 683 insertions, 321 deletions
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js
index dd50412379f..037346b66a7 100644
--- a/jstests/sharding/libs/resharding_test_fixture.js
+++ b/jstests/sharding/libs/resharding_test_fixture.js
@@ -64,6 +64,8 @@ var ReshardingTest = class {
/** @private */
this._pauseCoordinatorBeforeDecisionPersistedFailpoint = undefined;
/** @private */
+ this._pauseCoordinatorBeforeCompletionFailpoint = undefined;
+ /** @private */
this._reshardingThread = undefined;
/** @private */
this._isReshardingActive = false;
@@ -184,6 +186,8 @@ var ReshardingTest = class {
configureFailPoint(configPrimary, "reshardingPauseCoordinatorInSteadyState");
this._pauseCoordinatorBeforeDecisionPersistedFailpoint =
configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted");
+ this._pauseCoordinatorBeforeCompletionFailpoint =
+ configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeCompletion");
const commandDoneSignal = new CountDownLatch(1);
@@ -234,10 +238,13 @@ var ReshardingTest = class {
* Callers of interruptReshardingThread() will want to set this to ErrorCodes.Interrupted, for
* example.
*/
- withReshardingInBackground(
- {newShardKeyPattern, newChunks},
- duringReshardingFn = (tempNs) => {},
- {expectedErrorCode = ErrorCodes.OK, postCheckConsistencyFn = (tempNs) => {}} = {}) {
+ withReshardingInBackground({newShardKeyPattern, newChunks},
+ duringReshardingFn = (tempNs) => {},
+ {
+ expectedErrorCode = ErrorCodes.OK,
+ postCheckConsistencyFn = (tempNs) => {},
+ postAbortDecisionPersistedFn = () => {}
+ } = {}) {
const commandDoneSignal = this._startReshardingInBackgroundAndAllowCommandFailure(
{newShardKeyPattern, newChunks}, expectedErrorCode);
@@ -249,7 +256,8 @@ var ReshardingTest = class {
this._callFunctionSafely(() => duringReshardingFn(this._tempNs));
this._checkConsistencyAndPostState(expectedErrorCode,
- () => postCheckConsistencyFn(this._tempNs));
+ () => postCheckConsistencyFn(this._tempNs),
+ () => postAbortDecisionPersistedFn());
}
/** @private */
@@ -279,7 +287,8 @@ var ReshardingTest = class {
fn();
} catch (duringReshardingError) {
for (const fp of [this._pauseCoordinatorInSteadyStateFailpoint,
- this._pauseCoordinatorBeforeDecisionPersistedFailpoint]) {
+ this._pauseCoordinatorBeforeDecisionPersistedFailpoint,
+ this._pauseCoordinatorBeforeCompletionFailpoint]) {
try {
fp.off();
} catch (disableFailpointError) {
@@ -321,7 +330,9 @@ var ReshardingTest = class {
}
/** @private */
- _checkConsistencyAndPostState(expectedErrorCode, postCheckConsistencyFn = () => {}) {
+ _checkConsistencyAndPostState(expectedErrorCode,
+ postCheckConsistencyFn = () => {},
+ postAbortDecisionPersistedFn = () => {}) {
if (expectedErrorCode === ErrorCodes.OK) {
this._callFunctionSafely(() => {
// We use the reshardingPauseCoordinatorInSteadyState failpoint so that any
@@ -340,11 +351,14 @@ var ReshardingTest = class {
postCheckConsistencyFn();
this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off();
+ this._pauseCoordinatorBeforeCompletionFailpoint.off();
});
} else {
this._callFunctionSafely(() => {
this._pauseCoordinatorInSteadyStateFailpoint.off();
this._pauseCoordinatorBeforeDecisionPersistedFailpoint.off();
+ postAbortDecisionPersistedFn();
+ this._pauseCoordinatorBeforeCompletionFailpoint.off();
});
}
diff --git a/jstests/sharding/libs/resharding_test_util.js b/jstests/sharding/libs/resharding_test_util.js
new file mode 100644
index 00000000000..265613373e8
--- /dev/null
+++ b/jstests/sharding/libs/resharding_test_util.js
@@ -0,0 +1,84 @@
+/*
+ * Utilities for reshardCollection testing.
+ */
+var ReshardingTestUtil = (function() {
+ /**
+ * Returns whether the shard is 'done' aborting the resharding operation with
+ * 'abortReason.code'.
+ */
+ const shardDoneAbortingWithCode = function(shardEntry, errorCode) {
+ return shardEntry["abortReason"] && shardEntry["abortReason"]["code"] &&
+ shardEntry["abortReason"]["code"] === errorCode && shardEntry["state"] === "done";
+ };
+
+ /**
+ * Confirms all resharding participants eventually write their abortReason with state 'done' to
+ * the main CoordinatorDocument on the configsvr.
+ *
+ * Each participant shard should eventually report its abortReason to
+ * its corresponding entry on the CoordinatorDocument -
+ * config.reshardingOperations.recipientShards[shardName] for recipients and
+ * config.reshardingOperations.donorShards[shardName] for donors.
+ */
+ const assertAllParticipantsReportAbortToCoordinator = function(configsvr, nss, errCode) {
+ const reshardingOperationsCollection =
+ configsvr.getCollection("config.reshardingOperations");
+ assert.soon(
+ () => {
+ const coordinatorDoc = reshardingOperationsCollection.findOne({nss});
+ assert(coordinatorDoc);
+ // Iterate over both the recipientShards and donorShards and check that every shard
+ // entry is in state 'done' and contains an abortReason with the errCode.
+ for (const shardEntry
+ of [...coordinatorDoc.recipientShards, ...coordinatorDoc.donorShards]) {
+ if (!shardDoneAbortingWithCode(shardEntry, errCode)) {
+ return false;
+ }
+ }
+ return true;
+ },
+ () => {
+ return "Not all participants reported to the configsvr's CoordinatorDocument" +
+ " they are done aborting with code " + errCode + ". Got CoordinatorDocument: " +
+ tojson(reshardingOperationsCollection.find().toArray());
+ });
+ };
+
+ /**
+ * Asserts that the participant eventually reports its abortReason with state 'done' locally
+ * to its ReshardingDonorDocument or ReshardingRecipientDocument, depending on the participantType.
+ *
+ * Not exposed to users of ReshardingTestUtil, users should call assertRecipientAbortsLocally or
+ * assertDonorAbortsLocally instead.
+ */
+ const assertParticipantAbortsLocally = function(
+ shardConn, shardName, nss, abortReason, participantType) {
+ const localOpsCollection =
+ shardConn.getCollection(`config.localReshardingOperations.${participantType}`);
+
+ assert.soon(
+ () => {
+ return localOpsCollection.findOne(
+ {nss, state: "done", "abortReason.code": abortReason}) !== null;
+ },
+ () => {
+ return participantType + " shard " + shardName +
+ " never transitioned to an done state with abortReason " + abortReason + ": " +
+ tojson(localDonorOpsCollection.findOne());
+ });
+ };
+
+ const assertRecipientAbortsLocally = function(shardConn, shardName, nss, abortReason) {
+ return assertParticipantAbortsLocally(shardConn, shardName, nss, abortReason, "recipient");
+ };
+
+ const assertDonorAbortsLocally = function(shardConn, shardName, nss, abortReason) {
+ return assertParticipantAbortsLocally(shardConn, shardName, nss, abortReason, "donor");
+ };
+
+ return {
+ assertAllParticipantsReportAbortToCoordinator,
+ assertRecipientAbortsLocally,
+ assertDonorAbortsLocally
+ };
+})();
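
The helpers in this new file all follow the same pattern: poll a config collection with assert.soon() until a participant's entry reaches state 'done' with the expected abortReason.code, and dump the collection contents if it never does. As a rough, self-contained illustration of that polling idea outside the mongo shell, here is a small C++ sketch; the assertSoon() helper, its timeouts, and the simulated participant flag are invented for illustration and are not MongoDB APIs.

#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>

// Poll 'predicate' until it returns true or 'timeout' elapses; if it never holds,
// throw with the diagnostic produced by 'onTimeout'. This mirrors how the jstest
// helpers above lean on assert.soon() against config collections.
void assertSoon(const std::function<bool()>& predicate,
                const std::function<std::string()>& onTimeout,
                std::chrono::milliseconds timeout = std::chrono::seconds(30),
                std::chrono::milliseconds interval = std::chrono::milliseconds(100)) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (std::chrono::steady_clock::now() < deadline) {
        if (predicate()) {
            return;
        }
        std::this_thread::sleep_for(interval);
    }
    throw std::runtime_error(onTimeout());
}

int main() {
    // Stand-in for a participant's state document reaching "done" with an abortReason.
    std::atomic<bool> participantDoneAborting{false};
    std::thread participant([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
        participantDoneAborting = true;
    });

    assertSoon([&] { return participantDoneAborting.load(); },
               [] { return std::string("participant never reported done with an abortReason"); });
    std::cout << "participant reported done aborting\n";
    participant.join();
}
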
diff --git a/jstests/sharding/resharding_clones_duplicate_key.js b/jstests/sharding/resharding_clones_duplicate_key.js
index f127e7ac1fb..ad3ad2baa55 100644
--- a/jstests/sharding/resharding_clones_duplicate_key.js
+++ b/jstests/sharding/resharding_clones_duplicate_key.js
@@ -13,6 +13,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/discover_topology.js");
load("jstests/sharding/libs/resharding_test_fixture.js");
+load("jstests/sharding/libs/resharding_test_util.js");
const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 1});
reshardingTest.setup();
@@ -47,48 +48,6 @@ assert.commandWorked(inputCollection.insert([
{_id: 21, info: `moves from ${donorShardNames[1]}`, oldKey: 10, newKey: 10, pad: largeStr},
]));
-/**
- * Confirms the shard's abortReason and state are written locally in
- * config.localReshardingOperations.recipient, the shard's local ReshardingRecipientDocument.
- */
-function assertEventuallyErrorsLocally(shardConn, shardName) {
- const localRecipientOpsCollection =
- shardConn.getCollection("config.localReshardingOperations.recipient");
-
- assert.soon(
- () => {
- return localRecipientOpsCollection.findOne(
- {state: "error", "abortReason.code": ErrorCodes.DuplicateKey}) !== null;
- },
- () => {
- return "recipient shard " + shardName + " never transitioned to the error state: " +
- tojson(localRecipientOpsCollection.find().toArray());
- });
-}
-
-/**
- * Confirms the shard's abortReason and state are written in
- * config.reshardingOperations.recipientShards[shardName], the main CoordinatorDocument on the
- * configsvr.
- */
-function assertEventuallyErrorsInRecipientList(configsvrConn, shardName, nss) {
- const reshardingOperationsCollection =
- configsvrConn.getCollection("config.reshardingOperations");
- assert.soon(
- () => {
- return reshardingOperationsCollection.findOne({
- nss,
- recipientShards:
- {$elemMatch: {id: shardName, "abortReason.code": ErrorCodes.DuplicateKey}}
- }) !== null;
- },
- () => {
- return "recipient shard " + shardName + " never updated its entry in the coordinator" +
- " document to state kError and abortReason DuplicateKey: " +
- tojson(reshardingOperationsCollection.find().toArray());
- });
-}
-
const mongos = inputCollection.getMongo();
const recipientShardNames = reshardingTest.recipientShardNames;
@@ -98,29 +57,22 @@ const configsvr = new Mongo(topology.configsvr.nodes[0]);
const fp = configureFailPoint(recipient0, "removeRecipientDocFailpoint");
-// In the current implementation, the reshardCollection command won't ever complete if one of the
-// recipients encounter an unrecoverable error while cloning. To work around this limitation, we
-// verify the recipient shard transitioned itself into the "error" state as a result of the
-// duplicate key error during resharding's collection cloning.
-//
-// TODO SERVER-53792: Investigate removing interruptReshardingThread() from this test and instead
-// directly asserting that the reshardCollection command fails with an error without losing
-// intermediate checks regarding where the recipient communicates its error.
-reshardingTest.withReshardingInBackground( //
+reshardingTest.withReshardingInBackground(
{
newShardKeyPattern: {newKey: 1},
newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}],
},
() => {
// TODO SERVER-51696: Review if these checks can be made in a cpp unittest instead.
- assertEventuallyErrorsLocally(recipient0, recipientShardNames[0]);
- assertEventuallyErrorsInRecipientList(
- configsvr, recipientShardNames[0], inputCollection.getFullName());
- reshardingTest.interruptReshardingThread();
+ ReshardingTestUtil.assertRecipientAbortsLocally(recipient0,
+ recipientShardNames[0],
+ inputCollection.getFullName(),
+ ErrorCodes.DuplicateKey);
+ fp.off();
+ ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator(
+ configsvr, inputCollection.getFullName(), ErrorCodes.DuplicateKey);
},
- {expectedErrorCode: ErrorCodes.Interrupted});
-
-fp.off();
+ {expectedErrorCode: ErrorCodes.DuplicateKey});
const idleCursors = mongos.getDB("admin")
.aggregate([
diff --git a/jstests/sharding/resharding_indexBuilds.js b/jstests/sharding/resharding_indexBuilds.js
index 7de174326ca..9e4f4ae982c 100644
--- a/jstests/sharding/resharding_indexBuilds.js
+++ b/jstests/sharding/resharding_indexBuilds.js
@@ -12,6 +12,7 @@
load("jstests/libs/discover_topology.js");
load("jstests/sharding/libs/resharding_test_fixture.js");
+load("jstests/sharding/libs/resharding_test_util.js");
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
@@ -47,73 +48,27 @@ let createIndexFailpoint = configureFailPoint(donor0, "hangIndexBuildBeforeCommi
createIndexThread.start();
createIndexFailpoint.wait();
-/**
- * Confirms the shard's abortReason and state are written locally in
- * config.localReshardingOperations.donor, the shard's local ReshardingDonorDocument.
- */
-function assertEventuallyErrorsLocally(shardConn, shardName) {
- const localDonorOpsCollection =
- shardConn.getCollection("config.localReshardingOperations.donor");
-
- assert.soon(
- () => {
- return localDonorOpsCollection.findOne({
- state: "error",
- "abortReason.code": ErrorCodes.BackgroundOperationInProgressForNamespace
- }) !== null;
- },
- () => {
- return "donor shard " + shardName + " never transitioned to the error state: " +
- tojson(localDonorOpsCollection.findOne());
- });
-}
-
-/**
- * Confirms the shard's abortReason and state are written in
- * config.reshardingOperations.donorShards[shardName], the main CoordinatorDocument on the
- * configsvr.
- */
-function assertEventuallyErrorsInDonorList(configsvrConn, shardName, nss) {
- const reshardingOperationsCollection =
- configsvrConn.getCollection("config.reshardingOperations");
- assert.soon(
- () => {
- return reshardingOperationsCollection.findOne({
- nss,
- donorShards: {
- $elemMatch: {
- id: shardName,
- "abortReason.code": ErrorCodes.BackgroundOperationInProgressForNamespace
- }
- }
- }) !== null;
- },
- () => {
- return "donor shard " + shardName + " never updated its entry in the coordinator" +
- " document to state kError and abortReason BackgroundOperationInProgressForNamespace: " +
- tojson(reshardingOperationsCollection.find().toArray());
- });
-}
-
-// In the current implementation, the reshardCollection command won't propagate the error to the
-// caller if the recipient/donor shard encounters an unrecoverable error. To work around this
-// limitation, we verify the recipient shard transitioned itself into the "error" state as a result
-// of the duplicate key error during resharding's collection cloning.
-//
-// TODO SERVER-53792: Ensure the error propagated to the user indicates
-// ErrorCodes.BackgroundOperationInProgressForNamespace.
-reshardingTest.withReshardingInBackground( //
+reshardingTest.withReshardingInBackground(
{
newShardKeyPattern: {newKey: 1},
newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}],
},
() => {
// TODO SERVER-51696: Review if these checks can be made in a cpp unittest instead.
- assertEventuallyErrorsLocally(donor0, donorShardNames[0]);
- assertEventuallyErrorsInDonorList(
- configsvr, donorShardNames[0], inputCollection.getFullName());
+ ReshardingTestUtil.assertDonorAbortsLocally(
+ donor0,
+ donorShardNames[0],
+ inputCollection.getFullName(),
+ ErrorCodes.BackgroundOperationInProgressForNamespace);
+
+ // Note: even though the recipient state machine does not exist by the time the donor
+ // errors, recipients should still acknowledge they saw the coordinator's abort.
+ ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator(
+ configsvr,
+ inputCollection.getFullName(),
+ ErrorCodes.BackgroundOperationInProgressForNamespace);
},
- {expectedErrorCode: ErrorCodes.InternalError});
+ {expectedErrorCode: ErrorCodes.BackgroundOperationInProgressForNamespace});
// Resume index build.
createIndexFailpoint.off();
diff --git a/jstests/sharding/resharding_recipient_broadcasts_abortReason.js b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js
new file mode 100644
index 00000000000..db76cf0bab6
--- /dev/null
+++ b/jstests/sharding/resharding_recipient_broadcasts_abortReason.js
@@ -0,0 +1,76 @@
+/**
+ * Tests that when a donor shard encounters an unrecoverable error, the error gets propagated from
+ * the failing donor shard, to the coordinator, and then to the remaining participant shards.
+ *
+ * @tags: [
+ * requires_fcv_49,
+ * uses_atclustertime,
+ * __TEMPORARILY_DISABLED__
+ * ]
+ *
+ * TODO SERVER-54474: Re-enable this test once donors engaged in the critical section are able to
+ * abort resharding locally after the coordinator transitions to an error state.
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/discover_topology.js");
+load("jstests/sharding/libs/resharding_test_fixture.js");
+load("jstests/sharding/libs/resharding_test_util.js");
+
+const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2});
+
+reshardingTest.setup();
+
+const donorShardNames = reshardingTest.donorShardNames;
+const inputCollection = reshardingTest.createShardedCollection({
+ ns: "reshardingDb.coll",
+ shardKeyPattern: {oldKey: 1},
+ chunks: [
+ {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
+ {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
+ ],
+});
+
+assert.commandWorked(inputCollection.insert([
+ {_id: "stays on shard0", oldKey: -10, newKey: -10},
+ {_id: "moves to shard0", oldKey: 10, newKey: -10},
+ {_id: "moves to shard1", oldKey: -10, newKey: 10},
+ {_id: "stays on shard1", oldKey: 10, newKey: 10},
+]));
+
+const mongos = inputCollection.getMongo();
+const recipientShardNames = reshardingTest.recipientShardNames;
+
+const topology = DiscoverTopology.findConnectedNodes(mongos);
+const donor_host = topology.shards[donorShardNames[0]].primary;
+const donor0 = new Mongo(donor_host);
+const configsvr = new Mongo(topology.configsvr.nodes[0]);
+
+const fp = configureFailPoint(donor0, "reshardingDonorFailsBeforePreparingToMirror");
+
+reshardingTest.withReshardingInBackground(
+ {
+ newShardKeyPattern: {newKey: 1},
+ newChunks: [
+ {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]},
+ {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]},
+ ],
+ },
+ (tempNs) => {
+ // TODO SERVER-51696: Review if these checks can be made in a cpp unittest instead.
+ // First, wait for the shard to encounter an unrecoverable error and persist it locally.
+ ReshardingTestUtil.assertDonorAbortsLocally(
+ donor0, donorShardNames[0], inputCollection.getFullName(), ErrorCodes.InternalError);
+ },
+ {
+ expectedErrorCode: ErrorCodes.InternalError,
+ postAbortDecisionPersistedFn: () => {
+ ReshardingTestUtil.assertAllParticipantsReportAbortToCoordinator(
+ configsvr, inputCollection.getFullName(), ErrorCodes.InternalError);
+ }
+ });
+
+fp.off();
+reshardingTest.teardown();
+})();
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
index 5c8b72bfefe..7329a5cea7d 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -34,6 +34,7 @@
#include <fmt/format.h>
#include "mongo/db/s/resharding/coordinator_document_gen.h"
+#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -59,87 +60,116 @@ const std::vector<RecipientShardEntry>& getParticipants(
}
/**
- * Generates error an response indicating which participant was found to be in state kError.
- */
-Status generateErrorStatus(WithLock, const DonorShardEntry& donor) {
- return {ErrorCodes::InternalError,
- "Donor shard {} is in an error state"_format(donor.getId().toString())};
-}
-Status generateErrorStatus(WithLock, const RecipientShardEntry& recipient) {
- return {ErrorCodes::InternalError,
- "Recipient shard {} is in an error state"_format(recipient.getId().toString())};
-}
-
-/**
- * Returns true if the participants all have a (non-error) state greater than or equal to
- * expectedState, false otherwise.
- *
- * If any participant is in state kError, returns an error status.
+ * Returns true if all participants are in a state greater than or equal to the expectedState.
*/
template <class TState, class TParticipant>
-StatusWith<bool> allParticipantsInStateGTE(WithLock lk,
- TState expectedState,
- const std::vector<TParticipant>& participants) {
- bool allInStateGTE = true;
+bool allParticipantsInStateGTE(WithLock lk,
+ TState expectedState,
+ const std::vector<TParticipant>& participants) {
for (const auto& shard : participants) {
- auto state = shard.getState();
- if (state != expectedState) {
- if (state == TState::kError) {
- return generateErrorStatus(lk, shard);
- }
-
- if (allInStateGTE) {
- // Ensure allInStateGTE never goes from false -> true. It's possible that while one
- // participant has not yet reached the expectedState, another participant is in a
- // state greater than the expectedState.
- //
- // Instead of early returning, continue loop in case another participant reported an
- // error.
- allInStateGTE = state > expectedState;
- }
+ if (shard.getState() < expectedState) {
+ return false;
}
}
- return allInStateGTE;
+ return true;
}
/**
- * Returns true if the state transition is incomplete and the promise cannot yet be fulfilled. This
- * includes if one or more of the participants report state kError. In the error case, an error is
- * set on the promise.
- *
- * Otherwise returns false and fulfills the promise if it is not already.
+ * Returns whether or not all relevant shards have completed their transitions into the
+ * expectedState. If they have, ensures the promise is fulfilled.
*/
-template <class T>
-bool stateTransitionIncomplete(WithLock lk,
+template <class TState>
+bool stateTransistionsComplete(WithLock lk,
SharedPromise<ReshardingCoordinatorDocument>& sp,
- T expectedState,
+ TState expectedState,
const ReshardingCoordinatorDocument& updatedStateDoc) {
if (sp.getFuture().isReady()) {
// Ensure promise is not fulfilled twice.
- return false;
+ return true;
}
auto participants = getParticipants(lk, expectedState, updatedStateDoc);
- auto swAllParticipantsGTE = allParticipantsInStateGTE(lk, expectedState, participants);
- if (!swAllParticipantsGTE.isOK()) {
- // By returning true, onReshardingParticipantTransition will not try to fulfill any more of
- // the promises.
- // ReshardingCoordinator::run() waits on the promises from the ReshardingCoordinatorObserver
- // in the same order they're fulfilled by onReshardingParticipantTransition(). If even one
- // promise in the chain errors, ReshardingCoordinator::run() will jump to onError and not
- // wait for the remaining promises to be fulfilled.
- sp.setError(swAllParticipantsGTE.getStatus());
+ auto allShardsTransitioned = allParticipantsInStateGTE(lk, expectedState, participants);
+ if (allShardsTransitioned) {
+ sp.emplaceValue(updatedStateDoc);
return true;
}
+ return false;
+}
- if (swAllParticipantsGTE.getValue()) {
- // All participants are in a state greater than or equal to expected state.
- sp.emplaceValue(updatedStateDoc);
- return false;
+/**
+ * Appends context regarding the source of the abortReason.
+ */
+template <class TParticipant>
+Status getStatusFromAbortReasonWithShardInfo(const TParticipant& participant,
+ StringData participantType) {
+ return getStatusFromAbortReason(participant)
+ .withContext("{} shard {} reached an unrecoverable error"_format(
+ participantType, participant.getId().toString()));
+}
+
+/**
+ * If neither the coordinator nor participants have encountered an unrecoverable error, returns
+ * boost::none.
+ *
+ * Otherwise, returns the abortReason reported by either the coordinator or one of the participants.
+ */
+boost::optional<Status> getAbortReasonIfExists(
+ const ReshardingCoordinatorDocument& updatedStateDoc) {
+ if (updatedStateDoc.getAbortReason()) {
+ // Note: the absence of context specifying which shard the abortReason originates from
+ // implies the abortReason originates from the coordinator.
+ return getStatusFromAbortReason(updatedStateDoc);
+ }
+
+ for (const auto& donorShard : updatedStateDoc.getDonorShards()) {
+ if (donorShard.getState() == DonorStateEnum::kError) {
+ return getStatusFromAbortReasonWithShardInfo(donorShard, "Donor"_sd);
+ }
+ }
+
+ for (const auto& recipientShard : updatedStateDoc.getRecipientShards()) {
+ if (recipientShard.getState() == RecipientStateEnum::kError) {
+ return getStatusFromAbortReasonWithShardInfo(recipientShard, "Recipient"_sd);
+ }
+ }
+
+ return boost::none;
+}
+
+template <class TState, class TParticipant>
+bool allParticipantsDoneWithAbortReason(WithLock lk,
+ TState expectedState,
+ const std::vector<TParticipant>& participants) {
+ for (const auto& shard : participants) {
+ if (!(shard.getState() == expectedState && shard.getAbortReason().is_initialized())) {
+ return false;
+ }
}
return true;
}
+/**
+ * Fulfills allParticipantsDoneAbortingSp if all participants have reported to the coordinator that
+ * they have finished aborting locally.
+ */
+void checkAllParticipantsAborted(WithLock lk,
+ SharedPromise<void>& allParticipantsDoneAbortingSp,
+ const ReshardingCoordinatorDocument& updatedStateDoc) {
+ if (allParticipantsDoneAbortingSp.getFuture().isReady()) {
+ return;
+ }
+
+ bool allDonorsAborted = allParticipantsDoneWithAbortReason(
+ lk, DonorStateEnum::kDone, updatedStateDoc.getDonorShards());
+ bool allRecipientsAborted = allParticipantsDoneWithAbortReason(
+ lk, RecipientStateEnum::kDone, updatedStateDoc.getRecipientShards());
+
+ if (allDonorsAborted && allRecipientsAborted) {
+ allParticipantsDoneAbortingSp.emplaceValue();
+ }
+}
+
} // namespace
ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default;
@@ -156,41 +186,46 @@ ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() {
void ReshardingCoordinatorObserver::onReshardingParticipantTransition(
const ReshardingCoordinatorDocument& updatedStateDoc) {
-
stdx::lock_guard<Latch> lk(_mutex);
- if (stateTransitionIncomplete(lk,
- _allDonorsReportedMinFetchTimestamp,
- DonorStateEnum::kDonatingInitialData,
- updatedStateDoc)) {
+ if (auto abortReason = getAbortReasonIfExists(updatedStateDoc)) {
+ _onAbortOrStepdown(lk, abortReason.get());
+ checkAllParticipantsAborted(lk, _allParticipantsDoneAborting, updatedStateDoc);
return;
}
- if (stateTransitionIncomplete(
+ if (!stateTransistionsComplete(lk,
+ _allDonorsReportedMinFetchTimestamp,
+ DonorStateEnum::kDonatingInitialData,
+ updatedStateDoc)) {
+ return;
+ }
+
+ if (!stateTransistionsComplete(
lk, _allRecipientsFinishedCloning, RecipientStateEnum::kApplying, updatedStateDoc)) {
return;
}
- if (stateTransitionIncomplete(lk,
- _allRecipientsFinishedApplying,
- RecipientStateEnum::kSteadyState,
- updatedStateDoc)) {
+ if (!stateTransistionsComplete(lk,
+ _allRecipientsFinishedApplying,
+ RecipientStateEnum::kSteadyState,
+ updatedStateDoc)) {
return;
}
- if (stateTransitionIncomplete(lk,
- _allRecipientsReportedStrictConsistencyTimestamp,
- RecipientStateEnum::kStrictConsistency,
- updatedStateDoc)) {
+ if (!stateTransistionsComplete(lk,
+ _allRecipientsReportedStrictConsistencyTimestamp,
+ RecipientStateEnum::kStrictConsistency,
+ updatedStateDoc)) {
return;
}
- if (stateTransitionIncomplete(
+ if (!stateTransistionsComplete(
lk, _allRecipientsRenamedCollection, RecipientStateEnum::kDone, updatedStateDoc)) {
return;
}
- if (stateTransitionIncomplete(
+ if (!stateTransistionsComplete(
lk, _allDonorsDroppedOriginalCollection, DonorStateEnum::kDone, updatedStateDoc)) {
return;
}
@@ -226,9 +261,20 @@ ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() {
return _allRecipientsRenamedCollection.getFuture();
}
+SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllParticipantsDoneAborting() {
+ return _allParticipantsDoneAborting.getFuture();
+}
+
void ReshardingCoordinatorObserver::interrupt(Status status) {
- stdx::lock_guard<Latch> lg(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
+ _onAbortOrStepdown(lk, status);
+
+ if (!_allParticipantsDoneAborting.getFuture().isReady()) {
+ _allParticipantsDoneAborting.setError(status);
+ }
+}
+void ReshardingCoordinatorObserver::_onAbortOrStepdown(WithLock, Status status) {
if (!_allDonorsReportedMinFetchTimestamp.getFuture().isReady()) {
_allDonorsReportedMinFetchTimestamp.setError(status);
}
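
The observer changes above replace the old "return an error when any participant reports kError" logic with two pieces of bookkeeping: any abortReason found on the coordinator document or a participant entry fails the in-flight promises through _onAbortOrStepdown(), and the new _allParticipantsDoneAborting promise is fulfilled only once every donor and recipient entry is kDone with an abortReason. The sketch below models that "fulfill once everyone has finished aborting" pattern with std::promise in place of MongoDB's SharedPromise; the Participant and Observer types are simplified stand-ins, not the real classes.

#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

enum class State { kCloning, kError, kDone };

// Stand-in for a DonorShardEntry/RecipientShardEntry in the coordinator document.
struct Participant {
    State state;
    std::optional<std::string> abortReason;  // set once the shard has aborted locally
};

class Observer {
public:
    // Analogous to awaitAllParticipantsDoneAborting().
    std::shared_future<void> awaitAllDoneAborting() {
        return _future;
    }

    // Analogous to onReshardingParticipantTransition(): called with the latest
    // participant entries every time a shard updates its state.
    void onTransition(const std::vector<Participant>& participants) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_fulfilled) {
            return;  // never fulfill the promise twice
        }
        for (const auto& p : participants) {
            if (!(p.state == State::kDone && p.abortReason)) {
                return;  // some participant has not finished aborting yet
            }
        }
        _fulfilled = true;
        _allDoneAborting.set_value();
    }

private:
    std::mutex _mutex;
    bool _fulfilled = false;
    std::promise<void> _allDoneAborting;
    std::shared_future<void> _future = _allDoneAborting.get_future().share();
};

int main() {
    Observer observer;
    auto fut = observer.awaitAllDoneAborting();
    auto isReady = [&fut] {
        return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    };

    // One recipient is still in kError, so the promise stays unfulfilled.
    observer.onTransition({{State::kDone, "duplicate key"}, {State::kError, "duplicate key"}});
    std::cout << "ready after first update: " << std::boolalpha << isReady() << '\n';

    // Every participant is now kDone with an abortReason; the coordinator may finish up.
    observer.onTransition({{State::kDone, "duplicate key"}, {State::kDone, "duplicate key"}});
    std::cout << "ready after second update: " << isReady() << '\n';
}
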
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
index 27a64fa53be..50d4c2fdd5e 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
@@ -104,11 +104,23 @@ public:
SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllDonorsDroppedOriginalCollection();
/**
+ * Fulfills the '_allParticipantsDoneAborting' promise when the last recipient or donor writes
+ * that it is in 'kDone' with an abortReason.
+ */
+ SharedSemiFuture<void> awaitAllParticipantsDoneAborting();
+
+ /**
* Sets errors on any promises that have not yet been fulfilled.
*/
void interrupt(Status status);
private:
+ /**
+ * Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors
+ * (abort resharding).
+ */
+ void _onAbortOrStepdown(WithLock, Status status);
+
// Protects the state below
Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorObserver::_mutex");
@@ -126,6 +138,9 @@ private:
* {_allRecipientsReportedStrictConsistencyTimestamp, RecipientStateEnum::kStrictConsistency}
* {_allRecipientsRenamedCollection, RecipientStateEnum::kDone}
* {_allDonorsDroppedOriginalCollection, DonorStateEnum::kDone}
+ * {_allParticipantsDoneAborting,
+ * DonorStateEnum::kDone with abortReason AND
+ * RecipientStateEnum::kDone with abortReason}
*/
SharedPromise<ReshardingCoordinatorDocument> _allDonorsReportedMinFetchTimestamp;
@@ -139,6 +154,8 @@ private:
SharedPromise<ReshardingCoordinatorDocument> _allRecipientsRenamedCollection;
SharedPromise<ReshardingCoordinatorDocument> _allDonorsDroppedOriginalCollection;
+
+ SharedPromise<void> _allParticipantsDoneAborting;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
index 1b57c5f02b5..3f6c3c19272 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
@@ -45,25 +45,32 @@ namespace {
class ReshardingCoordinatorObserverTest : public unittest::Test {
protected:
ReshardingCoordinatorDocument makeCoordinatorDocWithRecipientsAndDonors(
- std::vector<RecipientShardEntry>& recipients, std::vector<DonorShardEntry>& donors) {
+ std::vector<RecipientShardEntry>& recipients,
+ std::vector<DonorShardEntry>& donors,
+ boost::optional<Status> abortReason = boost::none) {
auto coordinatorDoc = ReshardingCoordinatorDocument();
coordinatorDoc.setRecipientShards(std::move(recipients));
coordinatorDoc.setDonorShards(std::move(donors));
+ emplaceAbortReasonIfExists(coordinatorDoc, abortReason);
return coordinatorDoc;
}
std::vector<DonorShardEntry> makeMockDonorsInState(
- DonorStateEnum donorState, boost::optional<Timestamp> timestamp = boost::none) {
- return {makeDonorShard(ShardId{"s1"}, donorState, timestamp),
- makeDonorShard(ShardId{"s2"}, donorState, timestamp),
- makeDonorShard(ShardId{"s3"}, donorState, timestamp)};
+ DonorStateEnum donorState,
+ boost::optional<Timestamp> timestamp = boost::none,
+ boost::optional<Status> abortReason = boost::none) {
+ return {makeDonorShard(ShardId{"s1"}, donorState, timestamp, abortReason),
+ makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason),
+ makeDonorShard(ShardId{"s3"}, donorState, timestamp, abortReason)};
}
std::vector<RecipientShardEntry> makeMockRecipientsInState(
- RecipientStateEnum recipientState, boost::optional<Timestamp> timestamp = boost::none) {
- return {makeRecipientShard(ShardId{"s1"}, recipientState, timestamp),
- makeRecipientShard(ShardId{"s2"}, recipientState, timestamp),
- makeRecipientShard(ShardId{"s3"}, recipientState, timestamp)};
+ RecipientStateEnum recipientState,
+ boost::optional<Timestamp> timestamp = boost::none,
+ boost::optional<Status> abortReason = boost::none) {
+ return {makeRecipientShard(ShardId{"s1"}, recipientState, timestamp, abortReason),
+ makeRecipientShard(ShardId{"s2"}, recipientState, timestamp, abortReason),
+ makeRecipientShard(ShardId{"s3"}, recipientState, timestamp, abortReason)};
}
};
@@ -133,12 +140,15 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
// By default, all donors should be kDonatingInitialData at this stage.
auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1));
- std::vector<RecipientShardEntry> recipientShards0{
+ std::vector<RecipientShardEntry> recipientShards{
{makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kError)},
+ {makeRecipientShard(ShardId{"s2"},
+ RecipientStateEnum::kError,
+ boost::none, // timestamp
+ Status{ErrorCodes::InternalError, "We gotta abort"})},
{makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
- auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards);
- reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
+ auto coordinatorDoc = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards);
+ reshardingObserver->onReshardingParticipantTransition(coordinatorDoc);
auto resp = fut.getNoThrow();
// If any participant is in state kError, regardless of other participants' states, an error
@@ -148,6 +158,46 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"});
}
+TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) {
+ auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>();
+
+ auto fut = reshardingObserver->awaitAllParticipantsDoneAborting();
+ ASSERT_FALSE(fut.isReady());
+
+ auto abortReason = Status{ErrorCodes::InternalError, "We gotta abort"};
+
+ // All participants have an abortReason, but not all are in state kDone yet.
+ auto donorShards = makeMockDonorsInState(DonorStateEnum::kDone, Timestamp(1, 1), abortReason);
+ std::vector<RecipientShardEntry> recipientShards0{
+ {makeRecipientShard(ShardId{"s1"},
+ RecipientStateEnum::kError,
+ boost::none, // timestamp
+ abortReason)},
+ {makeRecipientShard(ShardId{"s2"},
+ RecipientStateEnum::kDone,
+ boost::none, // timestamp
+ abortReason)},
+ {makeRecipientShard(ShardId{"s3"},
+ RecipientStateEnum::kDone,
+ boost::none, // timestamp
+ abortReason)}};
+ auto coordinatorDoc0 =
+ makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards, abortReason);
+ reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
+ ASSERT_FALSE(fut.isReady());
+
+ // All participants in state kDone with abortReason.
+ auto recipientShards1 =
+ makeMockRecipientsInState(RecipientStateEnum::kDone, boost::none, abortReason);
+ auto coordinatorDoc1 =
+ makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards, abortReason);
+ reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
+ ASSERT_TRUE(fut.isReady());
+
+ // Since all other promises should be fulfilled with an error, no need to interrupt the observer
+ // before finishing this test case.
+}
+
/**
* Confirms that if one recipient is more than one state behind the other, the promise does not get
* fulfilled early.
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 97ebb4407fe..b31eebd66a0 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -62,6 +62,7 @@ using namespace fmt::literals;
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState);
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeDecisionPersisted);
+MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion);
void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request,
const BSONObj& response,
@@ -502,7 +503,7 @@ stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNotifyEnum> notifyForSta
{CoordinatorStateEnum::kMirroring, ParticipantsToNotifyEnum::kDonors},
{CoordinatorStateEnum::kDecisionPersisted, ParticipantsToNotifyEnum::kNone},
{CoordinatorStateEnum::kDone, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kNone},
+ {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kDonors},
};
/**
@@ -787,9 +788,7 @@ void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(
// Run updates to config.reshardingOperations and config.collections in a transaction
auto nextState = coordinatorDoc.getState();
invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end());
- // TODO SERVER-51800 Remove special casing for kError.
- invariant(nextState == CoordinatorStateEnum::kError ||
- notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone,
+ invariant(notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone,
"failed to write state transition with nextState {}"_format(
CoordinatorState_serializer(nextState)));
@@ -813,13 +812,6 @@ void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(
}
};
- // TODO SERVER-51800 Remove special casing for kError.
- if (nextState == CoordinatorStateEnum::kError) {
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, std::move(changeMetadataFunc));
- return;
- }
-
bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
opCtx, coordinatorDoc, std::move(changeMetadataFunc));
}
@@ -957,7 +949,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
// keep 'this' pointer alive for the remaining callbacks.
return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor);
})
- .onError([this, self = shared_from_this(), executor](Status status) {
+ .onError([this, self = shared_from_this(), token, executor](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
@@ -975,17 +967,24 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
_updateCoordinatorDocStateAndCatalogEntries(
CoordinatorStateEnum::kError, _coordinatorDoc, boost::none, status);
- // TODO wait for donors and recipients to abort the operation and clean up state
- _tellAllRecipientsToRefresh(executor);
_tellAllParticipantsToRefresh(createFlushRoutingTableCacheUpdatesCommand(nss),
executor);
+ // Wait for all participants to acknowledge the operation reached an unrecoverable
+ // error.
+ future_util::withCancelation(
+ _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(), token)
+ .get();
+
return status;
})
.onCompletion([this, self = shared_from_this()](Status status) {
// Notify `ReshardingMetrics` as the operation is now complete for external observers.
markCompleted(status);
+ auto opCtx = cc().makeOperationContext();
+ reshardingPauseCoordinatorBeforeCompletion.pauseWhileSet(opCtx.get());
+
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
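
Two mechanics in this hunk are worth noting: the error path now blocks on awaitAllParticipantsDoneAborting(), wrapped in future_util::withCancelation() so the wait can be interrupted through the cancellation token, and the new reshardingPauseCoordinatorBeforeCompletion fail point gives the jstest fixture a window to run postAbortDecisionPersistedFn() before the coordinator finishes. The toy fail point below sketches only the pause-while-set idea using a std::atomic flag; it is not MongoDB's FailPoint implementation.

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Toy stand-in for a fail point: while enabled, pauseWhileSet() blocks the
// calling thread until a test turns the flag off again.
class ToyFailPoint {
public:
    void enable() { _enabled.store(true); }
    void disable() { _enabled.store(false); }
    void pauseWhileSet() const {
        while (_enabled.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

private:
    std::atomic<bool> _enabled{false};
};

int main() {
    ToyFailPoint pauseBeforeCompletion;
    pauseBeforeCompletion.enable();

    std::thread coordinator([&] {
        // ... error handling and participant acknowledgement would happen first ...
        pauseBeforeCompletion.pauseWhileSet();  // the test inspects state while paused
        std::cout << "coordinator completed\n";
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    pauseBeforeCompletion.disable();  // analogous to failpoint.off() in the fixture
    coordinator.join();
}
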
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index f86d51a40c2..6797b08376a 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -775,6 +775,10 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToErrorSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch);
+ insertChunkAndZoneEntries(
+ makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}),
+ makeZones(_originalNss, _oldShardKey));
+
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kError);
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 97d4f8e9bf9..99872599e00 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -27,12 +27,19 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include <fmt/format.h>
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/grid.h"
+
namespace mongo {
namespace resharding {
@@ -62,6 +69,76 @@ void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocum
StateMachine::getOrCreate(opCtx, service, doc.toBSON());
}
+/**
+ * Acknowledges to the coordinator that all abort work is done on the donor. Since no
+ * ReshardingDonorService state machine exists for this operation, there is no local abort work to perform.
+ *
+ * TODO SERVER-54704: Remove this method, it should no longer be needed after SERVER-54513 is
+ * completed.
+ */
+void processAbortReasonNoDonorMachine(OperationContext* opCtx,
+ const ReshardingFields& reshardingFields,
+ const NamespaceString& nss) {
+ {
+ Lock::ResourceLock rstl(
+ opCtx->lockState(), resourceIdReplicationStateTransitionLock, MODE_IX);
+ auto* const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->canAcceptWritesFor(opCtx, nss)) {
+ // no-op when node is not primary.
+ return;
+ }
+ }
+
+ auto abortReason = reshardingFields.getAbortReason();
+ auto shardId = ShardingState::get(opCtx)->shardId();
+ BSONObjBuilder updateBuilder;
+ updateBuilder.append("donorShards.$.state", DonorState_serializer(DonorStateEnum::kDone));
+ updateBuilder.append("donorShards.$.abortReason", *abortReason);
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument(
+ opCtx,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ BSON("_id" << reshardingFields.getUuid() << "donorShards.id" << shardId),
+ BSON("$set" << updateBuilder.done()),
+ false /* upsert */,
+ ShardingCatalogClient::kMajorityWriteConcern));
+}
+
+/**
+ * Acknowledges to the coordinator that all abort work is done on the recipient. Since no
+ * ReshardingRecipientService state machine exists for this operation, there is no local abort work to perform.
+ *
+ * TODO SERVER-54704: Remove this method, it should no longer be needed after SERVER-54513 is
+ * completed.
+ */
+void processAbortReasonNoRecipientMachine(OperationContext* opCtx,
+ const ReshardingFields& reshardingFields,
+ const NamespaceString& nss) {
+
+ {
+ Lock::ResourceLock rstl(
+ opCtx->lockState(), resourceIdReplicationStateTransitionLock, MODE_IX);
+ auto* const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->canAcceptWritesFor(opCtx, nss)) {
+ // no-op when node is not primary.
+ return;
+ }
+ }
+
+ auto abortReason = reshardingFields.getAbortReason();
+ auto shardId = ShardingState::get(opCtx)->shardId();
+ BSONObjBuilder updateBuilder;
+ updateBuilder.append("recipientShards.$.state",
+ RecipientState_serializer(RecipientStateEnum::kDone));
+ updateBuilder.append("recipientShards.$.abortReason", *abortReason);
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument(
+ opCtx,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ BSON("_id" << reshardingFields.getUuid() << "recipientShards.id" << shardId),
+ BSON("$set" << updateBuilder.done()),
+ false /* upsert */,
+ ShardingCatalogClient::kMajorityWriteConcern));
+}
+
/*
* Either constructs a new ReshardingDonorStateMachine with 'reshardingFields' or passes
* 'reshardingFields' to an already-existing ReshardingDonorStateMachine.
@@ -78,6 +155,11 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx,
return;
}
+ if (reshardingFields.getAbortReason()) {
+ processAbortReasonNoDonorMachine(opCtx, reshardingFields, nss);
+ return;
+ }
+
// If a resharding operation is BEFORE state kPreparingToDonate, then the config.collections
// entry won't have yet been created for the temporary resharding collection. The
// DonorStateMachine refreshes the temporary resharding collection immediately after being
@@ -109,6 +191,7 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx,
* 'reshardingFields' to an already-existing ReshardingRecipientStateMachine.
*/
void processReshardingFieldsForRecipientCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
const CollectionMetadata& metadata,
const ReshardingFields& reshardingFields) {
if (auto recipientStateMachine = tryGetReshardingStateMachine<ReshardingRecipientService,
@@ -119,6 +202,11 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx,
return;
}
+ if (reshardingFields.getAbortReason()) {
+ processAbortReasonNoRecipientMachine(opCtx, reshardingFields, nss);
+ return;
+ }
+
// If a resharding operation is past state kCloning but does not currently have a recipient
// document in-memory, this means that the document will be recovered by the
// ReshardingRecipientService, and at that time the latest instance of 'reshardingFields'
@@ -146,6 +234,38 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx,
ReshardingRecipientDocument>(opCtx, recipientDoc);
}
+/**
+ * Checks that presence/absence of 'donorShards' and 'recipientShards' fields in the
+ * reshardingFields are consistent with the 'state' field.
+ */
+void verifyValidReshardingFields(const ReshardingFields& reshardingFields) {
+ auto coordinatorState = reshardingFields.getState();
+
+ if (coordinatorState < CoordinatorStateEnum::kDecisionPersisted) {
+ // Prior to the state CoordinatorStateEnum::kDecisionPersisted, only the source
+ // collection's config.collections entry should have donorFields, and only the
+ // temporary resharding collection's entry should have recipientFields.
+ uassert(5274201,
+ fmt::format("reshardingFields must contain either donorFields or recipientFields "
+ "(and not both) when the "
+ "coordinator is in state {}. Got reshardingFields {}",
+ CoordinatorState_serializer(reshardingFields.getState()),
+ reshardingFields.toBSON().toString()),
+ reshardingFields.getDonorFields().is_initialized() ^
+ reshardingFields.getRecipientFields().is_initialized());
+ } else {
+ // At and after state CoordinatorStateEnum::kDecisionPersisted, the temporary
+ // resharding collection's config.collections entry has been removed, and so the
+ // source collection's entry should have both donorFields and recipientFields.
+ uassert(5274202,
+ fmt::format("reshardingFields must contain both donorFields and recipientFields "
+ "when the coordinator's state is greater than or equal to "
+ "CoordinatorStateEnum::kDecisionPersisted. Got reshardingFields {}",
+ reshardingFields.toBSON().toString()),
+ reshardingFields.getDonorFields() && reshardingFields.getRecipientFields());
+ }
+}
+
} // namespace
ReshardingDonorDocument constructDonorDocumentFromReshardingFields(
@@ -192,41 +312,22 @@ void processReshardingFieldsForCollection(OperationContext* opCtx,
const NamespaceString& nss,
const CollectionMetadata& metadata,
const ReshardingFields& reshardingFields) {
- auto coordinatorState = reshardingFields.getState();
- if (coordinatorState != CoordinatorStateEnum::kError) {
- if (coordinatorState < CoordinatorStateEnum::kDecisionPersisted) {
- // Prior to the state CoordinatorStateEnum::kDecisionPersisted, only the source
- // collection's config.collections entry should have donorFields, and only the
- // temporary resharding collection's entry should have recipientFields.
- uassert(
- 5274201,
- fmt::format("reshardingFields must contain either donorFields or recipientFields "
- "(and not both) when the "
- "coordinator is in state {}. Got reshardingFields {}",
- CoordinatorState_serializer(reshardingFields.getState()),
- reshardingFields.toBSON().toString()),
- reshardingFields.getDonorFields().is_initialized() ^
- reshardingFields.getRecipientFields().is_initialized());
- } else {
- // At and after state CoordinatorStateEnum::kDecisionPersisted, the temporary
- // resharding collection's config.collections entry has been removed, and so the
- // source collection's entry should have both donorFields and recipientFields.
- uassert(
- 5274202,
- fmt::format("reshardingFields must contain both donorFields and recipientFields "
- "when the coordinator's state is greater than or equal to "
- "CoordinatorStateEnum::kDecisionPersisted. Got reshardingFields {}",
- reshardingFields.toBSON().toString()),
- reshardingFields.getDonorFields() && reshardingFields.getRecipientFields());
- }
+ if (reshardingFields.getAbortReason()) {
+ // The coordinator encountered an unrecoverable error; both donors and recipients should be
+ // made aware.
+ processReshardingFieldsForDonorCollection(opCtx, nss, metadata, reshardingFields);
+ processReshardingFieldsForRecipientCollection(opCtx, nss, metadata, reshardingFields);
+ return;
}
+ verifyValidReshardingFields(reshardingFields);
+
if (reshardingFields.getDonorFields()) {
processReshardingFieldsForDonorCollection(opCtx, nss, metadata, reshardingFields);
}
if (reshardingFields.getRecipientFields()) {
- processReshardingFieldsForRecipientCollection(opCtx, metadata, reshardingFields);
+ processReshardingFieldsForRecipientCollection(opCtx, nss, metadata, reshardingFields);
}
}
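
After this refactor, processReshardingFieldsForCollection() short-circuits whenever an abortReason is present and otherwise delegates to verifyValidReshardingFields(), which requires exactly one of donorFields/recipientFields before kDecisionPersisted and both of them at or after it. A compact restatement of that invariant, using simplified stand-in types rather than the generated IDL classes:

#include <cassert>
#include <optional>

enum class CoordinatorState { kPreparingToDonate, kCloning, kApplying, kDecisionPersisted, kDone };

// Simplified stand-in for TypeCollectionReshardingFields.
struct ReshardingFieldsModel {
    CoordinatorState state;
    std::optional<int> donorFields;      // placeholder for TypeCollectionDonorFields
    std::optional<int> recipientFields;  // placeholder for TypeCollectionRecipientFields
};

// Restates the invariant enforced by verifyValidReshardingFields(): exactly one of
// donorFields/recipientFields before kDecisionPersisted, both of them afterwards.
bool fieldsAreConsistentWithState(const ReshardingFieldsModel& f) {
    if (f.state < CoordinatorState::kDecisionPersisted) {
        return f.donorFields.has_value() != f.recipientFields.has_value();
    }
    return f.donorFields.has_value() && f.recipientFields.has_value();
}

int main() {
    assert(fieldsAreConsistentWithState({CoordinatorState::kCloning, 1, std::nullopt}));
    assert(!fieldsAreConsistentWithState({CoordinatorState::kCloning, 1, 2}));
    assert(fieldsAreConsistentWithState({CoordinatorState::kDecisionPersisted, 1, 2}));
    return 0;
}
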
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 08de9716f52..816f2084fb4 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -53,6 +53,8 @@
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsBeforePreparingToMirror);
+
using namespace fmt::literals;
namespace {
@@ -173,6 +175,11 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
"reshardingId"_attr = _id,
"error"_attr = status);
_transitionStateAndUpdateCoordinator(DonorStateEnum::kError, boost::none, status);
+
+ // TODO SERVER-52838: Ensure all local collections that may have been created for
+ // resharding are removed, with the exception of the ReshardingDonorDocument, before
+ // transitioning to kDone.
+ _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone, boost::none, status);
return status;
})
.onCompletion([this, self = shared_from_this()](Status status) {
@@ -198,23 +205,8 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {
// Resolve any unresolved promises to avoid hanging.
- stdx::lock_guard<Latch> lg(_mutex);
- if (!_allRecipientsDoneCloning.getFuture().isReady()) {
- _allRecipientsDoneCloning.setError(status);
- }
-
- if (!_allRecipientsDoneApplying.getFuture().isReady()) {
- _allRecipientsDoneApplying.setError(status);
- }
-
- if (!_finalOplogEntriesWritten.getFuture().isReady()) {
- _finalOplogEntriesWritten.setError(status);
- }
-
- if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
- _coordinatorHasDecisionPersisted.setError(status);
- }
-
+ stdx::lock_guard<Latch> lk(_mutex);
+ _onAbortOrStepdown(lk, status);
if (!_completionPromise.getFuture().isReady()) {
_completionPromise.setError(status);
}
@@ -233,17 +225,14 @@ boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCur
void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields) {
- auto coordinatorState = reshardingFields.getState();
- if (coordinatorState == CoordinatorStateEnum::kError) {
- // TODO SERVER-52838: Investigate if we want to have a special error code so the donor knows
- // when it has recieved the error from the coordinator rather than needing to report an
- // error to the coordinator.
- interrupt({ErrorCodes::InternalError,
- "ReshardingDonorService observed CoordinatorStateEnum::kError"});
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (reshardingFields.getAbortReason()) {
+ auto status = getStatusFromAbortReason(reshardingFields);
+ _onAbortOrStepdown(lk, status);
return;
}
- stdx::lock_guard<Latch> lk(_mutex);
+ auto coordinatorState = reshardingFields.getState();
if (coordinatorState >= CoordinatorStateEnum::kApplying) {
ensureFulfilledPromise(lk, _allRecipientsDoneCloning);
}
@@ -322,9 +311,14 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
return ExecutorFuture<void>(**executor, Status::OK());
}
- return _allRecipientsDoneCloning.getFuture().thenRunOn(**executor).then([this]() {
- _transitionState(DonorStateEnum::kDonatingOplogEntries);
- });
+ return _allRecipientsDoneCloning.getFuture()
+ .thenRunOn(**executor)
+ .then([this]() { _transitionState(DonorStateEnum::kDonatingOplogEntries); })
+ .onCompletion([=](Status s) {
+ if (MONGO_unlikely(reshardingDonorFailsBeforePreparingToMirror.shouldFail())) {
+ uasserted(ErrorCodes::InternalError, "Failing for test");
+ }
+ });
}
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
@@ -459,13 +453,16 @@ void ReshardingDonorService::DonorStateMachine::_transitionState(
emplaceMinFetchTimestampIfExists(replacementDoc, minFetchTimestamp);
emplaceAbortReasonIfExists(replacementDoc, abortReason);
+ // For logging purposes.
+ auto oldState = _donorDoc.getState();
auto newState = replacementDoc.getState();
+
_updateDonorDocument(std::move(replacementDoc));
LOGV2_INFO(5279505,
"Transitioned resharding donor state",
"newState"_attr = DonorState_serializer(newState),
- "oldState"_attr = DonorState_serializer(_donorDoc.getState()),
+ "oldState"_attr = DonorState_serializer(oldState),
"ns"_attr = _donorDoc.getNss(),
"collectionUUID"_attr = _donorDoc.getExistingUUID(),
"reshardingUUID"_attr = _donorDoc.get_id());
@@ -542,4 +539,22 @@ void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() {
_donorDoc = {};
}
+void ReshardingDonorService::DonorStateMachine::_onAbortOrStepdown(WithLock, Status status) {
+ if (!_allRecipientsDoneCloning.getFuture().isReady()) {
+ _allRecipientsDoneCloning.setError(status);
+ }
+
+ if (!_allRecipientsDoneApplying.getFuture().isReady()) {
+ _allRecipientsDoneApplying.setError(status);
+ }
+
+ if (!_finalOplogEntriesWritten.getFuture().isReady()) {
+ _finalOplogEntriesWritten.setError(status);
+ }
+
+ if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
+ _coordinatorHasDecisionPersisted.setError(status);
+ }
+}
+
} // namespace mongo
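
The _transitionState() hunk above is the SERVER-54457 half of the commit: the old code read _donorDoc.getState() after _updateDonorDocument() had already replaced the in-memory document, so the log line reported the new state under both attributes. A minimal sketch of the corrected ordering, with simplified types standing in for the donor document and the LOGV2 call:

#include <iostream>
#include <string>

enum class DonorState { kPreparingToDonate, kDonatingInitialData, kError, kDone };

std::string serialize(DonorState s) {
    switch (s) {
        case DonorState::kPreparingToDonate: return "preparing-to-donate";
        case DonorState::kDonatingInitialData: return "donating-initial-data";
        case DonorState::kError: return "error";
        case DonorState::kDone: return "done";
    }
    return "unknown";
}

// Simplified stand-in for the in-memory ReshardingDonorDocument.
struct DonorDoc {
    DonorState state = DonorState::kPreparingToDonate;
};

void transitionState(DonorDoc& doc, DonorState newState) {
    // Capture the previous state *before* replacing the document; reading
    // doc.state after the update would log newState under both attributes.
    const auto oldState = doc.state;
    doc = DonorDoc{newState};
    std::cout << "Transitioned resharding donor state"
              << " newState=" << serialize(newState)
              << " oldState=" << serialize(oldState) << '\n';
}

int main() {
    DonorDoc doc;
    transitionState(doc, DonorState::kDonatingInitialData);
    transitionState(doc, DonorState::kError);
    transitionState(doc, DonorState::kDone);
}
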
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 34057c336fa..d0d4b8fc626 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -138,6 +138,10 @@ private:
// Removes the local donor document from disk and clears the in-memory state.
void _removeDonorDocument();
+ // Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors
+ // (abort resharding).
+ void _onAbortOrStepdown(WithLock lk, Status status);
+
// The in-memory representation of the underlying document in
// config.localReshardingOperations.donor.
ReshardingDonorDocument _donorDoc;
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp
index 05144408e12..d3b1959ef60 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp
@@ -49,7 +49,10 @@ std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver(
auto service = registry->lookupServiceByName(kReshardingCoordinatorServiceName);
auto instance =
ReshardingCoordinatorService::ReshardingCoordinator::lookup(opCtx, service, reshardingId);
- invariant(instance);
+
+ iassert(
+ 5400001, "ReshardingCoordinatorService instance does not exist", instance.is_initialized());
+
return (*instance)->getObserver();
}
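[Note] Replacing invariant() with iassert() turns a missing coordinator instance from a server-fatal condition into error 5400001, a catchable DBException, since the instance can legitimately be gone once its state document has been removed. A hedged caller-side sketch (the call-site details below are hypothetical):

    try {
        auto observer = getReshardingCoordinatorObserver(opCtx, reshardingId);
        // ... forward the participant's new state to the observer ...
    } catch (const DBException& ex) {
        if (ex.code() != ErrorCodes::Error(5400001)) {
            throw;
        }
        // The coordinator instance no longer exists; drop the notification.
    }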
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 67c5267b149..ad2aac82205 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -234,7 +234,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
return _awaitAllDonorsMirroringThenTransitionToStrictConsistency(executor);
})
.then([this, executor] {
- return __awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor);
+ return _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor);
})
.then([this] { _renameTemporaryReshardingCollection(); })
.onError([this](Status status) {
@@ -245,6 +245,12 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
"error"_attr = status);
_transitionState(RecipientStateEnum::kError, boost::none, status);
_updateCoordinator();
+
+ // TODO SERVER-52838: Ensure all local collections that may have been created for
+ // resharding are removed, with the exception of the ReshardingRecipientDocument, before
+ // transitioning to kDone.
+ _transitionState(RecipientStateEnum::kDone, boost::none, status);
+ _updateCoordinator();
return status;
})
.onCompletion([this, self = shared_from_this()](Status status) {
@@ -255,13 +261,17 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
return;
}
- removeRecipientDocFailpoint.pauseWhileSet();
-
if (status.isOK()) {
// The shared_ptr stored in the PrimaryOnlyService's map for the
// ReshardingRecipientService Instance is removed when the recipient state document
// tied to the instance is deleted. It is necessary to use shared_from_this() to
// extend the lifetime so the code can safely finish executing.
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ removeRecipientDocFailpoint.pauseWhileSet(opCtx.get());
+ }
+
_removeRecipientDocument();
_metrics()->onCompletion(ReshardingMetrics::OperationStatus::kSucceeded);
_completionPromise.emplaceValue();
@@ -269,7 +279,6 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
_metrics()->onCompletion(ErrorCodes::isCancelationError(status)
? ReshardingMetrics::OperationStatus::kCanceled
: ReshardingMetrics::OperationStatus::kFailed);
- // Set error on all promises
_completionPromise.setError(status);
}
})
@@ -278,23 +287,8 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) {
// Resolve any unresolved promises to avoid hanging.
- stdx::lock_guard<Latch> lg(_mutex);
-
- if (_oplogFetcherExecutor) {
- _oplogFetcherExecutor->shutdown();
- }
-
- for (auto&& fetcher : _oplogFetchers) {
- fetcher->interrupt(status);
- }
-
- for (auto&& threadPool : _oplogApplierWorkers) {
- threadPool->shutdown();
- }
-
- if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
- _coordinatorHasDecisionPersisted.setError(status);
- }
+ stdx::lock_guard<Latch> lk(_mutex);
+ _onAbortOrStepdown(lk, status);
if (!_completionPromise.getFuture().isReady()) {
_completionPromise.setError(status);
@@ -314,17 +308,14 @@ boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::repo
void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges(
OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields) {
- auto coordinatorState = reshardingFields.getState();
- if (coordinatorState == CoordinatorStateEnum::kError) {
- // TODO SERVER-52838: Investigate if we want to have a special error code so the recipient
- // knows when it has recieved the error from the coordinator rather than needing to report
- // an error to the coordinator.
- interrupt({ErrorCodes::InternalError,
- "ReshardingDonorService observed CoordinatorStateEnum::kError"});
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (reshardingFields.getAbortReason()) {
+ auto status = getStatusFromAbortReason(reshardingFields);
+ _onAbortOrStepdown(lk, status);
return;
}
- stdx::lock_guard<Latch> lk(_mutex);
+ auto coordinatorState = reshardingFields.getState();
if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) {
ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted);
}
@@ -601,7 +592,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
- __awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
+ _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_recipientDoc.getState() > RecipientStateEnum::kStrictConsistency) {
return ExecutorFuture<void>(**executor, Status::OK());
@@ -653,14 +644,17 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp));
emplaceAbortReasonIfExists(replacementDoc, std::move(abortReason));
+ // Capture the old state for logging before the in-memory document is replaced.
+ auto oldState = _recipientDoc.getState();
auto newState = replacementDoc.getState();
+
_updateRecipientDocument(std::move(replacementDoc));
_metrics()->setRecipientState(endState);
LOGV2_INFO(5279506,
"Transitioned resharding recipient state",
"newState"_attr = RecipientState_serializer(newState),
- "oldState"_attr = RecipientState_serializer(_recipientDoc.getState()),
+ "oldState"_attr = RecipientState_serializer(oldState),
"ns"_attr = _recipientDoc.getNss(),
"collectionUUID"_attr = _recipientDoc.getExistingUUID(),
"reshardingUUID"_attr = _recipientDoc.get_id());
@@ -762,4 +756,23 @@ ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics()
return ReshardingMetrics::get(cc().getServiceContext());
}
+void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithLock,
+ Status status) {
+ if (_oplogFetcherExecutor) {
+ _oplogFetcherExecutor->shutdown();
+ }
+
+ for (auto&& fetcher : _oplogFetchers) {
+ fetcher->interrupt(status);
+ }
+
+ for (auto&& threadPool : _oplogApplierWorkers) {
+ threadPool->shutdown();
+ }
+
+ if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
+ _coordinatorHasDecisionPersisted.setError(status);
+ }
+}
+
} // namespace mongo
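[Note] One subtle change in the recipient's run() chain above: removeRecipientDocFailpoint is now paused with an OperationContext created in its own scope, so the wait can be interrupted (stepdown, shutdown, killOp) and the temporary context is released before the subsequent cleanup runs. A hedged sketch of that pattern, with a hypothetical failpoint name:

    #include "mongo/db/client.h"
    #include "mongo/util/fail_point.h"

    MONGO_FAIL_POINT_DEFINE(hangBeforeRemovingStateDocument);

    void pauseForTestIfRequested() {
        // Scope the OperationContext so it is destroyed before later work that
        // needs to create its own.
        auto opCtx = cc().makeOperationContext();
        // Passing the OperationContext makes the pause interruptible instead of
        // blocking the thread unconditionally while the failpoint is enabled.
        hangBeforeRemovingStateDocument.pauseWhileSet(opCtx.get());
    }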
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 544272b2b27..43cf216f367 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -145,7 +145,7 @@ private:
ExecutorFuture<void> _awaitAllDonorsMirroringThenTransitionToStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
- ExecutorFuture<void> __awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
+ ExecutorFuture<void> _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void _renameTemporaryReshardingCollection();
@@ -176,6 +176,10 @@ private:
ReshardingMetrics* _metrics() const;
+ // Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors
+ // (abort resharding).
+ void _onAbortOrStepdown(WithLock, Status status);
+
// The in-memory representation of the underlying document in
// config.localReshardingOperations.recipient.
ReshardingRecipientDocument _recipientDoc;
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 8aa5d563f76..61035669c3f 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -88,19 +88,23 @@ bool documentBelongsToMe(OperationContext* opCtx,
DonorShardEntry makeDonorShard(ShardId shardId,
DonorStateEnum donorState,
- boost::optional<Timestamp> minFetchTimestamp) {
+ boost::optional<Timestamp> minFetchTimestamp,
+ boost::optional<Status> abortReason) {
DonorShardEntry entry(shardId);
entry.setState(donorState);
emplaceMinFetchTimestampIfExists(entry, minFetchTimestamp);
+ emplaceAbortReasonIfExists(entry, abortReason);
return entry;
}
RecipientShardEntry makeRecipientShard(ShardId shardId,
RecipientStateEnum recipientState,
- boost::optional<Timestamp> strictConsistencyTimestamp) {
+ boost::optional<Timestamp> strictConsistencyTimestamp,
+ boost::optional<Status> abortReason) {
RecipientShardEntry entry(shardId);
entry.setState(recipientState);
emplaceStrictConsistencyTimestampIfExists(entry, strictConsistencyTimestamp);
+ emplaceAbortReasonIfExists(entry, abortReason);
return entry;
}
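[Note] makeDonorShard() and makeRecipientShard() now accept the participant's abortReason so the coordinator document can carry the error to the other participants. A hedged usage sketch, assuming a DonorStateEnum::kError value as used by the participant state machines:

    Status abortStatus(ErrorCodes::InternalError, "resharding aborted for test");
    DonorShardEntry donorEntry = makeDonorShard(ShardId("shard0"),
                                                DonorStateEnum::kError,
                                                boost::none /* minFetchTimestamp */,
                                                abortStatus);
    // donorEntry now serializes with an abortReason subdocument containing the
    // status's code and errmsg.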
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index fec49c02a90..bbc36407ab1 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -144,11 +144,31 @@ void emplaceAbortReasonIfExists(ClassWithAbortReason& c, boost::optional<Status>
}
/**
+ * Extracts the abortReason BSONObj from 'c' into a Status.
+ */
+template <class ClassWithAbortReason>
+Status getStatusFromAbortReason(ClassWithAbortReason& c) {
+ invariant(c.getAbortReason());
+ auto abortReasonObj = c.getAbortReason().get();
+ BSONElement codeElement = abortReasonObj["code"];
+ BSONElement errmsgElement = abortReasonObj["errmsg"];
+ int code = codeElement.numberInt();
+ std::string errmsg;
+ if (errmsgElement.type() == String) {
+ errmsg = errmsgElement.String();
+ } else if (!errmsgElement.eoo()) {
+ errmsg = errmsgElement.toString();
+ }
+ return Status(ErrorCodes::Error(code), errmsg, abortReasonObj);
+}
+
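[Note] getStatusFromAbortReason() is the read-side counterpart of emplaceAbortReasonIfExists(): it rebuilds a Status from the serialized {code, errmsg} document that participants and the coordinator exchange. A hedged round-trip sketch, assuming a DonorShardEntry as the type carrying the abortReason field:

    DonorShardEntry entry(ShardId("shard0"));
    Status original(ErrorCodes::InternalError, "example abort reason");
    emplaceAbortReasonIfExists(entry, boost::optional<Status>(original));

    Status rebuilt = getStatusFromAbortReason(entry);
    // rebuilt.code() == ErrorCodes::InternalError, rebuilt.reason() carries the
    // serialized errmsg, and the full abortReason BSONObj is passed through as
    // the Status's extra-info holder.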
+/**
* Helper method to construct a DonorShardEntry with the fields specified.
*/
DonorShardEntry makeDonorShard(ShardId shardId,
DonorStateEnum donorState,
- boost::optional<Timestamp> minFetchTimestamp = boost::none);
+ boost::optional<Timestamp> minFetchTimestamp = boost::none,
+ boost::optional<Status> abortReason = boost::none);
/**
* Helper method to construct a RecipientShardEntry with the fields specified.
@@ -156,7 +176,8 @@ DonorShardEntry makeDonorShard(ShardId shardId,
RecipientShardEntry makeRecipientShard(
ShardId shardId,
RecipientStateEnum recipientState,
- boost::optional<Timestamp> strictConsistencyTimestamp = boost::none);
+ boost::optional<Timestamp> strictConsistencyTimestamp = boost::none,
+ boost::optional<Status> abortReason = boost::none);
/**
* Gets the UUID for 'nss' from the 'cm'