43 files changed, 732 insertions, 602 deletions
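Most of the jstests changes that follow apply one schema rename: the coordinator document in config.reshardingOperations now records the source namespace under 'ns' instead of 'nss'. A minimal mongo shell sketch of the lookup pattern the updated tests use (the collection name "test.foo" is hypothetical):

// Look up the coordinator document by the renamed 'ns' field, then check
// whether a fetchTimestamp has been established yet.
const coordinatorDoc = db.getSiblingDB("config").reshardingOperations.findOne({
    ns: "test.foo"  // was: {nss: "test.foo"}
});
if (coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined) {
    print("fetchTimestamp established: " + tojson(coordinatorDoc.fetchTimestamp));
}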
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js index 923cc84ef5b..80c4d278961 100644 --- a/jstests/sharding/libs/resharding_test_fixture.js +++ b/jstests/sharding/libs/resharding_test_fixture.js @@ -502,7 +502,7 @@ var ReshardingTest = class { /** @private */ _checkCoordinatorPostState(expectedErrorCode) { assert.eq([], - this._st.config.reshardingOperations.find({nss: this._ns}).toArray(), + this._st.config.reshardingOperations.find({ns: this._ns}).toArray(), "expected config.reshardingOperations to be empty, but found it wasn't"); assert.eq([], diff --git a/jstests/sharding/libs/resharding_test_util.js b/jstests/sharding/libs/resharding_test_util.js index 265613373e8..7d9953b332f 100644 --- a/jstests/sharding/libs/resharding_test_util.js +++ b/jstests/sharding/libs/resharding_test_util.js @@ -7,8 +7,9 @@ var ReshardingTestUtil = (function() { * 'abortReason.code'. */ const shardDoneAbortingWithCode = function(shardEntry, errorCode) { - return shardEntry["abortReason"] && shardEntry["abortReason"]["code"] && - shardEntry["abortReason"]["code"] === errorCode && shardEntry["state"] === "done"; + return shardEntry.mutableState.abortReason && + shardEntry.mutableState.abortReason.code === errorCode && + shardEntry.mutableState.state === "done"; }; /** @@ -20,12 +21,12 @@ var ReshardingTestUtil = (function() { * config.reshardingOperations.recipientShards[shardName] for recipients and * config.reshardingOperations.donorShards[shardName] for donors. */ - const assertAllParticipantsReportAbortToCoordinator = function(configsvr, nss, errCode) { + const assertAllParticipantsReportAbortToCoordinator = function(configsvr, ns, errCode) { const reshardingOperationsCollection = configsvr.getCollection("config.reshardingOperations"); assert.soon( () => { - const coordinatorDoc = reshardingOperationsCollection.findOne({nss}); + const coordinatorDoc = reshardingOperationsCollection.findOne({ns}); assert(coordinatorDoc); // Iterate over both the recipientShards and donorShards and check that every shard // entry is in state 'done' and contains an abortReason with the errCode. @@ -52,28 +53,31 @@ var ReshardingTestUtil = (function() { * assertDonorAbortsLocally instead. 
*/ const assertParticipantAbortsLocally = function( - shardConn, shardName, nss, abortReason, participantType) { + shardConn, shardName, ns, abortReason, participantType) { const localOpsCollection = shardConn.getCollection(`config.localReshardingOperations.${participantType}`); assert.soon( () => { - return localOpsCollection.findOne( - {nss, state: "done", "abortReason.code": abortReason}) !== null; + return localOpsCollection.findOne({ + ns, + "mutableState.state": "done", + "mutableState.abortReason.code": abortReason, + }) !== null; }, () => { return participantType + " shard " + shardName + " never transitioned to an done state with abortReason " + abortReason + ": " + - tojson(localDonorOpsCollection.findOne()); + tojson(localOpsCollection.findOne()); }); }; - const assertRecipientAbortsLocally = function(shardConn, shardName, nss, abortReason) { - return assertParticipantAbortsLocally(shardConn, shardName, nss, abortReason, "recipient"); + const assertRecipientAbortsLocally = function(shardConn, shardName, ns, abortReason) { + return assertParticipantAbortsLocally(shardConn, shardName, ns, abortReason, "recipient"); }; - const assertDonorAbortsLocally = function(shardConn, shardName, nss, abortReason) { - return assertParticipantAbortsLocally(shardConn, shardName, nss, abortReason, "donor"); + const assertDonorAbortsLocally = function(shardConn, shardName, ns, abortReason) { + return assertParticipantAbortsLocally(shardConn, shardName, ns, abortReason, "donor"); }; return { diff --git a/jstests/sharding/reshard_collection_basic.js b/jstests/sharding/reshard_collection_basic.js index 02668400626..39974eaddc9 100644 --- a/jstests/sharding/reshard_collection_basic.js +++ b/jstests/sharding/reshard_collection_basic.js @@ -89,27 +89,23 @@ let verifyTemporaryReshardingCollectionExistsWithCorrectOptions = (expectedRecip let verifyAllShardingCollectionsRemoved = (tempReshardingCollName) => { assert.eq(0, mongos.getDB(kDbName)[tempReshardingCollName].find().itcount()); - assert.eq(0, mongosConfig.reshardingOperations.find({nss: ns}).itcount()); + assert.eq(0, mongosConfig.reshardingOperations.find({ns}).itcount()); assert.eq(0, mongosConfig.collections.find({reshardingFields: {$exists: true}}).itcount()); + assert.eq( + 0, + st.rs0.getPrimary().getDB('config').localReshardingOperations.donor.find({ns}).itcount()); assert.eq(0, st.rs0.getPrimary() .getDB('config') - .localReshardingOperations.donor.find({nss: ns}) - .itcount()); - assert.eq(0, - st.rs0.getPrimary() - .getDB('config') - .localReshardingOperations.recipient.find({nss: ns}) - .itcount()); - assert.eq(0, - st.rs1.getPrimary() - .getDB('config') - .localReshardingOperations.donor.find({nss: ns}) + .localReshardingOperations.recipient.find({ns}) .itcount()); + assert.eq( + 0, + st.rs1.getPrimary().getDB('config').localReshardingOperations.donor.find({ns}).itcount()); assert.eq(0, st.rs1.getPrimary() .getDB('config') - .localReshardingOperations.recipient.find({nss: ns}) + .localReshardingOperations.recipient.find({ns}) .itcount()); }; diff --git a/jstests/sharding/resharding_array_shard_key.js b/jstests/sharding/resharding_array_shard_key.js index a2abe1b45ab..3ce4647be97 100644 --- a/jstests/sharding/resharding_array_shard_key.js +++ b/jstests/sharding/resharding_array_shard_key.js @@ -31,7 +31,7 @@ function awaitEstablishmentOfFetchTimestamp(inputCollection) { const mongos = inputCollection.getMongo(); assert.soon(() => { const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: 
inputCollection.getFullName() + ns: inputCollection.getFullName() }); return coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined; }); diff --git a/jstests/sharding/resharding_fails_on_nonempty_stash.js b/jstests/sharding/resharding_fails_on_nonempty_stash.js index fdfe2f65baf..670f25adf06 100644 --- a/jstests/sharding/resharding_fails_on_nonempty_stash.js +++ b/jstests/sharding/resharding_fails_on_nonempty_stash.js @@ -51,7 +51,7 @@ reshardingTest.withReshardingInBackground( const mongos = inputCollection.getMongo(); assert.soon(() => { const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: inputCollection.getFullName() + ns: inputCollection.getFullName() }); return coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined; }); diff --git a/jstests/sharding/resharding_generate_oplog_entries.js b/jstests/sharding/resharding_generate_oplog_entries.js index 4d926644c77..511669c8a55 100644 --- a/jstests/sharding/resharding_generate_oplog_entries.js +++ b/jstests/sharding/resharding_generate_oplog_entries.js @@ -44,7 +44,7 @@ function simulateResharding() { let donorReshardingFields = { "uuid": uuid, "state": "preparing-to-donate", - "donorFields": {"reshardingKey": {y: 1}} + "donorFields": {"tempNs": tempReshardingNss, "reshardingKey": {y: 1}} }; assert.commandWorked(st.configRS.getPrimary().getDB("config").collections.update( {_id: ns}, {"$set": {"reshardingFields": donorReshardingFields}})); diff --git a/jstests/sharding/resharding_metrics.js b/jstests/sharding/resharding_metrics.js index a6ba754a6db..939b04cb500 100644 --- a/jstests/sharding/resharding_metrics.js +++ b/jstests/sharding/resharding_metrics.js @@ -88,7 +88,7 @@ function verifyCurrentOpOutput(reshardingTest, inputCollection) { const mongos = inputCollection.getMongo(); assert.soon(() => { const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: inputCollection.getFullName() + ns: inputCollection.getFullName() }); return coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined; }); diff --git a/jstests/sharding/resharding_metrics_increment.js b/jstests/sharding/resharding_metrics_increment.js index 0c99089dbf0..336b78cd108 100644 --- a/jstests/sharding/resharding_metrics_increment.js +++ b/jstests/sharding/resharding_metrics_increment.js @@ -60,7 +60,7 @@ function awaitEstablishmentOfFetchTimestamp(inputCollection) { const mongos = inputCollection.getMongo(); assert.soon(() => { const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: inputCollection.getFullName() + ns: inputCollection.getFullName() }); return coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined; }); diff --git a/jstests/sharding/resharding_min_fetch_ts_with_txn.js b/jstests/sharding/resharding_min_fetch_ts_with_txn.js index cedbb99638a..102643f02c5 100644 --- a/jstests/sharding/resharding_min_fetch_ts_with_txn.js +++ b/jstests/sharding/resharding_min_fetch_ts_with_txn.js @@ -57,7 +57,7 @@ reshardingTest.withReshardingInBackground( }); let coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: sourceCollection.getFullName() + ns: sourceCollection.getFullName() }); assert.neq(null, coordinatorDoc); @@ -68,7 +68,7 @@ reshardingTest.withReshardingInBackground( assert.soon(() => { coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: sourceCollection.getFullName() + ns: sourceCollection.getFullName() }); return coordinatorDoc !== null 
&& coordinatorDoc.fetchTimestamp !== undefined; diff --git a/jstests/sharding/resharding_replicate_updates_as_insert_delete.js b/jstests/sharding/resharding_replicate_updates_as_insert_delete.js index ab0944e05d2..8bfbff8b259 100644 --- a/jstests/sharding/resharding_replicate_updates_as_insert_delete.js +++ b/jstests/sharding/resharding_replicate_updates_as_insert_delete.js @@ -55,7 +55,7 @@ reshardingTest.withReshardingInBackground( // // assert.soon(() => tempColl.findOne(docToUpdate) !== null); assert.soon(() => { const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: testColl.getFullName() + ns: testColl.getFullName() }); return coordinatorDoc !== null && coordinatorDoc.state === "applying"; diff --git a/jstests/sharding/resharding_size_estimate.js b/jstests/sharding/resharding_size_estimate.js index 75a112b7c6a..27988d2257c 100644 --- a/jstests/sharding/resharding_size_estimate.js +++ b/jstests/sharding/resharding_size_estimate.js @@ -78,7 +78,8 @@ reshardingTest.withReshardingInBackground( let donorShards = doc.donorShards; for (let i = 0; i < donorShards.length; i++) { if (donorShards[i].id === shardName) { - return donorShards[i].cloneSizeInfo; + const {bytesToClone, documentsToClone} = donorShards[i].mutableState; + return {bytesToClone, documentsToClone}; } } assert(false, 'could not find ' + shardName + ' in donorShards.'); @@ -87,7 +88,7 @@ reshardingTest.withReshardingInBackground( let coordinatorDoc = {}; assert.soon(() => { coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - nss: inputCollection.getFullName() + ns: inputCollection.getFullName() }); return coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined; }); @@ -108,22 +109,24 @@ reshardingTest.withReshardingInBackground( assert.eq(s1Estimate.documentsToClone, numDocumentsPerShard); const verifyApproximateCopySizeForRecipients = (doc, s0Estimate, s1Estimate) => { - const approxCopySize = doc.approxCopySize; - assert(approxCopySize !== undefined, - "Unable to find 'approxCopySize' in the coordinator document"); + const {approxBytesToCopy, approxDocumentsToCopy} = doc; + assert(approxBytesToCopy !== undefined, + "Unable to find 'approxBytesToCopy' in the coordinator document"); + assert(approxDocumentsToCopy !== undefined, + "Unable to find 'approxDocumentsToCopy' in the coordinator document"); const numRecipients = doc.recipientShards.length; assert.neq(numRecipients, 0, "Unexpected number of recipients"); const expectedApproxDocumentsToCopy = (s0Estimate.documentsToClone + s1Estimate.documentsToClone) / numRecipients; - assert.eq(approxCopySize.approxDocumentsToCopy, + assert.eq(approxDocumentsToCopy, expectedApproxDocumentsToCopy, "Unexpected value for 'approxDocumentsToCopy' in the coordinator document"); const expectedApproxBytesToCopy = (s0Estimate.bytesToClone + s1Estimate.bytesToClone) / numRecipients; - assert.eq(approxCopySize.approxBytesToCopy, + assert.eq(approxBytesToCopy, expectedApproxBytesToCopy, "Unexpected value for 'approxBytesToCopy' in the coordinator document"); }; diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp index 1590131e02a..86744c1594f 100644 --- a/src/mongo/db/s/collection_metadata.cpp +++ b/src/mongo/db/s/collection_metadata.cpp @@ -134,8 +134,8 @@ bool CollectionMetadata::disallowWritesForResharding(const UUID& currentCollecti "decision persisted", recipientFields); - const auto& originalUUID = recipientFields->getExistingUUID(); - const auto& reshardingUUID = 
reshardingFields->getUuid(); + const auto& originalUUID = recipientFields->getSourceUUID(); + const auto& reshardingUUID = reshardingFields->getReshardingUUID(); if (currentCollectionUUID == originalUUID) { // This shard must be both a donor and recipient. Neither the drop or renameCollection have diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index 4152bf1241b..d3b585f9d90 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -32,6 +32,7 @@ #include "mongo/base/status.h" #include "mongo/db/range_arithmetic.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/resharding_util.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/sharding_test_fixture_common.h" @@ -120,7 +121,9 @@ protected: {kThisShard, kOtherShard}, existingUuid, kNss}; reshardingFields.setRecipientFields(std::move(recipientFields)); } else if (state == CoordinatorStateEnum::kBlockingWrites) { - TypeCollectionDonorFields donorFields{KeyPattern{BSON("newKey" << 1)}}; + TypeCollectionDonorFields donorFields{ + constructTemporaryReshardingNss(kNss.db(), existingUuid), + KeyPattern{BSON("newKey" << 1)}}; reshardingFields.setDonorFields(std::move(donorFields)); } diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index ea757e7666f..49b57f258b4 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -119,16 +119,18 @@ public: nss.db(), getCollectionUUIDFromChunkManger(nss, cm)); auto coordinatorDoc = - ReshardingCoordinatorDocument(std::move(tempReshardingNss), - std::move(CoordinatorStateEnum::kUnused), + ReshardingCoordinatorDocument(std::move(CoordinatorStateEnum::kUnused), {}, // donorShards {}); // recipientShards // Generate the resharding metadata for the ReshardingCoordinatorDocument. auto reshardingUUID = UUID::gen(); auto existingUUID = getCollectionUUIDFromChunkManger(ns(), cm); - auto commonMetadata = CommonReshardingMetadata( - std::move(reshardingUUID), ns(), std::move(existingUUID), request().getKey()); + auto commonMetadata = CommonReshardingMetadata(std::move(reshardingUUID), + ns(), + std::move(existingUUID), + std::move(tempReshardingNss), + request().getKey()); coordinatorDoc.setCommonReshardingMetadata(std::move(commonMetadata)); coordinatorDoc.setZones(request().getZones()); coordinatorDoc.setPresetReshardedChunks(request().get_presetReshardedChunks()); diff --git a/src/mongo/db/s/resharding/coordinator_document.idl b/src/mongo/db/s/resharding/coordinator_document.idl index b3110f0a391..f52169f9aea 100644 --- a/src/mongo/db/s/resharding/coordinator_document.idl +++ b/src/mongo/db/s/resharding/coordinator_document.idl @@ -39,35 +39,30 @@ structs: DonorShardEntry: description: "Represents a donor shard for a particular resharding operation on the coordinator." - inline_chained_structs: true - chained_structs: - MinFetchTimestamp: MinFetchTimestampStruct - AbortReason: AbortReasonStruct generate_comparison_operators: false - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. 
+ strict: false fields: id: shard_id - state: - type: DonorState - optional: true - cloneSizeInfo: - type: ReshardingCloneSize - optional: true + # We intentionally have the mutable state nested in a subobject to make it easy to + # overwrite with a single $set. + mutableState: DonorShardContext RecipientShardEntry: description: "Represents a recipient shard for a particular resharding operation on the coordinator." - inline_chained_structs: true - chained_structs: - StrictConsistencyTimestamp: StrictConsistencyTimestampStruct - AbortReason: AbortReasonStruct generate_comparison_operators: false - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: id: shard_id - state: - type: RecipientState - optional: true + # We intentionally have the mutable state nested in a subobject to make it easy to + # overwrite with a single $set. + mutableState: RecipientShardContext ReshardingCoordinatorDocument: description: "Represents a resharding operation on the coordinator." @@ -75,14 +70,14 @@ structs: chained_structs: CommonReshardingMetadata: CommonReshardingMetadata FetchTimestamp: FetchTimestampStruct + ReshardingApproxCopySize: ReshardingApproxCopySizeStruct AbortReason: AbortReasonStruct generate_comparison_operators: false - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: - tempReshardingNss: - type: namespacestring - description: "The namespace of the temporary resharding collection that exists on - recipient shards." state: CoordinatorState donorShards: type: array<DonorShardEntry> @@ -94,10 +89,6 @@ structs: type: array<object_owned> description: "The zones associated with the new shard key." optional: true - approxCopySize: - type: ReshardingApproxCopySize - description: "The approximate data that each recipient clones for this operation." - optional: true presetReshardedChunks: type: array<object_owned> description: >- diff --git a/src/mongo/db/s/resharding/donor_document.idl b/src/mongo/db/s/resharding/donor_document.idl index edb2269a2b8..c282f93874c 100644 --- a/src/mongo/db/s/resharding/donor_document.idl +++ b/src/mongo/db/s/resharding/donor_document.idl @@ -26,8 +26,8 @@ # it in the license file. # -# This file defines the format of documents stored in config.localReshardingOperations on the donor -# shard for a resharding operation. +# This file defines the format of documents stored in config.localReshardingOperations.donor on the +# donor shard for a resharding operation. global: cpp_namespace: "mongo" @@ -41,9 +41,12 @@ structs: inline_chained_structs: true chained_structs: CommonReshardingMetadata: CommonReshardingMetadata - MinFetchTimestamp: MinFetchTimestampStruct - AbortReason: AbortReasonStruct generate_comparison_operators: false - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. 
+ strict: false fields: - state: DonorState + # We intentionally have the mutable state nested in a subobject to make it easy to + # overwrite with a single $set. + mutableState: DonorShardContext diff --git a/src/mongo/db/s/resharding/donor_oplog_id.idl b/src/mongo/db/s/resharding/donor_oplog_id.idl index 6ab3fbc2913..0c4fdeecc7f 100644 --- a/src/mongo/db/s/resharding/donor_oplog_id.idl +++ b/src/mongo/db/s/resharding/donor_oplog_id.idl @@ -39,6 +39,9 @@ structs: description: >- Represents the set of timestamps that belong to an operation from the donor shard. generate_comparison_operators: true + # Use strict:true because this type is used as the structure for the _id value in documents + # and requires being an exact match. + strict: true fields: clusterTime: type: timestamp diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl index f9d98bb207e..4c56423307e 100644 --- a/src/mongo/db/s/resharding/recipient_document.idl +++ b/src/mongo/db/s/resharding/recipient_document.idl @@ -26,8 +26,8 @@ # it in the license file. # -# This file defines the format of documents stored in config.localReshardingOperations on the -# recipient shard for a resharding operation. +# This file defines the format of documents stored in config.localReshardingOperations.recipient on +# the recipient shard for a resharding operation. global: cpp_namespace: "mongo" @@ -42,12 +42,15 @@ structs: chained_structs: CommonReshardingMetadata: CommonReshardingMetadata FetchTimestamp: FetchTimestampStruct - StrictConsistencyTimestamp: StrictConsistencyTimestampStruct - AbortReason: AbortReasonStruct generate_comparison_operators: false - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: - state: RecipientState + # We intentionally have the mutable state nested in a subobject to make it easy to + # overwrite with a single $set. + mutableState: RecipientShardContext donorShards: type: array<shard_id> description: "The list of donor shards that report to this recipient." 
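As the IDL comments above explain, each participant entry now nests its mutable fields under a 'mutableState' subobject so the coordinator can overwrite a shard's entire mutable state in one write. A rough mongo shell sketch of such an update, assuming a donor named "shard0000"; the state and size values are illustrative only:

// Replace one donor's mutable state with a single $set instead of updating
// state/minFetchTimestamp/bytesToClone field by field.
const reshardingUUID = UUID();  // stands in for the operation's _id
db.getSiblingDB("config").reshardingOperations.updateOne(
    {_id: reshardingUUID, "donorShards.id": "shard0000"},
    {
        $set: {
            "donorShards.$.mutableState": {
                state: "donating-initial-data",
                minFetchTimestamp: Timestamp(1, 0),
                bytesToClone: NumberLong(1024),
                documentsToClone: NumberLong(10)
            }
        }
    });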
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp index 7329a5cea7d..7dbc37576ba 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp @@ -67,7 +67,7 @@ bool allParticipantsInStateGTE(WithLock lk, TState expectedState, const std::vector<TParticipant>& participants) { for (const auto& shard : participants) { - if (shard.getState() < expectedState) { + if (shard.getMutableState().getState() < expectedState) { return false; } } @@ -103,7 +103,7 @@ bool stateTransistionsComplete(WithLock lk, template <class TParticipant> Status getStatusFromAbortReasonWithShardInfo(const TParticipant& participant, StringData participantType) { - return getStatusFromAbortReason(participant) + return getStatusFromAbortReason(participant.getMutableState()) .withContext("{} shard {} reached an unrecoverable error"_format( participantType, participant.getId().toString())); } @@ -123,13 +123,13 @@ boost::optional<Status> getAbortReasonIfExists( } for (const auto& donorShard : updatedStateDoc.getDonorShards()) { - if (donorShard.getState() == DonorStateEnum::kError) { + if (donorShard.getMutableState().getState() == DonorStateEnum::kError) { return getStatusFromAbortReasonWithShardInfo(donorShard, "Donor"_sd); } } for (const auto& recipientShard : updatedStateDoc.getRecipientShards()) { - if (recipientShard.getState() == RecipientStateEnum::kError) { + if (recipientShard.getMutableState().getState() == RecipientStateEnum::kError) { return getStatusFromAbortReasonWithShardInfo(recipientShard, "Recipient"_sd); } } @@ -142,7 +142,8 @@ bool allParticipantsDoneWithAbortReason(WithLock lk, TState expectedState, const std::vector<TParticipant>& participants) { for (const auto& shard : participants) { - if (!(shard.getState() == expectedState && shard.getAbortReason().is_initialized())) { + if (!(shard.getMutableState().getState() == expectedState && + shard.getMutableState().getAbortReason().is_initialized())) { return false; } } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp index 3f6c3c19272..ffb44e7abdd 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp @@ -68,9 +68,9 @@ protected: RecipientStateEnum recipientState, boost::optional<Timestamp> timestamp = boost::none, boost::optional<Status> abortReason = boost::none) { - return {makeRecipientShard(ShardId{"s1"}, recipientState, timestamp, abortReason), - makeRecipientShard(ShardId{"s2"}, recipientState, timestamp, abortReason), - makeRecipientShard(ShardId{"s3"}, recipientState, timestamp, abortReason)}; + return {makeRecipientShard(ShardId{"s1"}, recipientState, abortReason), + makeRecipientShard(ShardId{"s2"}, recipientState, abortReason), + makeRecipientShard(ShardId{"s3"}, recipientState, abortReason)}; } }; @@ -144,7 +144,6 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) { {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kError, - boost::none, // timestamp Status{ErrorCodes::InternalError, "We gotta abort"})}, {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; auto coordinatorDoc = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards); @@ -169,18 +168,9 @@ 
TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) { // All participants have an abortReason, but not all are in state kDone yet. auto donorShards = makeMockDonorsInState(DonorStateEnum::kDone, Timestamp(1, 1), abortReason); std::vector<RecipientShardEntry> recipientShards0{ - {makeRecipientShard(ShardId{"s1"}, - RecipientStateEnum::kError, - boost::none, // timestamp - abortReason)}, - {makeRecipientShard(ShardId{"s2"}, - RecipientStateEnum::kDone, - boost::none, // timestamp - abortReason)}, - {makeRecipientShard(ShardId{"s3"}, - RecipientStateEnum::kDone, - boost::none, // timestamp - abortReason)}}; + {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kError, abortReason)}, + {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kDone, abortReason)}, + {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kDone, abortReason)}}; auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards, abortReason); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 8c3ec48a49f..e20b8f447ec 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -123,8 +123,8 @@ void writeToCoordinatorStateNss(OperationContext* opCtx, // Remove the coordinator document. return BatchedCommandRequest::buildDeleteOp( NamespaceString::kConfigReshardingOperationsNamespace, - BSON("_id" << coordinatorDoc.get_id()), // query - false // multi + BSON("_id" << coordinatorDoc.getReshardingUUID()), // query + false // multi ); default: { // Partially update the coordinator document. @@ -148,10 +148,18 @@ void writeToCoordinatorStateNss(OperationContext* opCtx, *abortReason); } - if (auto approxCopySize = coordinatorDoc.getApproxCopySize()) { - // If the approxCopySize exists, include it in the update. - setBuilder.append(ReshardingCoordinatorDocument::kApproxCopySizeFieldName, - approxCopySize->toBSON()); + if (auto approxBytesToCopy = coordinatorDoc.getApproxBytesToCopy()) { + // If the approxBytesToCopy exists, include it in the update. + setBuilder.append( + ReshardingCoordinatorDocument::kApproxBytesToCopyFieldName, + *approxBytesToCopy); + } + + if (auto approxDocumentsToCopy = coordinatorDoc.getApproxDocumentsToCopy()) { + // If the approxDocumentsToCopy exists, include it in the update. 
+ setBuilder.append( + ReshardingCoordinatorDocument::kApproxDocumentsToCopyFieldName, + *approxDocumentsToCopy); } if (nextState == CoordinatorStateEnum::kPreparingToDonate) { @@ -163,7 +171,7 @@ void writeToCoordinatorStateNss(OperationContext* opCtx, return BatchedCommandRequest::buildUpdateOp( NamespaceString::kConfigReshardingOperationsNamespace, - BSON("_id" << coordinatorDoc.get_id()), + BSON("_id" << coordinatorDoc.getReshardingUUID()), updateBuilder.obj(), false, // upsert false // multi @@ -204,7 +212,7 @@ TypeCollectionRecipientFields constructRecipientFields( const ReshardingCoordinatorDocument& coordinatorDoc) { auto donorShardIds = extractShardIds(coordinatorDoc.getDonorShards()); TypeCollectionRecipientFields recipientFields( - std::move(donorShardIds), coordinatorDoc.getExistingUUID(), coordinatorDoc.getNss()); + std::move(donorShardIds), coordinatorDoc.getSourceUUID(), coordinatorDoc.getSourceNss()); emplaceFetchTimestampIfExists(recipientFields, coordinatorDoc.getFetchTimestamp()); return recipientFields; } @@ -218,9 +226,11 @@ BSONObj createReshardingFieldsUpdateForOriginalNss( switch (nextState) { case CoordinatorStateEnum::kInitializing: { // Append 'reshardingFields' to the config.collections entry for the original nss - TypeCollectionReshardingFields originalEntryReshardingFields(coordinatorDoc.get_id()); + TypeCollectionReshardingFields originalEntryReshardingFields( + coordinatorDoc.getReshardingUUID()); originalEntryReshardingFields.setState(coordinatorDoc.getState()); - TypeCollectionDonorFields donorField(coordinatorDoc.getReshardingKey()); + TypeCollectionDonorFields donorField(coordinatorDoc.getTempReshardingNss(), + coordinatorDoc.getReshardingKey()); originalEntryReshardingFields.setDonorFields(donorField); return BSON("$set" << BSON(CollectionType::kReshardingFieldsFieldName @@ -238,7 +248,7 @@ BSONObj createReshardingFieldsUpdateForOriginalNss( // 'state' field and add the 'recipientFields' to the 'reshardingFields' section. 
auto recipientFields = constructRecipientFields(coordinatorDoc); BSONObj setFields = - BSON("uuid" << coordinatorDoc.get_id() << "key" + BSON("uuid" << coordinatorDoc.getReshardingUUID() << "key" << coordinatorDoc.getReshardingKey().toBSON() << "lastmodEpoch" << newCollectionEpoch.get() << "lastmod" << opCtx->getServiceContext()->getPreciseClockSource()->now() @@ -292,7 +302,7 @@ void updateConfigCollectionsForOriginalNss(OperationContext* opCtx, auto request = BatchedCommandRequest::buildUpdateOp( CollectionType::ConfigNS, - BSON(CollectionType::kNssFieldName << coordinatorDoc.getNss().ns()), // query + BSON(CollectionType::kNssFieldName << coordinatorDoc.getSourceNss().ns()), // query writeOp, false, // upsert false // multi @@ -410,9 +420,9 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx, // exist, so cannot pass a value for expectedNumModified const auto chunksQuery = [&]() { if (newCollectionTimestamp) { - return BSON(ChunkType::collectionUUID() << coordinatorDoc.getExistingUUID()); + return BSON(ChunkType::collectionUUID() << coordinatorDoc.getSourceUUID()); } else { - return BSON(ChunkType::ns(coordinatorDoc.getNss().ns())); + return BSON(ChunkType::ns(coordinatorDoc.getSourceNss().ns())); } }(); ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( @@ -431,8 +441,8 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx, TagsType::ConfigNS, BatchedCommandRequest::buildDeleteOp( TagsType::ConfigNS, - BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query - true // multi + BSON(ChunkType::ns(coordinatorDoc.getSourceNss().ns())), // query + true // multi ), txnNumber); } @@ -447,7 +457,7 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, // newCollectionEpoch. const auto chunksQuery = [&]() { if (newCollectionTimestamp) { - return BSON(ChunkType::collectionUUID() << coordinatorDoc.get_id()); + return BSON(ChunkType::collectionUUID() << coordinatorDoc.getReshardingUUID()); } else { return BSON(ChunkType::ns(coordinatorDoc.getTempReshardingNss().ns())); } @@ -456,7 +466,7 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, if (newCollectionTimestamp) { return BSON("$set" << BSON("lastmodEpoch" << newCollectionEpoch)); } else { - return BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns() << "lastmodEpoch" + return BSON("$set" << BSON("ns" << coordinatorDoc.getSourceNss().ns() << "lastmodEpoch" << newCollectionEpoch)); } }(); @@ -472,10 +482,10 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, auto tagsRequest = BatchedCommandRequest::buildUpdateOp( TagsType::ConfigNS, - BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query - BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns())), // update - false, // upsert - true // multi + BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query + BSON("$set" << BSON("ns" << coordinatorDoc.getSourceNss().ns())), // update + false, // upsert + true // multi ); // Update the 'ns' field to be the original collection namespace for all tags documents that @@ -577,7 +587,7 @@ void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( // metadata. 
ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( opCtx, - updatedCoordinatorDoc.getNss(), + updatedCoordinatorDoc.getSourceNss(), extractShardIds(updatedCoordinatorDoc.getDonorShards()), std::move(changeMetadataFunc)); } else if (participantsToNotify == ParticipantsToNotifyEnum::kRecipients) { @@ -596,7 +606,7 @@ void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( // shards would not apply. ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( opCtx, - updatedCoordinatorDoc.getNss(), + updatedCoordinatorDoc.getSourceNss(), extractShardIds(updatedCoordinatorDoc.getRecipientShards()), std::move(changeMetadataFunc)); } @@ -612,13 +622,13 @@ CollectionType createTempReshardingCollectionType( CollectionType collType(coordinatorDoc.getTempReshardingNss(), chunkVersion.epoch(), opCtx->getServiceContext()->getPreciseClockSource()->now(), - coordinatorDoc.get_id()); + coordinatorDoc.getReshardingUUID()); collType.setKeyPattern(coordinatorDoc.getReshardingKey()); collType.setDefaultCollation(collation); collType.setUnique(false); collType.setTimestamp(chunkVersion.getTimestamp()); - TypeCollectionReshardingFields tempEntryReshardingFields(coordinatorDoc.get_id()); + TypeCollectionReshardingFields tempEntryReshardingFields(coordinatorDoc.getReshardingUUID()); tempEntryReshardingFields.setState(coordinatorDoc.getState()); auto recipientFields = constructRecipientFields(coordinatorDoc); @@ -631,7 +641,7 @@ CollectionType createTempReshardingCollectionType( void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { auto originalCollType = Grid::get(opCtx)->catalogClient()->getCollection( - opCtx, coordinatorDoc.getNss(), repl::ReadConcernLevel::kMajorityReadConcern); + opCtx, coordinatorDoc.getSourceNss(), repl::ReadConcernLevel::kMajorityReadConcern); const auto collation = originalCollType.getDefaultCollation(); executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) { @@ -646,7 +656,7 @@ void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx, str::stream() << "Only one resharding operation is allowed to be active at a " "time, aborting resharding op for " - << coordinatorDoc.getNss()); + << coordinatorDoc.getSourceNss()); } throw; @@ -676,9 +686,9 @@ std::vector<DonorShardEntry> constructDonorShardEntries(const std::set<ShardId>& donorShardIds.end(), std::back_inserter(donorShards), [](const ShardId& shardId) -> DonorShardEntry { - DonorShardEntry entry{shardId}; - entry.setState(DonorStateEnum::kUnused); - return entry; + DonorShardContext donorCtx; + donorCtx.setState(DonorStateEnum::kUnused); + return DonorShardEntry{shardId, std::move(donorCtx)}; }); return donorShards; } @@ -690,9 +700,9 @@ std::vector<RecipientShardEntry> constructRecipientShardEntries( recipientShardIds.end(), std::back_inserter(recipientShards), [](const ShardId& shardId) -> RecipientShardEntry { - RecipientShardEntry entry{shardId}; - entry.setState(RecipientStateEnum::kUnused); - return entry; + RecipientShardContext recipientCtx; + recipientCtx.setState(RecipientStateEnum::kUnused); + return RecipientShardEntry{shardId, std::move(recipientCtx)}; }); return recipientShards; } @@ -701,7 +711,7 @@ ParticipantShardsAndChunks calculateParticipantShardsAndChunks( OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { const auto cm = uassertStatusOK( 
Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( - opCtx, coordinatorDoc.getNss())); + opCtx, coordinatorDoc.getSourceNss())); std::set<ShardId> donorShardIds; cm.getAllShardIds(&donorShardIds); @@ -722,7 +732,7 @@ ParticipantShardsAndChunks calculateParticipantShardsAndChunks( ReshardedChunk::parse(IDLParserErrorContext("ReshardedChunk"), obj); if (version.getTimestamp()) { initialChunks.emplace_back( - coordinatorDoc.get_id(), + coordinatorDoc.getReshardingUUID(), ChunkRange{reshardedChunk.getMin(), reshardedChunk.getMax()}, version, reshardedChunk.getRecipientShardId()); @@ -743,7 +753,7 @@ ParticipantShardsAndChunks calculateParticipantShardsAndChunks( cm.forEachChunk([&](const auto& chunk) { // TODO SERVER-49526 Change the range to refer to the new shard key pattern. if (version.getTimestamp()) { - initialChunks.emplace_back(coordinatorDoc.get_id(), + initialChunks.emplace_back(coordinatorDoc.getReshardingUUID(), ChunkRange{chunk.getMin(), chunk.getMax()}, version, chunk.getShardId()); @@ -910,15 +920,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::~ReshardingCoordinator() { void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc( const ReshardingCoordinatorDocument& doc) { - invariant(doc.get_id() == _coordinatorDoc.get_id()); + invariant(doc.getReshardingUUID() == _coordinatorDoc.getReshardingUUID()); LOGV2_INFO(5343001, "Transitioned resharding coordinator state", "newState"_attr = CoordinatorState_serializer(doc.getState()), "oldState"_attr = CoordinatorState_serializer(_coordinatorDoc.getState()), - "ns"_attr = doc.getNss(), - "collectionUUID"_attr = doc.getCommonReshardingMetadata().getExistingUUID(), - "reshardingUUID"_attr = doc.get_id()); + "namespace"_attr = doc.getSourceNss(), + "collectionUUID"_attr = doc.getSourceUUID(), + "reshardingUUID"_attr = doc.getReshardingUUID()); _coordinatorDoc = doc; } @@ -997,7 +1007,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( }) .then([this, executor] { _tellAllParticipantsToRefresh( - createFinishReshardCollectionCommand(_coordinatorDoc.getNss()), executor); + createFinishReshardCollectionCommand(_coordinatorDoc.getSourceNss()), executor); }) .then([this, self = shared_from_this(), executor] { // The shared_ptr maintaining the ReshardingCoordinatorService Instance object gets @@ -1014,7 +1024,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( } } - auto nss = _coordinatorDoc.getNss(); + auto nss = _coordinatorDoc.getSourceNss(); LOGV2(4956902, "Resharding failed", @@ -1082,8 +1092,8 @@ boost::optional<BSONObj> ReshardingCoordinatorService::ReshardingCoordinator::re MongoProcessInterface::CurrentOpSessionsMode) noexcept { ReshardingMetrics::ReporterOptions options( ReshardingMetrics::ReporterOptions::Role::kCoordinator, - _coordinatorDoc.get_id(), - _coordinatorDoc.getNss(), + _coordinatorDoc.getReshardingUUID(), + _coordinatorDoc.getSourceNss(), _coordinatorDoc.getReshardingKey().toBSON(), false); return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options); @@ -1149,12 +1159,23 @@ void emplaceApproxBytesToCopyIfExists(ReshardingCoordinatorDocument& coordinator return; } - if (auto alreadyExistingApproxCopySize = coordinatorDoc.getApproxCopySize()) { - invariant(approxCopySize->toBSON().woCompare(alreadyExistingApproxCopySize->toBSON()) == 0, - "Expected the existing and the new values for approxCopySize to be equal"); + invariant(bool(coordinatorDoc.getApproxBytesToCopy()) == + 
bool(coordinatorDoc.getApproxDocumentsToCopy()), + "Expected approxBytesToCopy and approxDocumentsToCopy to either both be set or to" + " both be unset"); + + if (auto alreadyExistingApproxBytesToCopy = coordinatorDoc.getApproxBytesToCopy()) { + invariant(approxCopySize->getApproxBytesToCopy() == *alreadyExistingApproxBytesToCopy, + "Expected the existing and the new values for approxBytesToCopy to be equal"); } - coordinatorDoc.setApproxCopySize(std::move(approxCopySize)); + if (auto alreadyExistingApproxDocumentsToCopy = coordinatorDoc.getApproxDocumentsToCopy()) { + invariant(approxCopySize->getApproxDocumentsToCopy() == + *alreadyExistingApproxDocumentsToCopy, + "Expected the existing and the new values for approxDocumentsToCopy to be equal"); + } + + coordinatorDoc.setReshardingApproxCopySizeStruct(std::move(*approxCopySize)); } ReshardingApproxCopySize computeApproxCopySize(ReshardingCoordinatorDocument& coordinatorDoc) { @@ -1166,9 +1187,12 @@ ReshardingApproxCopySize computeApproxCopySize(ReshardingCoordinatorDocument& co // Compute the aggregate for the number of documents and bytes to copy. long aggBytesToCopy = 0, aggDocumentsToCopy = 0; for (auto donor : coordinatorDoc.getDonorShards()) { - if (const auto cloneSizeInfo = donor.getCloneSizeInfo(); cloneSizeInfo) { - aggBytesToCopy += cloneSizeInfo->getBytesToClone().get_value_or(0); - aggDocumentsToCopy += cloneSizeInfo->getDocumentsToClone().get_value_or(0); + if (const auto bytesToClone = donor.getMutableState().getBytesToClone()) { + aggBytesToCopy += *bytesToClone; + } + + if (const auto documentsToClone = donor.getMutableState().getDocumentsToClone()) { + aggDocumentsToCopy += *documentsToClone; } } @@ -1341,7 +1365,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe _coordinatorDoc.getState() == CoordinatorStateEnum::kError) { nssToRefresh = _coordinatorDoc.getTempReshardingNss(); } else { - nssToRefresh = _coordinatorDoc.getNss(); + nssToRefresh = _coordinatorDoc.getSourceNss(); } sharding_util::tellShardsToRefreshCollection( @@ -1353,7 +1377,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefres auto opCtx = cc().makeOperationContext(); auto donorIds = extractShardIds(_coordinatorDoc.getDonorShards()); sharding_util::tellShardsToRefreshCollection( - opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor); + opCtx.get(), donorIds, _coordinatorDoc.getSourceNss(), **executor); } void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh( diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 3dc4417f02e..99f8285d2b8 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -85,11 +85,10 @@ protected: ReshardingCoordinatorDocument makeCoordinatorDoc( CoordinatorStateEnum state, boost::optional<Timestamp> fetchTimestamp = boost::none) { CommonReshardingMetadata meta( - _reshardingUUID, _originalNss, UUID::gen(), _newShardKey.toBSON()); - ReshardingCoordinatorDocument doc(_tempNss, - state, - {DonorShardEntry(ShardId("shard0000"))}, - {RecipientShardEntry(ShardId("shard0001"))}); + _reshardingUUID, _originalNss, UUID::gen(), _tempNss, _newShardKey.toBSON()); + ReshardingCoordinatorDocument doc(state, + {DonorShardEntry(ShardId("shard0000"), {})}, + {RecipientShardEntry(ShardId("shard0001"), {})}); doc.setCommonReshardingMetadata(meta); emplaceFetchTimestampIfExists(doc, 
std::move(fetchTimestamp)); return doc; @@ -112,7 +111,7 @@ protected: } CollectionType collType( - coordinatorDoc.getNss(), std::move(epoch), lastUpdated, std::move(uuid)); + coordinatorDoc.getSourceNss(), std::move(epoch), lastUpdated, std::move(uuid)); collType.setKeyPattern(shardKey); collType.setUnique(false); if (reshardingFields) @@ -196,10 +195,10 @@ protected: client.insert(NamespaceString::kConfigReshardingOperationsNamespace.ns(), coordinatorDoc.toBSON()); - TypeCollectionReshardingFields reshardingFields(coordinatorDoc.get_id()); + TypeCollectionReshardingFields reshardingFields(coordinatorDoc.getReshardingUUID()); reshardingFields.setState(coordinatorDoc.getState()); - reshardingFields.setDonorFields( - TypeCollectionDonorFields(coordinatorDoc.getReshardingKey())); + reshardingFields.setDonorFields(TypeCollectionDonorFields( + coordinatorDoc.getTempReshardingNss(), coordinatorDoc.getReshardingKey())); auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry( coordinatorDoc, @@ -235,13 +234,14 @@ protected: OperationContext* opCtx, ReshardingCoordinatorDocument expectedCoordinatorDoc) { DBDirectClient client(opCtx); auto doc = client.findOne(NamespaceString::kConfigReshardingOperationsNamespace.ns(), - Query(BSON("nss" << expectedCoordinatorDoc.getNss().ns()))); + Query(BSON("ns" << expectedCoordinatorDoc.getSourceNss().ns()))); auto coordinatorDoc = ReshardingCoordinatorDocument::parse( IDLParserErrorContext("ReshardingCoordinatorTest"), doc); - ASSERT_EQUALS(coordinatorDoc.get_id(), expectedCoordinatorDoc.get_id()); - ASSERT_EQUALS(coordinatorDoc.getNss(), expectedCoordinatorDoc.getNss()); + ASSERT_EQUALS(coordinatorDoc.getReshardingUUID(), + expectedCoordinatorDoc.getReshardingUUID()); + ASSERT_EQUALS(coordinatorDoc.getSourceNss(), expectedCoordinatorDoc.getSourceNss()); ASSERT_EQUALS(coordinatorDoc.getTempReshardingNss(), expectedCoordinatorDoc.getTempReshardingNss()); ASSERT_EQUALS(coordinatorDoc.getReshardingKey().toBSON().woCompare( @@ -284,14 +284,14 @@ protected: onDiskDonorShards.end(), [shardId](DonorShardEntry d) { return d.getId() == shardId; }); ASSERT(onDiskIt != onDiskDonorShards.end()); - if (it->getMinFetchTimestamp()) { - ASSERT(onDiskIt->getMinFetchTimestamp()); - ASSERT_EQUALS(onDiskIt->getMinFetchTimestamp().get(), - it->getMinFetchTimestamp().get()); + if (it->getMutableState().getMinFetchTimestamp()) { + ASSERT(onDiskIt->getMutableState().getMinFetchTimestamp()); + ASSERT_EQUALS(onDiskIt->getMutableState().getMinFetchTimestamp().get(), + it->getMutableState().getMinFetchTimestamp().get()); } else { - ASSERT(!onDiskIt->getMinFetchTimestamp()); + ASSERT(!onDiskIt->getMutableState().getMinFetchTimestamp()); } - ASSERT(onDiskIt->getState() == it->getState()); + ASSERT(onDiskIt->getMutableState().getState() == it->getMutableState().getState()); } auto expectedRecipientShards = expectedCoordinatorDoc.getRecipientShards(); @@ -304,14 +304,7 @@ protected: onDiskRecipientShards.end(), [shardId](RecipientShardEntry r) { return r.getId() == shardId; }); ASSERT(onDiskIt != onDiskRecipientShards.end()); - if (it->getStrictConsistencyTimestamp()) { - ASSERT(onDiskIt->getStrictConsistencyTimestamp()); - ASSERT_EQUALS(onDiskIt->getStrictConsistencyTimestamp().get(), - it->getStrictConsistencyTimestamp().get()); - } else { - ASSERT(!onDiskIt->getStrictConsistencyTimestamp()); - } - ASSERT(onDiskIt->getState() == it->getState()); + ASSERT(onDiskIt->getMutableState().getState() == it->getMutableState().getState()); } } @@ -344,7 +337,8 @@ protected: 
ASSERT(onDiskEntry.getReshardingFields()); auto onDiskReshardingFields = onDiskEntry.getReshardingFields().get(); - ASSERT(onDiskReshardingFields.getUuid() == expectedReshardingFields->getUuid()); + ASSERT(onDiskReshardingFields.getReshardingUUID() == + expectedReshardingFields->getReshardingUUID()); ASSERT(onDiskReshardingFields.getState() == expectedReshardingFields->getState()); ASSERT(onDiskReshardingFields.getDonorFields()); @@ -394,12 +388,13 @@ protected: ASSERT(onDiskEntry.getReshardingFields()); auto onDiskReshardingFields = onDiskEntry.getReshardingFields().get(); - ASSERT_EQUALS(onDiskReshardingFields.getUuid(), expectedReshardingFields.getUuid()); + ASSERT_EQUALS(onDiskReshardingFields.getReshardingUUID(), + expectedReshardingFields.getReshardingUUID()); ASSERT(onDiskReshardingFields.getState() == expectedReshardingFields.getState()); ASSERT(onDiskReshardingFields.getRecipientFields()); - ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getOriginalNamespace(), - expectedReshardingFields.getRecipientFields()->getOriginalNamespace()); + ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getSourceNss(), + expectedReshardingFields.getRecipientFields()->getSourceNss()); if (expectedReshardingFields.getRecipientFields()->getFetchTimestamp()) { ASSERT(onDiskReshardingFields.getRecipientFields()->getFetchTimestamp()); @@ -472,9 +467,11 @@ protected: // Check the resharding fields and allowMigrations in the config.collections entry for the // original collection - TypeCollectionReshardingFields expectedReshardingFields(expectedCoordinatorDoc.get_id()); + TypeCollectionReshardingFields expectedReshardingFields( + expectedCoordinatorDoc.getReshardingUUID()); expectedReshardingFields.setState(expectedCoordinatorDoc.getState()); - TypeCollectionDonorFields donorField(expectedCoordinatorDoc.getReshardingKey()); + TypeCollectionDonorFields donorField(expectedCoordinatorDoc.getTempReshardingNss(), + expectedCoordinatorDoc.getReshardingKey()); expectedReshardingFields.setDonorFields(donorField); if (auto abortReason = expectedCoordinatorDoc.getAbortReason()) { AbortReason abortReasonStruct; @@ -518,10 +515,12 @@ protected: auto opCtx = operationContext(); DBDirectClient client(opCtx); - TypeCollectionReshardingFields reshardingFields(expectedCoordinatorDoc.get_id()); + TypeCollectionReshardingFields reshardingFields( + expectedCoordinatorDoc.getReshardingUUID()); reshardingFields.setState(expectedCoordinatorDoc.getState()); reshardingFields.setDonorFields( - TypeCollectionDonorFields(expectedCoordinatorDoc.getReshardingKey())); + TypeCollectionDonorFields(expectedCoordinatorDoc.getTempReshardingNss(), + expectedCoordinatorDoc.getReshardingKey())); auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry( expectedCoordinatorDoc, @@ -618,7 +617,7 @@ protected: // Check that the entry is removed from config.reshardingOperations DBDirectClient client(opCtx); auto doc = client.findOne(NamespaceString::kConfigReshardingOperationsNamespace.ns(), - Query(BSON("nss" << expectedCoordinatorDoc.getNss().ns()))); + Query(BSON("ns" << expectedCoordinatorDoc.getSourceNss().ns()))); ASSERT(doc.isEmpty()); // Check that the resharding fields are removed from the config.collections entry and diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 4367fa45891..0ad26e7eae1 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ 
b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -86,7 +86,8 @@ void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocum // exception. This is safe because PrimaryOnlyService::onStepUp() will have constructed a // new instance of the resharding state machine. auto dupeKeyInfo = ex.extraInfo<DuplicateKeyErrorInfo>(); - invariant(dupeKeyInfo->getDuplicatedKeyValue().binaryEqual(BSON("_id" << doc.get_id()))); + invariant(dupeKeyInfo->getDuplicatedKeyValue().binaryEqual( + BSON("_id" << doc.getReshardingUUID()))); } } @@ -118,7 +119,7 @@ void processAbortReasonNoDonorMachine(OperationContext* opCtx, uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument( opCtx, NamespaceString::kConfigReshardingOperationsNamespace, - BSON("_id" << reshardingFields.getUuid() << "donorShards.id" << shardId), + BSON("_id" << reshardingFields.getReshardingUUID() << "donorShards.id" << shardId), BSON("$set" << updateBuilder.done()), false /* upsert */, ShardingCatalogClient::kMajorityWriteConcern)); @@ -154,7 +155,7 @@ void processAbortReasonNoRecipientMachine(OperationContext* opCtx, uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument( opCtx, NamespaceString::kConfigReshardingOperationsNamespace, - BSON("_id" << reshardingFields.getUuid() << "recipientShards.id" << shardId), + BSON("_id" << reshardingFields.getReshardingUUID() << "recipientShards.id" << shardId), BSON("$set" << updateBuilder.done()), false /* upsert */, ShardingCatalogClient::kMajorityWriteConcern)); @@ -171,7 +172,7 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx, if (auto donorStateMachine = tryGetReshardingStateMachine<ReshardingDonorService, DonorStateMachine, ReshardingDonorDocument>( - opCtx, reshardingFields.getUuid())) { + opCtx, reshardingFields.getReshardingUUID())) { donorStateMachine->get()->onReshardingFieldsChanges(opCtx, reshardingFields); return; } @@ -218,7 +219,7 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, if (auto recipientStateMachine = tryGetReshardingStateMachine<ReshardingRecipientService, RecipientStateMachine, ReshardingRecipientDocument>( - opCtx, reshardingFields.getUuid())) { + opCtx, reshardingFields.getReshardingUUID())) { recipientStateMachine->get()->onReshardingFieldsChanges(opCtx, reshardingFields); return; } @@ -247,7 +248,7 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, } auto recipientDoc = - constructRecipientDocumentFromReshardingFields(opCtx, metadata, reshardingFields); + constructRecipientDocumentFromReshardingFields(opCtx, nss, metadata, reshardingFields); createReshardingStateMachine<ReshardingRecipientService, RecipientStateMachine, ReshardingRecipientDocument>(opCtx, recipientDoc); @@ -291,12 +292,17 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields( const NamespaceString& nss, const CollectionMetadata& metadata, const ReshardingFields& reshardingFields) { - auto donorDoc = ReshardingDonorDocument(DonorStateEnum::kPreparingToDonate); + DonorShardContext donorCtx; + donorCtx.setState(DonorStateEnum::kPreparingToDonate); + auto donorDoc = ReshardingDonorDocument{std::move(donorCtx)}; + + auto sourceUUID = getCollectionUUIDFromChunkManger(nss, *metadata.getChunkManager()); auto commonMetadata = - CommonReshardingMetadata(reshardingFields.getUuid(), + CommonReshardingMetadata(reshardingFields.getReshardingUUID(), nss, - getCollectionUUIDFromChunkManger(nss, *metadata.getChunkManager()), + sourceUUID, + 
reshardingFields.getDonorFields()->getTempReshardingNss(), reshardingFields.getDonorFields()->getReshardingKey().toBSON()); donorDoc.setCommonReshardingMetadata(std::move(commonMetadata)); @@ -305,21 +311,26 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields( ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( OperationContext* opCtx, + const NamespaceString& nss, const CollectionMetadata& metadata, const ReshardingFields& reshardingFields) { // The recipient state machines are created before the donor shards are prepared to donate but // will remain idle until the donor shards are prepared to donate. invariant(!reshardingFields.getRecipientFields()->getFetchTimestamp()); - auto recipientDoc = - ReshardingRecipientDocument(RecipientStateEnum::kAwaitingFetchTimestamp, - reshardingFields.getRecipientFields()->getDonorShardIds()); + RecipientShardContext recipientCtx; + recipientCtx.setState(RecipientStateEnum::kAwaitingFetchTimestamp); - auto commonMetadata = - CommonReshardingMetadata(reshardingFields.getUuid(), - reshardingFields.getRecipientFields()->getOriginalNamespace(), - reshardingFields.getRecipientFields()->getExistingUUID(), - metadata.getShardKeyPattern().toBSON()); + auto recipientDoc = ReshardingRecipientDocument{ + std::move(recipientCtx), reshardingFields.getRecipientFields()->getDonorShardIds()}; + + auto sourceNss = reshardingFields.getRecipientFields()->getSourceNss(); + auto sourceUUID = reshardingFields.getRecipientFields()->getSourceUUID(); + auto commonMetadata = CommonReshardingMetadata(reshardingFields.getReshardingUUID(), + sourceNss, + sourceUUID, + nss, + metadata.getShardKeyPattern().toBSON()); recipientDoc.setCommonReshardingMetadata(std::move(commonMetadata)); return recipientDoc; diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h index 309be7d5230..8930ddd3fe0 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h @@ -44,7 +44,7 @@ using ReshardingFields = TypeCollectionReshardingFields; template <class Service, class StateMachine, class ReshardingDocument> boost::optional<std::shared_ptr<StateMachine>> tryGetReshardingStateMachine( OperationContext* opCtx, const UUID& reshardingUUID) { - auto instanceId = BSON(ReshardingDocument::k_idFieldName << reshardingUUID); + auto instanceId = BSON(ReshardingDocument::kReshardingUUIDFieldName << reshardingUUID); auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); auto service = registry->lookupServiceByName(Service::kServiceName); return StateMachine::lookup(opCtx, service, instanceId); @@ -60,6 +60,7 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields( ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( OperationContext* opCtx, + const NamespaceString& nss, const CollectionMetadata& metadata, const ReshardingFields& reshardingFields); diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index 0da9f7f56b1..e5193d93374 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -72,7 +72,7 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, reshardingFields, kShardIds, kExistingUUID, kOriginalNss); auto recipientDoc = 
resharding::constructRecipientDocumentFromReshardingFields( - opCtx, metadata, reshardingFields); + opCtx, kTemporaryReshardingNss, metadata, reshardingFields); assertRecipientDocMatchesReshardingFields(metadata, reshardingFields, recipientDoc); } diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h index 86178f5dc09..790f6eb5ddb 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h @@ -135,7 +135,7 @@ protected: void appendDonorFieldsToReshardingFields(ReshardingFields& fields, const BSONObj& reshardingKey) { - fields.setDonorFields(TypeCollectionDonorFields(reshardingKey)); + fields.setDonorFields(TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey)); } void appendRecipientFieldsToReshardingFields( @@ -156,9 +156,9 @@ protected: const UUID& existingUUID, const BSONObj& reshardingKey, const ReshardingDocument& reshardingDoc) { - ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID); - ASSERT_EQ(reshardingDoc.getNss(), nss); - ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID); + ASSERT_EQ(reshardingDoc.getReshardingUUID(), reshardingUUID); + ASSERT_EQ(reshardingDoc.getSourceNss(), nss); + ASSERT_EQ(reshardingDoc.getSourceUUID(), existingUUID); ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey); } @@ -168,12 +168,12 @@ protected: const ReshardingDonorDocument& donorDoc) { assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>( nss, - reshardingFields.getUuid(), + reshardingFields.getReshardingUUID(), existingUUID, reshardingFields.getDonorFields()->getReshardingKey().toBSON(), donorDoc); - ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate); - ASSERT(donorDoc.getMinFetchTimestamp() == boost::none); + ASSERT(donorDoc.getMutableState().getState() == DonorStateEnum::kPreparingToDonate); + ASSERT(donorDoc.getMutableState().getMinFetchTimestamp() == boost::none); } void assertRecipientDocMatchesReshardingFields( @@ -181,13 +181,14 @@ protected: const ReshardingFields& reshardingFields, const ReshardingRecipientDocument& recipientDoc) { assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>( - reshardingFields.getRecipientFields()->getOriginalNamespace(), - reshardingFields.getUuid(), - reshardingFields.getRecipientFields()->getExistingUUID(), + reshardingFields.getRecipientFields()->getSourceNss(), + reshardingFields.getReshardingUUID(), + reshardingFields.getRecipientFields()->getSourceUUID(), metadata.getShardKeyPattern().toBSON(), recipientDoc); - ASSERT(recipientDoc.getState() == RecipientStateEnum::kAwaitingFetchTimestamp); + ASSERT(recipientDoc.getMutableState().getState() == + RecipientStateEnum::kAwaitingFetchTimestamp); ASSERT(!recipientDoc.getFetchTimestamp()); auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds(); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index edc2ac24298..79c16d97ddf 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -74,13 +74,8 @@ ChunkManager getShardedCollectionRoutingInfoWithRefreshAndFlush(const NamespaceS return routingInfo; } -void refreshTemporaryReshardingCollection(const ReshardingDonorDocument& donorDoc) { - auto tempNss = - constructTemporaryReshardingNss(donorDoc.getNss().db(), 
donorDoc.getExistingUUID()); - std::ignore = getShardedCollectionRoutingInfoWithRefreshAndFlush(tempNss); -} - -Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) { +Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss, + const CollectionUUID& sourceUUID) { auto opCtx = cc().makeOperationContext(); // Do a no-op write and use the OpTime as the minFetchTimestamp @@ -89,19 +84,19 @@ Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) { "resharding donor minFetchTimestamp", NamespaceString::kRsOplogNamespace.ns(), [&] { - AutoGetDb db(opCtx.get(), donorDoc.getNss().db(), MODE_IX); - Lock::CollectionLock collLock(opCtx.get(), donorDoc.getNss(), MODE_S); + AutoGetDb db(opCtx.get(), sourceNss.db(), MODE_IX); + Lock::CollectionLock collLock(opCtx.get(), sourceNss, MODE_S); AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite); const std::string msg = str::stream() - << "All future oplog entries on the namespace " << donorDoc.getNss().ns() + << "All future oplog entries on the namespace " << sourceNss.ns() << " must include a 'destinedRecipient' field"; WriteUnitOfWork wuow(opCtx.get()); opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( opCtx.get(), - donorDoc.getNss(), - donorDoc.getExistingUUID(), + sourceNss, + sourceUUID, {}, BSON("msg" << msg), boost::none, @@ -141,10 +136,13 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::cons } ReshardingDonorService::DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc) + : DonorStateMachine(ReshardingDonorDocument::parse({"DonorStateMachine"}, donorDoc)) {} + +ReshardingDonorService::DonorStateMachine::DonorStateMachine( + const ReshardingDonorDocument& donorDoc) : repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(), - _donorDoc(ReshardingDonorDocument::parse(IDLParserErrorContext("ReshardingDonorDocument"), - donorDoc)), - _id(_donorDoc.getCommonReshardingMetadata().get_id()) {} + _metadata{donorDoc.getCommonReshardingMetadata()}, + _donorCtx{donorDoc.getMutableState()} {} ReshardingDonorService::DonorStateMachine::~DonorStateMachine() { stdx::lock_guard<Latch> lg(_mutex); @@ -174,15 +172,18 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( .onError([this](Status status) { LOGV2(4956400, "Resharding operation donor state machine failed", - "namespace"_attr = _donorDoc.getNss().ns(), - "reshardingId"_attr = _id, + "namespace"_attr = _metadata.getSourceNss(), + "reshardingUUID"_attr = _metadata.getReshardingUUID(), "error"_attr = status); - _transitionStateAndUpdateCoordinator(DonorStateEnum::kError, boost::none, status); + + _transitionToError(status); + _updateCoordinator(); // TODO SERVER-52838: Ensure all local collections that may have been created for // resharding are removed, with the exception of the ReshardingDonorDocument, before // transitioning to kDone. 
- _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone, boost::none, status); + _transitionState(DonorStateEnum::kDone); + _updateCoordinator(); return status; }) .onCompletion([this, self = shared_from_this()](Status status) { @@ -227,9 +228,9 @@ boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCur MongoProcessInterface::CurrentOpConnectionsMode connMode, MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { ReshardingMetrics::ReporterOptions options(ReshardingMetrics::ReporterOptions::Role::kDonor, - _id, - _donorDoc.getNss(), - _donorDoc.getReshardingKey().toBSON(), + _metadata.getReshardingUUID(), + _metadata.getSourceNss(), + _metadata.getReshardingKey().toBSON(), false); return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options); } @@ -249,7 +250,7 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( } if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) { - _critSec.emplace(opCtx->getServiceContext(), _donorDoc.getNss()); + _critSec.emplace(opCtx->getServiceContext(), _metadata.getSourceNss()); ensureFulfilledPromise(lk, _allRecipientsDoneApplying); } @@ -265,58 +266,59 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( void ReshardingDonorService::DonorStateMachine:: _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() { - if (_donorDoc.getState() > DonorStateEnum::kPreparingToDonate) { - invariant(_donorDoc.getMinFetchTimestamp()); + if (_donorCtx.getState() > DonorStateEnum::kPreparingToDonate) { + invariant(_donorCtx.getMinFetchTimestamp()); + invariant(_donorCtx.getBytesToClone()); + invariant(_donorCtx.getDocumentsToClone()); return; } - ReshardingCloneSize cloneSizeEstimate; + int64_t bytesToClone = 0; + int64_t documentsToClone = 0; + { auto opCtx = cc().makeOperationContext(); auto rawOpCtx = opCtx.get(); - const auto shardId = ShardingState::get(rawOpCtx)->shardId(); - - const auto& nss = _donorDoc.getNss(); - const auto& nssUUID = _donorDoc.getExistingUUID(); - const auto& reshardingUUID = _donorDoc.get_id(); - - AutoGetCollectionForRead coll(rawOpCtx, _donorDoc.getNss()); - if (!coll) { - cloneSizeEstimate.setBytesToClone(0); - cloneSizeEstimate.setDocumentsToClone(0); - } else { - cloneSizeEstimate.setBytesToClone(coll->dataSize(rawOpCtx)); - cloneSizeEstimate.setDocumentsToClone(coll->numRecords(rawOpCtx)); - } - LOGV2_DEBUG(5390702, - 2, - "Resharding estimated size", - "reshardingUUID"_attr = reshardingUUID, - "namespace"_attr = nss, - "donorShardId"_attr = shardId, - "sizeInfo"_attr = cloneSizeEstimate); + AutoGetCollection coll(rawOpCtx, _metadata.getSourceNss(), MODE_IS); + if (coll) { + IndexBuildsCoordinator::get(rawOpCtx)->assertNoIndexBuildInProgForCollection( + coll->uuid()); - IndexBuildsCoordinator::get(rawOpCtx)->assertNoIndexBuildInProgForCollection(nssUUID); + bytesToClone = coll->dataSize(rawOpCtx); + documentsToClone = coll->numRecords(rawOpCtx); + } } - // Recipient shards expect to read from the donor shard's existing sharded collection - // and the config.cache.chunks collection of the temporary resharding collection using - // {atClusterTime: <fetchTimestamp>}. Refreshing the temporary resharding collection on - // the donor shards causes them to create the config.cache.chunks collection. Without - // this refresh, the {atClusterTime: <fetchTimestamp>} read on the config.cache.chunks - // namespace would fail with a SnapshotUnavailable error response. 
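For illustration only (not part of the patch), the {atClusterTime: <fetchTimestamp>} read described in the comment above has roughly the following shape in shell syntax; donorPrimary, tempNs, and fetchTimestamp are assumed to be defined elsewhere, and the real reads are issued internally by the recipient:

    // Sketch of the snapshot read a recipient performs against a donor shard. Without the
    // routing-table refresh, the donor would not yet have the config.cache.chunks collection
    // for the temporary resharding namespace and this read would fail with SnapshotUnavailable.
    const chunksCollName = "cache.chunks." + tempNs;
    donorPrimary.getDB("config").runCommand({
        find: chunksCollName,
        readConcern: {level: "snapshot", atClusterTime: fetchTimestamp},
    });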
- refreshTemporaryReshardingCollection(_donorDoc); - - auto minFetchTimestamp = generateMinFetchTimestamp(_donorDoc); - _transitionStateAndUpdateCoordinator( - DonorStateEnum::kDonatingInitialData, minFetchTimestamp, boost::none, cloneSizeEstimate); + // Recipient shards expect to read from the donor shard's existing sharded collection and the + // config.cache.chunks collection of the temporary resharding collection using + // {atClusterTime: <fetchTimestamp>}. Refreshing the temporary resharding collection on the + // donor shards causes them to create the config.cache.chunks collection. Without this refresh, + // the {atClusterTime: <fetchTimestamp>} read on the config.cache.chunks namespace would fail + // with a SnapshotUnavailable error response. + std::ignore = + getShardedCollectionRoutingInfoWithRefreshAndFlush(_metadata.getTempReshardingNss()); + + Timestamp minFetchTimestamp = + generateMinFetchTimestamp(_metadata.getSourceNss(), _metadata.getSourceUUID()); + + LOGV2_DEBUG(5390702, + 2, + "Collection being resharded now ready for recipients to begin cloning", + "namespace"_attr = _metadata.getSourceNss(), + "minFetchTimestamp"_attr = minFetchTimestamp, + "bytesToClone"_attr = bytesToClone, + "documentsToClone"_attr = documentsToClone, + "reshardingUUID"_attr = _metadata.getReshardingUUID()); + + _transitionToDonatingInitialData(minFetchTimestamp, bytesToClone, documentsToClone); + _updateCoordinator(); } ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - if (_donorDoc.getState() > DonorStateEnum::kDonatingInitialData) { + if (_donorCtx.getState() > DonorStateEnum::kDonatingInitialData) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -333,7 +335,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - if (_donorDoc.getState() > DonorStateEnum::kDonatingOplogEntries) { + if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -344,28 +346,25 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: void ReshardingDonorService::DonorStateMachine:: _writeTransactionOplogEntryThenTransitionToBlockingWrites() { - if (_donorDoc.getState() > DonorStateEnum::kPreparingToBlockWrites) { + if (_donorCtx.getState() > DonorStateEnum::kPreparingToBlockWrites) { return; } { - const auto& nss = _donorDoc.getNss(); - const auto& nssUUID = _donorDoc.getExistingUUID(); - const auto& reshardingUUID = _donorDoc.get_id(); auto opCtx = cc().makeOperationContext(); auto rawOpCtx = opCtx.get(); auto generateOplogEntry = [&](ShardId destinedRecipient) { repl::MutableOplogEntry oplog; - oplog.setNss(nss); + oplog.setNss(_metadata.getSourceNss()); oplog.setOpType(repl::OpTypeEnum::kNoop); - oplog.setUuid(nssUUID); + oplog.setUuid(_metadata.getSourceUUID()); oplog.setDestinedRecipient(destinedRecipient); oplog.setObject( BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", - nss.toString()))); - oplog.setObject2( - BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << reshardingUUID)); + _metadata.getSourceNss().toString()))); + oplog.setObject2(BSON("type" << kReshardFinalOpLogType << "reshardingUUID" + << 
_metadata.getReshardingUUID())); oplog.setOpTime(OplogSlot()); oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); return oplog; @@ -374,7 +373,8 @@ void ReshardingDonorService::DonorStateMachine:: try { Timer latency; - const auto recipients = getRecipientShards(rawOpCtx, nss, nssUUID); + const auto recipients = + getRecipientShards(rawOpCtx, _metadata.getSourceNss(), _metadata.getSourceUUID()); for (const auto& recipient : recipients) { auto oplog = generateOplogEntry(recipient); @@ -401,8 +401,8 @@ void ReshardingDonorService::DonorStateMachine:: LOGV2_DEBUG(5279504, 0, "Committed oplog entries to temporarily block writes for resharding", - "namespace"_attr = nss, - "reshardingUUID"_attr = reshardingUUID, + "namespace"_attr = _metadata.getSourceNss(), + "reshardingUUID"_attr = _metadata.getReshardingUUID(), "numRecipients"_attr = recipients.size(), "duration"_attr = duration_cast<Milliseconds>(latency.elapsed())); ensureFulfilledPromise(lg, _finalOplogEntriesWritten); @@ -412,7 +412,7 @@ void ReshardingDonorService::DonorStateMachine:: stdx::lock_guard<Latch> lg(_mutex); LOGV2_ERROR(5279508, "Exception while writing resharding final oplog entries", - "reshardingUUID"_attr = reshardingUUID, + "reshardingUUID"_attr = _metadata.getReshardingUUID(), "error"_attr = status); ensureFulfilledPromise(lg, _finalOplogEntriesWritten, status); uassertStatusOK(status); @@ -429,7 +429,7 @@ SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitFinalOplo ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: _awaitCoordinatorHasDecisionPersistedThenTransitionToDropping( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - if (_donorDoc.getState() > DonorStateEnum::kBlockingWrites) { + if (_donorCtx.getState() > DonorStateEnum::kBlockingWrites) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -439,80 +439,76 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: } void ReshardingDonorService::DonorStateMachine::_dropOriginalCollection() { - if (_donorDoc.getState() > DonorStateEnum::kDropping) { + if (_donorCtx.getState() > DonorStateEnum::kDropping) { return; } { auto opCtx = cc().makeOperationContext(); resharding::data_copy::ensureCollectionDropped( - opCtx.get(), _donorDoc.getNss(), _donorDoc.getExistingUUID()); + opCtx.get(), _metadata.getSourceNss(), _metadata.getSourceUUID()); } - _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone); + _transitionState(DonorStateEnum::kDone); + _updateCoordinator(); } -void ReshardingDonorService::DonorStateMachine::_transitionState( - DonorStateEnum endState, - boost::optional<Timestamp> minFetchTimestamp, - boost::optional<Status> abortReason) { - ReshardingDonorDocument replacementDoc(_donorDoc); - replacementDoc.setState(endState); +void ReshardingDonorService::DonorStateMachine::_transitionState(DonorStateEnum newState) { + invariant(newState != DonorStateEnum::kDonatingInitialData && + newState != DonorStateEnum::kError); - emplaceMinFetchTimestampIfExists(replacementDoc, minFetchTimestamp); - emplaceAbortReasonIfExists(replacementDoc, abortReason); + auto newDonorCtx = _donorCtx; + newDonorCtx.setState(newState); + _transitionState(std::move(newDonorCtx)); +} +void ReshardingDonorService::DonorStateMachine::_transitionState(DonorShardContext&& newDonorCtx) { // For logging purposes. 
- auto oldState = _donorDoc.getState(); - auto newState = replacementDoc.getState(); + auto oldState = _donorCtx.getState(); + auto newState = newDonorCtx.getState(); - _updateDonorDocument(std::move(replacementDoc)); + _updateDonorDocument(std::move(newDonorCtx)); LOGV2_INFO(5279505, "Transitioned resharding donor state", "newState"_attr = DonorState_serializer(newState), "oldState"_attr = DonorState_serializer(oldState), - "ns"_attr = _donorDoc.getNss(), - "collectionUUID"_attr = _donorDoc.getExistingUUID(), - "reshardingUUID"_attr = _donorDoc.get_id()); + "namespace"_attr = _metadata.getSourceNss(), + "collectionUUID"_attr = _metadata.getSourceUUID(), + "reshardingUUID"_attr = _metadata.getReshardingUUID()); } -void ReshardingDonorService::DonorStateMachine::_transitionStateAndUpdateCoordinator( - DonorStateEnum endState, - boost::optional<Timestamp> minFetchTimestamp, - boost::optional<Status> abortReason, - boost::optional<ReshardingCloneSize> cloneSizeEstimate) { - _transitionState(endState, minFetchTimestamp, abortReason); +void ReshardingDonorService::DonorStateMachine::_transitionToDonatingInitialData( + Timestamp minFetchTimestamp, int64_t bytesToClone, int64_t documentsToClone) { + auto newDonorCtx = _donorCtx; + newDonorCtx.setState(DonorStateEnum::kDonatingInitialData); + newDonorCtx.setMinFetchTimestamp(minFetchTimestamp); + newDonorCtx.setBytesToClone(bytesToClone); + newDonorCtx.setDocumentsToClone(documentsToClone); + _transitionState(std::move(newDonorCtx)); +} + +void ReshardingDonorService::DonorStateMachine::_transitionToError(Status abortReason) { + auto newDonorCtx = _donorCtx; + newDonorCtx.setState(DonorStateEnum::kError); + emplaceAbortReasonIfExists(newDonorCtx, abortReason); + _transitionState(std::move(newDonorCtx)); +} +void ReshardingDonorService::DonorStateMachine::_updateCoordinator() { auto opCtx = cc().makeOperationContext(); auto shardId = ShardingState::get(opCtx.get())->shardId(); - BSONObjBuilder updateBuilder; - updateBuilder.append("donorShards.$.state", DonorState_serializer(endState)); - - if (minFetchTimestamp) { - updateBuilder.append("donorShards.$.minFetchTimestamp", minFetchTimestamp.get()); - } - - if (abortReason) { - BSONObjBuilder abortReasonBuilder; - abortReason.get().serializeErrorToBSON(&abortReasonBuilder); - updateBuilder.append("donorShards.$.abortReason", abortReasonBuilder.obj()); - } - - if (cloneSizeEstimate) { - updateBuilder.append("donorShards.$.cloneSizeInfo", cloneSizeEstimate.get().toBSON()); - } - uassertStatusOK( Grid::get(opCtx.get()) ->catalogClient() - ->updateConfigDocument(opCtx.get(), - NamespaceString::kConfigReshardingOperationsNamespace, - BSON("_id" << _donorDoc.get_id() << "donorShards.id" << shardId), - BSON("$set" << updateBuilder.done()), - false /* upsert */, - ShardingCatalogClient::kMajorityWriteConcern)); + ->updateConfigDocument( + opCtx.get(), + NamespaceString::kConfigReshardingOperationsNamespace, + BSON("_id" << _metadata.getReshardingUUID() << "donorShards.id" << shardId), + BSON("$set" << BSON("donorShards.$.mutableState" << _donorCtx.toBSON())), + false /* upsert */, + ShardingCatalogClient::kMajorityWriteConcern)); } void ReshardingDonorService::DonorStateMachine::insertStateDocument( @@ -523,26 +519,28 @@ void ReshardingDonorService::DonorStateMachine::insertStateDocument( } void ReshardingDonorService::DonorStateMachine::_updateDonorDocument( - ReshardingDonorDocument&& replacementDoc) { + DonorShardContext&& newDonorCtx) { auto opCtx = cc().makeOperationContext(); 
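Expressed in shell syntax, the update issued by _updateCoordinator() above amounts to roughly the following sketch; reshardingUUID, donorShardId, and donorMutableState (the serialized DonorShardContext) are assumed to be defined:

    // A single positional update replaces the donor's entire mutable state in the coordinator
    // document, rather than assembling individual donorShards.$.* field updates.
    db.getSiblingDB("config").reshardingOperations.update(
        {_id: reshardingUUID, "donorShards.id": donorShardId},
        {$set: {"donorShards.$.mutableState": donorMutableState}},
        {writeConcern: {w: "majority"}});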
PersistentTaskStore<ReshardingDonorDocument> store( NamespaceString::kDonorReshardingOperationsNamespace); - store.update(opCtx.get(), - BSON(ReshardingDonorDocument::k_idFieldName << _id), - replacementDoc.toBSON(), - WriteConcerns::kMajorityWriteConcern); + store.update( + opCtx.get(), + BSON(ReshardingDonorDocument::kReshardingUUIDFieldName << _metadata.getReshardingUUID()), + BSON("$set" << BSON(ReshardingDonorDocument::kMutableStateFieldName + << newDonorCtx.toBSON())), + WriteConcerns::kMajorityWriteConcern); - _donorDoc = replacementDoc; + _donorCtx = newDonorCtx; } void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() { auto opCtx = cc().makeOperationContext(); PersistentTaskStore<ReshardingDonorDocument> store( NamespaceString::kDonorReshardingOperationsNamespace); - store.remove(opCtx.get(), - BSON(ReshardingDonorDocument::k_idFieldName << _id), - WriteConcerns::kMajorityWriteConcern); - _donorDoc = {}; + store.remove( + opCtx.get(), + BSON(ReshardingDonorDocument::kReshardingUUIDFieldName << _metadata.getReshardingUUID()), + WriteConcerns::kMajorityWriteConcern); } void ReshardingDonorService::DonorStateMachine::_onAbortOrStepdown(WithLock, Status status) { diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 6e772b886ec..1f73febf005 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -101,6 +101,8 @@ public: const ReshardingDonorDocument& donorDoc); private: + DonorStateMachine(const ReshardingDonorDocument& donorDoc); + // The following functions correspond to the actions to take at a particular donor state. void _transitionToPreparingToDonate(); @@ -121,36 +123,41 @@ private: // or NamespaceNotFound. void _dropOriginalCollection(); - // Transitions the state on-disk and in-memory to 'endState'. - void _transitionState(DonorStateEnum endState, - boost::optional<Timestamp> minFetchTimestamp = boost::none, - boost::optional<Status> abortReason = boost::none); + // Transitions the on-disk and in-memory state to 'newState'. + void _transitionState(DonorStateEnum newState); + + void _transitionState(DonorShardContext&& newDonorCtx); + + // Transitions the on-disk and in-memory state to DonorStateEnum::kDonatingInitialData. + void _transitionToDonatingInitialData(Timestamp minFetchTimestamp, + int64_t bytesToClone, + int64_t documentsToClone); - void _transitionStateAndUpdateCoordinator( - DonorStateEnum endState, - boost::optional<Timestamp> minFetchTimestamp = boost::none, - boost::optional<Status> abortReason = boost::none, - boost::optional<ReshardingCloneSize> cloneSizeEstimate = boost::none); + // Transitions the on-disk and in-memory state to DonorStateEnum::kError. + void _transitionToError(Status abortReason); - // Updates the donor document on-disk and in-memory with the 'replacementDoc.' - void _updateDonorDocument(ReshardingDonorDocument&& replacementDoc); + void _updateCoordinator(); - // Removes the local donor document from disk and clears the in-memory state. + // Updates the mutable portion of the on-disk and in-memory donor document with 'newDonorCtx'. + void _updateDonorDocument(DonorShardContext&& newDonorCtx); + + // Removes the local donor document from disk. void _removeDonorDocument(); // Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors // (abort resharding). 
void _onAbortOrStepdown(WithLock lk, Status status); - // The in-memory representation of the underlying document in + // The in-memory representation of the immutable portion of the document in // config.localReshardingOperations.donor. - ReshardingDonorDocument _donorDoc; + const CommonReshardingMetadata _metadata; - // The id both for the resharding operation and for the primary-only-service instance. - const UUID _id; + // The in-memory representation of the mutable portion of the document in + // config.localReshardingOperations.donor. + DonorShardContext _donorCtx; // Protects the promises below - Mutex _mutex = MONGO_MAKE_LATCH("ReshardingDonor::_mutex"); + Mutex _mutex = MONGO_MAKE_LATCH("DonorStateMachine::_mutex"); boost::optional<ReshardingCriticalSection> _critSec; diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index 415fa8c2247..6f686c59731 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -142,7 +142,6 @@ protected: std::shared_ptr<ReshardingDonorService::DonorStateMachine> getStateMachineInstace( OperationContext* opCtx, ReshardingDonorDocument initialState) { - auto instanceId = BSON(ReshardingDonorDocument::k_idFieldName << initialState.get_id()); auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); auto service = registry->lookupServiceByName(ReshardingDonorService::kServiceName); return ReshardingDonorService::DonorStateMachine::getOrCreate( @@ -150,7 +149,7 @@ protected: } std::vector<BSONObj> getOplogWritesForDonorDocument(const ReshardingDonorDocument& doc) { - auto reshardNs = doc.getNss().toString(); + auto reshardNs = doc.getSourceNss().toString(); DBDirectClient client(operationContext()); auto result = client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()), BSON("ns" << reshardNs)); @@ -191,13 +190,19 @@ protected: TEST_F(ReshardingDonorServiceTest, ShouldWriteFinalOpLogEntryAfterTransitionToPreparingToBlockWrites) { - ReshardingDonorDocument doc(DonorStateEnum::kPreparingToBlockWrites); + DonorShardContext donorCtx; + donorCtx.setState(DonorStateEnum::kPreparingToBlockWrites); + + ReshardingDonorDocument doc(std::move(donorCtx)); CommonReshardingMetadata metadata(kReshardingUUID, mongo::NamespaceString(kReshardNs), kExistingUUID, + kTemporaryReshardingNss, KeyPattern(kReshardingKeyPattern)); doc.setCommonReshardingMetadata(metadata); - doc.getMinFetchTimestampStruct().setMinFetchTimestamp(Timestamp{0xf00}); + doc.getMutableState().setMinFetchTimestamp(Timestamp(10, 1)); + doc.getMutableState().setBytesToClone(1000); + doc.getMutableState().setDocumentsToClone(5); auto donorStateMachine = getStateMachineInstace(operationContext(), doc); ASSERT(donorStateMachine); @@ -227,7 +232,7 @@ TEST_F(ReshardingDonorServiceTest, ASSERT(o2.hasField("reshardingUUID")); auto actualReshardingUUIDBson = o2.getField("reshardingUUID"); auto actualReshardingUUID = UUID::parse(actualReshardingUUIDBson); - ASSERT_EQUALS(doc.get_id(), actualReshardingUUID); + ASSERT_EQUALS(doc.getReshardingUUID(), actualReshardingUUID); ASSERT(oplog.hasField("ui")); auto actualUiBson = oplog.getField("ui"); diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp index 026d16587e6..54cf5b5ecd0 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp @@ 
-59,8 +59,8 @@ std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver( boost::optional<Timestamp> parseNewMinFetchTimestampValue(const BSONObj& obj) { auto doc = ReshardingDonorDocument::parse(IDLParserErrorContext("Resharding"), obj); - if (doc.getState() == DonorStateEnum::kDonatingInitialData) { - return doc.getMinFetchTimestamp().get(); + if (doc.getMutableState().getState() == DonorStateEnum::kDonatingInitialData) { + return doc.getMutableState().getMinFetchTimestamp().get(); } else { return boost::none; } @@ -208,8 +208,8 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn if (args.nss == NamespaceString::kConfigReshardingOperationsNamespace) { auto newCoordinatorDoc = ReshardingCoordinatorDocument::parse( IDLParserErrorContext("reshardingCoordinatorDoc"), args.updateArgs.updatedDoc); - auto reshardingId = - BSON(ReshardingCoordinatorDocument::k_idFieldName << newCoordinatorDoc.get_id()); + auto reshardingId = BSON(ReshardingCoordinatorDocument::kReshardingUUIDFieldName + << newCoordinatorDoc.getReshardingUUID()); auto observer = getReshardingCoordinatorObserver(opCtx, reshardingId); opCtx->recoveryUnit()->onCommit( [observer = std::move(observer), newCoordinatorDoc = std::move(newCoordinatorDoc)]( diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl index 03b997873b4..6ba2c9a0305 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl @@ -40,6 +40,10 @@ imports: structs: ReshardingOplogApplierProgress: description: "Used for storing the progress made by the resharding oplog applier." + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: _id: type: ReshardingSourceId @@ -50,4 +54,3 @@ structs: description: >- The minimum point where the resharding oplog applier can start without missing any oplog it needs to process. 
- diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 54259a87643..117d0fffea4 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -217,10 +217,16 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService:: ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( const BSONObj& recipientDoc) + : RecipientStateMachine( + ReshardingRecipientDocument::parse({"RecipientStateMachine"}, recipientDoc)) {} + +ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( + const ReshardingRecipientDocument& recipientDoc) : repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(), - _recipientDoc(ReshardingRecipientDocument::parse( - IDLParserErrorContext("ReshardingRecipientDocument"), recipientDoc)), - _id(_recipientDoc.getCommonReshardingMetadata().get_id()) {} + _metadata{recipientDoc.getCommonReshardingMetadata()}, + _donorShardIds{recipientDoc.getDonorShards()}, + _recipientCtx{recipientDoc.getMutableState()}, + _fetchTimestamp{recipientDoc.getFetchTimestamp()} {} ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { stdx::lock_guard<Latch> lg(_mutex); @@ -252,16 +258,17 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( .onError([this](Status status) { LOGV2(4956500, "Resharding operation recipient state machine failed", - "namespace"_attr = _recipientDoc.getNss().ns(), - "reshardingId"_attr = _id, + "namespace"_attr = _metadata.getSourceNss(), + "reshardingUUID"_attr = _metadata.getReshardingUUID(), "error"_attr = status); - _transitionState(RecipientStateEnum::kError, boost::none, status); + + _transitionToError(status); _updateCoordinator(); // TODO SERVER-52838: Ensure all local collections that may have been created for // resharding are removed, with the exception of the ReshardingRecipientDocument, before // transitioning to kDone. 
- _transitionState(RecipientStateEnum::kDone, boost::none, status); + _transitionState(RecipientStateEnum::kDone); _updateCoordinator(); return status; }) @@ -319,9 +326,9 @@ boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::repo MongoProcessInterface::CurrentOpConnectionsMode, MongoProcessInterface::CurrentOpSessionsMode) noexcept { ReshardingMetrics::ReporterOptions options(ReshardingMetrics::ReporterOptions::Role::kRecipient, - _id, - _recipientDoc.getNss(), - _recipientDoc.getReshardingKey().toBSON(), + _metadata.getReshardingUUID(), + _metadata.getSourceNss(), + _metadata.getReshardingKey().toBSON(), false); return _metrics()->reportForCurrentOp(options); } @@ -351,47 +358,44 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - if (_recipientDoc.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { - invariant(_recipientDoc.getFetchTimestamp()); + if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { + invariant(_fetchTimestamp); return ExecutorFuture(**executor); } return _allDonorsPreparedToDonate.getFuture() .thenRunOn(**executor) - .then([this](Timestamp fetchTimestamp) { - _transitionState(RecipientStateEnum::kCreatingCollection, fetchTimestamp); - }); + .then( + [this](Timestamp fetchTimestamp) { _transitionToCreatingCollection(fetchTimestamp); }); } void ReshardingRecipientService::RecipientStateMachine:: _createTemporaryReshardingCollectionThenTransitionToCloning() { - if (_recipientDoc.getState() > RecipientStateEnum::kCreatingCollection) { + if (_recipientCtx.getState() > RecipientStateEnum::kCreatingCollection) { return; } { auto opCtx = cc().makeOperationContext(); - auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), - _recipientDoc.getExistingUUID()); resharding::createTemporaryReshardingCollectionLocally(opCtx.get(), - _recipientDoc.getNss(), - tempNss, - _recipientDoc.get_id(), - _recipientDoc.getExistingUUID(), - *_recipientDoc.getFetchTimestamp()); + _metadata.getSourceNss(), + _metadata.getTempReshardingNss(), + _metadata.getReshardingUUID(), + _metadata.getSourceUUID(), + *_fetchTimestamp); - ShardKeyPattern shardKeyPattern(_recipientDoc.getReshardingKey()); + ShardKeyPattern shardKeyPattern{_metadata.getReshardingKey()}; auto catalogCache = Grid::get(opCtx.get())->catalogCache(); shardVersionRetry(opCtx.get(), catalogCache, - tempNss, + _metadata.getTempReshardingNss(), "validating shard key index for reshardCollection"_sd, [&] { shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible( opCtx.get(), - tempNss, + _metadata.getTempReshardingNss(), shardKeyPattern.toBSON(), shardKeyPattern, CollationSpec::kSimpleSpec, @@ -406,7 +410,8 @@ void ReshardingRecipientService::RecipientStateMachine:: void ReshardingRecipientService::RecipientStateMachine::_initTxnCloner( OperationContext* opCtx, const Timestamp& fetchTimestamp) { auto catalogCache = Grid::get(opCtx)->catalogCache(); - auto routingInfo = catalogCache->getShardedCollectionRoutingInfo(opCtx, _recipientDoc.getNss()); + auto routingInfo = + catalogCache->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss()); std::set<ShardId> shardList; const auto myShardId = ShardingState::get(opCtx)->shardId(); @@ -414,8 +419,8 @@ void ReshardingRecipientService::RecipientStateMachine::_initTxnCloner( 
shardList.erase(myShardId); for (const auto& shard : shardList) { - _txnCloners.push_back( - std::make_unique<ReshardingTxnCloner>(ReshardingSourceId(_id, shard), fetchTimestamp)); + _txnCloners.push_back(std::make_unique<ReshardingTxnCloner>( + ReshardingSourceId(_metadata.getReshardingUUID(), shard), fetchTimestamp)); } } @@ -423,32 +428,30 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancelationToken& cancelToken) { - if (_recipientDoc.getState() > RecipientStateEnum::kCloning) { + if (_recipientCtx.getState() > RecipientStateEnum::kCloning) { return ExecutorFuture(**executor); } auto* serviceContext = Client::getCurrent()->getServiceContext(); - auto fetchTimestamp = *_recipientDoc.getFetchTimestamp(); - auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), - _recipientDoc.getExistingUUID()); + auto fetchTimestamp = *_fetchTimestamp; _collectionCloner = std::make_unique<ReshardingCollectionCloner>( std::make_unique<ReshardingCollectionCloner::Env>(_metrics()), - ShardKeyPattern(_recipientDoc.getReshardingKey()), - _recipientDoc.getNss(), - _recipientDoc.getExistingUUID(), + ShardKeyPattern{_metadata.getReshardingKey()}, + _metadata.getSourceNss(), + _metadata.getSourceUUID(), ShardingState::get(serviceContext)->shardId(), fetchTimestamp, - std::move(tempNss)); + _metadata.getTempReshardingNss()); { auto scopedOpCtx = cc().makeOperationContext(); auto opCtx = scopedOpCtx.get(); - _initTxnCloner(opCtx, *_recipientDoc.getFetchTimestamp()); + _initTxnCloner(opCtx, *_fetchTimestamp); } - auto numDonors = _recipientDoc.getDonorShards().size(); + auto numDonors = _donorShardIds.size(); _oplogFetchers.reserve(numDonors); _oplogFetcherFutures.reserve(numDonors); @@ -458,8 +461,8 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin } const auto& recipientId = ShardingState::get(serviceContext)->shardId(); - for (const auto& donor : _recipientDoc.getDonorShards()) { - auto oplogBufferNss = getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor); + for (const auto& donor : _donorShardIds) { + auto oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor); auto opCtx = cc().makeOperationContext(); auto idToResumeFrom = resharding::getFetcherIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp); @@ -468,8 +471,8 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin stdx::lock_guard<Latch> lk(_mutex); _oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>( std::make_unique<ReshardingOplogFetcher::Env>(getGlobalServiceContext(), _metrics()), - _recipientDoc.get_id(), - _recipientDoc.getExistingUUID(), + _metadata.getReshardingUUID(), + _metadata.getSourceUUID(), // The recipient fetches oplog entries from the donor starting from the largest _id // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value. 
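As a rough sketch of the resume logic described in the comment above, in shell syntax; the oplog-buffer collection name is a placeholder and recipientPrimary/fetchTimestamp are assumed to be defined, while the actual lookup happens inside resharding::getFetcherIdToResumeFrom():

    // Resume from the largest _id already fetched into the local oplog buffer, falling back to
    // a token built from fetchTimestamp when the buffer is still empty.
    const bufferColl = recipientPrimary.getDB("config").getCollection(
        "localReshardingOplogBuffer.<sourceUUID>.<donorShardId>");  // placeholder name
    const lastFetched = bufferColl.find().sort({_id: -1}).limit(1).toArray();
    const idToResumeFrom = lastFetched.length > 0
        ? lastFetched[0]._id
        : {clusterTime: fetchTimestamp, ts: fetchTimestamp};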
@@ -511,7 +514,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin } void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSteadyState() { - if (_recipientDoc.getState() > RecipientStateEnum::kApplying) { + if (_recipientCtx.getState() > RecipientStateEnum::kApplying) { return; } @@ -532,31 +535,29 @@ void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSt ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - if (_recipientDoc.getState() > RecipientStateEnum::kSteadyState) { + if (_recipientCtx.getState() > RecipientStateEnum::kSteadyState) { return ExecutorFuture<void>(**executor, Status::OK()); } - auto numDonors = _recipientDoc.getDonorShards().size(); + auto numDonors = _donorShardIds.size(); _oplogAppliers.reserve(numDonors); _oplogApplierWorkers.reserve(numDonors); const auto& sourceChunkMgr = [&] { auto opCtx = cc().makeOperationContext(); auto catalogCache = Grid::get(opCtx.get())->catalogCache(); - return catalogCache->getShardedCollectionRoutingInfo(opCtx.get(), _recipientDoc.getNss()); + return catalogCache->getShardedCollectionRoutingInfo(opCtx.get(), _metadata.getSourceNss()); }(); auto stashCollections = [&] { auto opCtx = cc().makeOperationContext(); - return resharding::ensureStashCollectionsExist(opCtx.get(), - sourceChunkMgr, - _recipientDoc.getExistingUUID(), - _recipientDoc.getDonorShards()); + return resharding::ensureStashCollectionsExist( + opCtx.get(), sourceChunkMgr, _metadata.getSourceUUID(), _donorShardIds); }(); auto futuresToWaitOn = std::move(_oplogFetcherFutures); - for (size_t donorIdx = 0; donorIdx < _recipientDoc.getDonorShards().size(); ++donorIdx) { - const auto& donor = _recipientDoc.getDonorShards()[donorIdx]; + for (size_t donorIdx = 0; donorIdx < _donorShardIds.size(); ++donorIdx) { + const auto& donor = _donorShardIds[donorIdx]; { stdx::lock_guard<Latch> lk(_mutex); _oplogApplierWorkers.emplace_back( @@ -565,10 +566,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: true /* isKillableByStepdown */)); } - auto sourceId = ReshardingSourceId{_recipientDoc.get_id(), donor}; - const auto& oplogBufferNss = - getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor); - auto fetchTimestamp = *_recipientDoc.getFetchTimestamp(); + auto sourceId = ReshardingSourceId{_metadata.getReshardingUUID(), donor}; + const auto& oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor); + auto fetchTimestamp = *_fetchTimestamp; auto idToResumeFrom = [&] { auto opCtx = cc().makeOperationContext(); return resharding::getApplierIdToResumeFrom(opCtx.get(), sourceId, fetchTimestamp); @@ -580,8 +580,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _metrics()), std::move(sourceId), oplogBufferNss, - _recipientDoc.getNss(), - _recipientDoc.getExistingUUID(), + _metadata.getSourceNss(), + _metadata.getSourceUUID(), stashCollections, donorIdx, fetchTimestamp, @@ -606,7 +606,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: return whenAllSucceed(std::move(futuresToWaitOn)) .thenRunOn(**executor) - .then([this, stashCollections] { + .then([stashCollections] { auto opCtxRaii = cc().makeOperationContext(); for (auto&& stashNss : stashCollections) { @@ -619,7 +619,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: 
.then([this] { _transitionState(RecipientStateEnum::kStrictConsistency); - bool isDonor = [& id = _recipientDoc.get_id()] { + bool isDonor = [& id = _metadata.getReshardingUUID()] { auto opCtx = cc().makeOperationContext(); auto instance = resharding::tryGetReshardingStateMachine< ReshardingDonorService, @@ -630,7 +630,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: }(); if (!isDonor) { - _critSec.emplace(cc().getServiceContext(), _recipientDoc.getNss()); + _critSec.emplace(cc().getServiceContext(), _metadata.getSourceNss()); } _updateCoordinator(); @@ -640,7 +640,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - if (_recipientDoc.getState() > RecipientStateEnum::kStrictConsistency) { + if (_recipientCtx.getState() > RecipientStateEnum::kStrictConsistency) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -650,20 +650,17 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: } void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection() { - if (_recipientDoc.getState() > RecipientStateEnum::kRenaming) { + if (_recipientCtx.getState() > RecipientStateEnum::kRenaming) { return; } { auto opCtx = cc().makeOperationContext(); - auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), - _recipientDoc.getExistingUUID()); - RenameCollectionOptions options; options.dropTarget = true; - uassertStatusOK( - renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options)); + uassertStatusOK(renameCollection( + opCtx.get(), _metadata.getTempReshardingNss(), _metadata.getSourceNss(), options)); _dropOplogCollections(opCtx.get()); @@ -675,30 +672,47 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi } void ReshardingRecipientService::RecipientStateMachine::_transitionState( - RecipientStateEnum endState, - boost::optional<Timestamp> fetchTimestamp, - boost::optional<Status> abortReason) { - invariant(endState != RecipientStateEnum::kAwaitingFetchTimestamp); - ReshardingRecipientDocument replacementDoc(_recipientDoc); - replacementDoc.setState(endState); + RecipientStateEnum newState) { + invariant(newState != RecipientStateEnum::kCreatingCollection && + newState != RecipientStateEnum::kError); + + auto newRecipientCtx = _recipientCtx; + newRecipientCtx.setState(newState); + _transitionState(std::move(newRecipientCtx), boost::none); +} - emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp)); - emplaceAbortReasonIfExists(replacementDoc, std::move(abortReason)); +void ReshardingRecipientService::RecipientStateMachine::_transitionState( + RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) { + invariant(newRecipientCtx.getState() != RecipientStateEnum::kAwaitingFetchTimestamp); // For logging purposes. 
- auto oldState = _recipientDoc.getState(); - auto newState = replacementDoc.getState(); + auto oldState = _recipientCtx.getState(); + auto newState = newRecipientCtx.getState(); - _updateRecipientDocument(std::move(replacementDoc)); - _metrics()->setRecipientState(endState); + _updateRecipientDocument(std::move(newRecipientCtx), std::move(fetchTimestamp)); + _metrics()->setRecipientState(newState); LOGV2_INFO(5279506, "Transitioned resharding recipient state", "newState"_attr = RecipientState_serializer(newState), "oldState"_attr = RecipientState_serializer(oldState), - "ns"_attr = _recipientDoc.getNss(), - "collectionUUID"_attr = _recipientDoc.getExistingUUID(), - "reshardingUUID"_attr = _recipientDoc.get_id()); + "namespace"_attr = _metadata.getSourceNss(), + "collectionUUID"_attr = _metadata.getSourceUUID(), + "reshardingUUID"_attr = _metadata.getReshardingUUID()); +} + +void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCollection( + Timestamp fetchTimestamp) { + auto newRecipientCtx = _recipientCtx; + newRecipientCtx.setState(RecipientStateEnum::kCreatingCollection); + _transitionState(std::move(newRecipientCtx), fetchTimestamp); +} + +void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) { + auto newRecipientCtx = _recipientCtx; + newRecipientCtx.setState(RecipientStateEnum::kError); + emplaceAbortReasonIfExists(newRecipientCtx, abortReason); + _transitionState(std::move(newRecipientCtx), boost::none); } void ReshardingRecipientService::RecipientStateMachine::_updateCoordinator() { @@ -707,21 +721,14 @@ void ReshardingRecipientService::RecipientStateMachine::_updateCoordinator() { auto shardId = ShardingState::get(opCtx.get())->shardId(); - BSONObjBuilder updateBuilder; - updateBuilder.append("recipientShards.$.state", - RecipientState_serializer(_recipientDoc.getState())); - if (_recipientDoc.getAbortReason()) { - updateBuilder.append("recipientShards.$.abortReason", _recipientDoc.getAbortReason().get()); - } - uassertStatusOK( Grid::get(opCtx.get()) ->catalogClient() ->updateConfigDocument( opCtx.get(), NamespaceString::kConfigReshardingOperationsNamespace, - BSON("_id" << _recipientDoc.get_id() << "recipientShards.id" << shardId), - BSON("$set" << updateBuilder.done()), + BSON("_id" << _metadata.getReshardingUUID() << "recipientShards.id" << shardId), + BSON("$set" << BSON("recipientShards.$.mutableState" << _recipientCtx.toBSON())), false /* upsert */, ShardingCatalogClient::kMajorityWriteConcern)); } @@ -734,16 +741,34 @@ void ReshardingRecipientService::RecipientStateMachine::insertStateDocument( } void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument( - ReshardingRecipientDocument&& replacementDoc) { + RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) { auto opCtx = cc().makeOperationContext(); PersistentTaskStore<ReshardingRecipientDocument> store( NamespaceString::kRecipientReshardingOperationsNamespace); + + BSONObjBuilder updateBuilder; + { + BSONObjBuilder setBuilder(updateBuilder.subobjStart("$set")); + setBuilder.append(ReshardingRecipientDocument::kMutableStateFieldName, + newRecipientCtx.toBSON()); + + if (fetchTimestamp) { + setBuilder.append(ReshardingRecipientDocument::kFetchTimestampFieldName, + *fetchTimestamp); + } + } + store.update(opCtx.get(), - BSON(ReshardingRecipientDocument::k_idFieldName << _id), - replacementDoc.toBSON(), + BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName + << _metadata.getReshardingUUID()), + 
updateBuilder.done(), WriteConcerns::kMajorityWriteConcern); - _recipientDoc = replacementDoc; + _recipientCtx = newRecipientCtx; + + if (fetchTimestamp) { + _fetchTimestamp = fetchTimestamp; + } } void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument() { @@ -751,15 +776,15 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument PersistentTaskStore<ReshardingRecipientDocument> store( NamespaceString::kRecipientReshardingOperationsNamespace); store.remove(opCtx.get(), - BSON(ReshardingRecipientDocument::k_idFieldName << _id), + BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName + << _metadata.getReshardingUUID()), WriteConcerns::kMajorityWriteConcern); - _recipientDoc = {}; } void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections( OperationContext* opCtx) { - for (const auto& donor : _recipientDoc.getDonorShards()) { - auto reshardingSourceId = ReshardingSourceId{_recipientDoc.get_id(), donor}; + for (const auto& donor : _donorShardIds) { + auto reshardingSourceId = ReshardingSourceId{_metadata.getReshardingUUID(), donor}; // Remove the oplog applier progress doc for this donor. PersistentTaskStore<ReshardingOplogApplierProgress> oplogApplierProgressStore( @@ -779,11 +804,11 @@ void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections( WriteConcernOptions()); // Drop the conflict stash collection for this donor. - auto stashNss = getLocalConflictStashNamespace(_recipientDoc.getExistingUUID(), donor); + auto stashNss = getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor); resharding::data_copy::ensureCollectionDropped(opCtx, stashNss); // Drop the oplog buffer collection for this donor. - auto oplogBufferNss = getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor); + auto oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor); resharding::data_copy::ensureCollectionDropped(opCtx, oplogBufferNss); } } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 59e41df7b63..a5fab606b8b 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -133,6 +133,8 @@ public: const ReshardingRecipientDocument& recipientDoc); private: + RecipientStateMachine(const ReshardingRecipientDocument& recipientDoc); + // The following functions correspond to the actions to take at a particular recipient state. ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( const std::shared_ptr<executor::ScopedTaskExecutor>& executor); @@ -153,17 +155,26 @@ private: void _renameTemporaryReshardingCollection(); - // Transitions the state on-disk and in-memory to 'endState'. - void _transitionState(RecipientStateEnum endState, - boost::optional<Timestamp> fetchTimestamp = boost::none, - boost::optional<Status> abortReason = boost::none); + // Transitions the on-disk and in-memory state to 'newState'. + void _transitionState(RecipientStateEnum newState); + + void _transitionState(RecipientShardContext&& newRecipientCtx, + boost::optional<Timestamp>&& fetchTimestamp); + + // Transitions the on-disk and in-memory state to RecipientStateEnum::kCreatingCollection. + void _transitionToCreatingCollection(Timestamp fetchTimestamp); + + // Transitions the on-disk and in-memory state to RecipientStateEnum::kError. 
+ void _transitionToError(Status abortReason); void _updateCoordinator(); - // Updates the recipient document on-disk and in-memory with the 'replacementDoc.' - void _updateRecipientDocument(ReshardingRecipientDocument&& replacementDoc); + // Updates the mutable portion of the on-disk and in-memory recipient document with + // 'newRecipientCtx' and 'fetchTimestamp'. + void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx, + boost::optional<Timestamp>&& fetchTimestamp); - // Removes the local recipient document from disk and clears the in-memory state. + // Removes the local recipient document from disk. void _removeRecipientDocument(); // Removes any docs from the oplog applier progress and txn applier progress collections that @@ -180,12 +191,15 @@ private: // (abort resharding). void _onAbortOrStepdown(WithLock, Status status); - // The in-memory representation of the underlying document in + // The in-memory representation of the immutable portion of the document in // config.localReshardingOperations.recipient. - ReshardingRecipientDocument _recipientDoc; + const CommonReshardingMetadata _metadata; + const std::vector<ShardId> _donorShardIds; - // The id both for the resharding operation and for the primary-only-service instance. - const UUID _id; + // The in-memory representation of the mutable portion of the document in + // config.localReshardingOperations.recipient. + RecipientShardContext _recipientCtx; + boost::optional<Timestamp> _fetchTimestamp; std::unique_ptr<ReshardingCollectionCloner> _collectionCloner; std::vector<std::unique_ptr<ReshardingTxnCloner>> _txnCloners; @@ -200,7 +214,7 @@ private: std::vector<ExecutorFuture<void>> _oplogFetcherFutures; // Protects the promises below - Mutex _mutex = MONGO_MAKE_LATCH("ReshardingRecipient::_mutex"); + Mutex _mutex = MONGO_MAKE_LATCH("RecipientStateMachine::_mutex"); boost::optional<ReshardingCriticalSection> _critSec; diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 178e71158cc..5d73cd21ddd 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -147,10 +147,10 @@ public: coll.setDefaultCollation(collation); TypeCollectionReshardingFields reshardingFields; - reshardingFields.setUuid(uuid); + reshardingFields.setReshardingUUID(uuid); TypeCollectionRecipientFields recipientFields; - recipientFields.setOriginalNamespace(origNss); - recipientFields.setExistingUUID(uuid); + recipientFields.setSourceNss(origNss); + recipientFields.setSourceUUID(uuid); // Populating the set of donor shard ids isn't necessary to test the functionality of // creating the temporary resharding collection. recipientFields.setDonorShardIds({}); diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl b/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl index 9c546b6a597..640dda5f651 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl @@ -41,6 +41,10 @@ imports: structs: ReshardingTxnClonerProgress: description: "Used for storing the progress made by the resharding transaction cloner." + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. 
+ strict: false fields: _id: type: ReshardingSourceId @@ -51,4 +55,3 @@ structs: description: >- The minimum point where the resharding transaction cloner can start without missing any config.transactions entries it needs to process. - diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp index d625242aa0d..7d932c4eb0e 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -200,8 +200,8 @@ protected: DatabaseType db(kNss.db().toString(), kShardList[0].getName(), true, env.dbVersion); TypeCollectionReshardingFields reshardingFields; - reshardingFields.setUuid(UUID::gen()); - reshardingFields.setDonorFields(TypeCollectionDonorFields{BSON("y" << 1)}); + reshardingFields.setReshardingUUID(UUID::gen()); + reshardingFields.setDonorFields(TypeCollectionDonorFields{env.tempNss, BSON("y" << 1)}); reshardingFields.setState(CoordinatorStateEnum::kPreparingToDonate); CollectionType coll(kNss, env.version.epoch(), Date_t::now(), UUID::gen()); diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index c383bf5beab..1aeb6e36014 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -90,22 +90,22 @@ DonorShardEntry makeDonorShard(ShardId shardId, DonorStateEnum donorState, boost::optional<Timestamp> minFetchTimestamp, boost::optional<Status> abortReason) { - DonorShardEntry entry(shardId); - entry.setState(donorState); - emplaceMinFetchTimestampIfExists(entry, minFetchTimestamp); - emplaceAbortReasonIfExists(entry, abortReason); - return entry; + DonorShardContext donorCtx; + donorCtx.setState(donorState); + emplaceMinFetchTimestampIfExists(donorCtx, minFetchTimestamp); + emplaceAbortReasonIfExists(donorCtx, abortReason); + + return DonorShardEntry{std::move(shardId), std::move(donorCtx)}; } RecipientShardEntry makeRecipientShard(ShardId shardId, RecipientStateEnum recipientState, - boost::optional<Timestamp> strictConsistencyTimestamp, boost::optional<Status> abortReason) { - RecipientShardEntry entry(shardId); - entry.setState(recipientState); - emplaceStrictConsistencyTimestampIfExists(entry, strictConsistencyTimestamp); - emplaceAbortReasonIfExists(entry, abortReason); - return entry; + RecipientShardContext recipientCtx; + recipientCtx.setState(recipientState); + emplaceAbortReasonIfExists(recipientCtx, abortReason); + + return RecipientShardEntry{std::move(shardId), std::move(recipientCtx)}; } UUID getCollectionUUIDFromChunkManger(const NamespaceString& originalNss, const ChunkManager& cm) { @@ -185,7 +185,7 @@ Timestamp getHighestMinFetchTimestamp(const std::vector<DonorShardEntry>& donorS auto maxMinFetchTimestamp = Timestamp::min(); for (auto& donor : donorShards) { - auto donorFetchTimestamp = donor.getMinFetchTimestamp(); + auto donorFetchTimestamp = donor.getMutableState().getMinFetchTimestamp(); uassert(4957300, "All donors must have a minFetchTimestamp, but donor {} does not."_format( donor.getId()), diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 007202ca4ab..f9f0109d05f 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -91,32 +91,7 @@ void emplaceMinFetchTimestampIfExists(ClassWithMinFetchTimestamp& c, invariant(minFetchTimestamp == alreadyExistingMinFetchTimestamp); } - MinFetchTimestamp minFetchTimestampStruct; - minFetchTimestampStruct.setMinFetchTimestamp(std::move(minFetchTimestamp)); - 
c.setMinFetchTimestampStruct(std::move(minFetchTimestampStruct)); -} - -/** - * Emplaces the 'strictConsistencyTimestamp' onto the ClassWithStrictConsistencyTimestamp if the - * timestamp has been emplaced inside the boost::optional. - */ -template <class ClassWithStrictConsistencyTimestamp> -void emplaceStrictConsistencyTimestampIfExists( - ClassWithStrictConsistencyTimestamp& c, boost::optional<Timestamp> strictConsistencyTimestamp) { - if (!strictConsistencyTimestamp) { - return; - } - - invariant(!strictConsistencyTimestamp->isNull()); - - if (auto alreadyExistingStrictConsistencyTimestamp = c.getStrictConsistencyTimestamp()) { - invariant(strictConsistencyTimestamp == alreadyExistingStrictConsistencyTimestamp); - } - - StrictConsistencyTimestamp strictConsistencyTimestampStruct; - strictConsistencyTimestampStruct.setStrictConsistencyTimestamp( - std::move(strictConsistencyTimestamp)); - c.setStrictConsistencyTimestampStruct(std::move(strictConsistencyTimestampStruct)); + c.setMinFetchTimestamp(std::move(minFetchTimestamp)); } /** @@ -173,11 +148,9 @@ DonorShardEntry makeDonorShard(ShardId shardId, /** * Helper method to construct a RecipientShardEntry with the fields specified. */ -RecipientShardEntry makeRecipientShard( - ShardId shardId, - RecipientStateEnum recipientState, - boost::optional<Timestamp> strictConsistencyTimestamp = boost::none, - boost::optional<Status> abortReason = boost::none); +RecipientShardEntry makeRecipientShard(ShardId shardId, + RecipientStateEnum recipientState, + boost::optional<Status> abortReason = boost::none); /** * Gets the UUID for 'nss' from the 'cm' diff --git a/src/mongo/db/s/type_shard_collection_test.cpp b/src/mongo/db/s/type_shard_collection_test.cpp index aa6fefd5214..3a62e513af3 100644 --- a/src/mongo/db/s/type_shard_collection_test.cpp +++ b/src/mongo/db/s/type_shard_collection_test.cpp @@ -97,8 +97,8 @@ TEST(ShardCollectionType, ReshardingFieldsIncluded) { ShardCollectionType shardCollType(kNss, OID::gen(), UUID::gen(), kKeyPattern, true); TypeCollectionReshardingFields reshardingFields; - const auto reshardingUuid = UUID::gen(); - reshardingFields.setUuid(reshardingUuid); + const auto reshardingUUID = UUID::gen(); + reshardingFields.setReshardingUUID(reshardingUUID); shardCollType.setReshardingFields(std::move(reshardingFields)); BSONObj obj = shardCollType.toBSON(); @@ -106,7 +106,7 @@ TEST(ShardCollectionType, ReshardingFieldsIncluded) { ShardCollectionType shardCollTypeFromBSON(obj); ASSERT(shardCollType.getReshardingFields()); - ASSERT_EQ(reshardingUuid, shardCollType.getReshardingFields()->getUuid()); + ASSERT_EQ(reshardingUUID, shardCollType.getReshardingFields()->getReshardingUUID()); } TEST(ShardCollectionType, AllowMigrationsFieldBackwardsCompatibility) { diff --git a/src/mongo/s/catalog/type_collection_test.cpp b/src/mongo/s/catalog/type_collection_test.cpp index 49d74cb8161..e56ee3d5df4 100644 --- a/src/mongo/s/catalog/type_collection_test.cpp +++ b/src/mongo/s/catalog/type_collection_test.cpp @@ -72,7 +72,7 @@ TEST(CollectionType, AllFieldsPresent) { const auto reshardingUuid = UUID::gen(); ReshardingFields reshardingFields; - reshardingFields.setUuid(reshardingUuid); + reshardingFields.setReshardingUUID(reshardingUuid); CollectionType coll(BSON(CollectionType::kNssFieldName << "db.coll" << CollectionType::kEpochFieldName << oid @@ -99,7 +99,7 @@ TEST(CollectionType, AllFieldsPresent) { ASSERT_EQUALS(coll.getDropped(), false); ASSERT_EQUALS(coll.getUuid(), uuid); ASSERT(coll.getReshardingFields()->getState() == 
CoordinatorStateEnum::kUnused); - ASSERT(coll.getReshardingFields()->getUuid() == reshardingUuid); + ASSERT(coll.getReshardingFields()->getReshardingUUID() == reshardingUuid); } TEST(CollectionType, MissingDefaultCollationParses) { diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp index d650c22b865..28b074ed87a 100644 --- a/src/mongo/s/catalog_cache_refresh_test.cpp +++ b/src/mongo/s/catalog_cache_refresh_test.cpp @@ -79,7 +79,7 @@ protected: auto collType = getDefaultCollectionType(epoch, shardKeyPattern); TypeCollectionReshardingFields reshardingFields; - reshardingFields.setUuid(reshardingUUID); + reshardingFields.setReshardingUUID(reshardingUUID); collType.setReshardingFields(std::move(reshardingFields)); std::vector<BSONObj> aggResult; @@ -137,7 +137,7 @@ TEST_F(CatalogCacheRefreshTest, FullLoad) { auto cm = *future.default_timed_get(); ASSERT(cm.isSharded()); ASSERT_EQ(4, cm.numChunks()); - ASSERT_EQ(reshardingUUID, cm.getReshardingFields()->getUuid()); + ASSERT_EQ(reshardingUUID, cm.getReshardingFields()->getReshardingUUID()); } TEST_F(CatalogCacheRefreshTest, NoLoadIfShardNotMarkedStaleInOperationContext) { @@ -863,7 +863,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveWithReshardingFieldsAdde auto cm = *future.default_timed_get(); ASSERT(cm.isSharded()); ASSERT_EQ(2, cm.numChunks()); - ASSERT_EQ(reshardingUUID, cm.getReshardingFields()->getUuid()); + ASSERT_EQ(reshardingUUID, cm.getReshardingFields()->getReshardingUUID()); ASSERT_EQ(version, cm.getVersion()); ASSERT_EQ(version, cm.getVersion({"0"})); ASSERT_EQ(expectedDestShardVersion, cm.getVersion({"1"})); @@ -874,13 +874,13 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunkWithReshardingF const UUID reshardingUUID = UUID::gen(); TypeCollectionReshardingFields reshardingFields; - reshardingFields.setUuid(reshardingUUID); + reshardingFields.setReshardingUUID(reshardingUUID); auto initialRoutingInfo( makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}, reshardingFields)); ASSERT_EQ(1, initialRoutingInfo.numChunks()); - ASSERT_EQ(reshardingUUID, initialRoutingInfo.getReshardingFields()->getUuid()); + ASSERT_EQ(reshardingUUID, initialRoutingInfo.getReshardingFields()->getReshardingUUID()); setupNShards(2); diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl index 8746e9c8310..bfbcb63d3a3 100644 --- a/src/mongo/s/resharding/common_types.idl +++ b/src/mongo/s/resharding/common_types.idl @@ -92,36 +92,103 @@ enums: structs: CommonReshardingMetadata: - description: "Metadata shared across all node types for a resharding operation." + description: "Immutable metadata shared across all node types for a resharding operation." generate_comparison_operators: false - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: _id: + cpp_name: reshardingUUID type: uuid description: "A unique identifier for the resharding operation." - nss: + ns: + cpp_name: sourceNss type: namespacestring description: "The namespace of the collection being resharded." - existingUUID: + ui: + cpp_name: sourceUUID type: uuid description: "The UUID of the original collection being resharded." 
+ tempNs: + cpp_name: tempReshardingNss + type: namespacestring + description: "The namespace of the temporary resharding collection that exists on + recipient shards." reshardingKey: type: KeyPattern description: "The index specification document to use as the new shard key." - MinFetchTimestamp: + AbortReason: description: "Not meant to be used directly. Only use internal fields." - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This setting has no + # practical effect because this type is only meant to be used as a chained struct with + # inline_chained_structs:true and only the strictness setting of the wrapping struct type + # applies. + strict: false + fields: + abortReason: + type: object_owned + description: "The error that caused the node to abort the resharding operation. In + this case, the node may be a donor shard, recipient shard, or the + coordinator itself." + optional: true + + DonorShardContext: + description: "Mutable state for a donor shard tracked both locally by a donor shard and + remotely by the resharding coordinator." + inline_chained_structs: true + chained_structs: + AbortReason: AbortReasonStruct + generate_comparison_operators: false + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: + state: + type: DonorState + default: kUnused minFetchTimestamp: type: timestamp - description: "Timestamp after which a donor shard has guaranteed that oplog + description: "Timestamp after which this donor shard has guaranteed that oplog entries contain recipient shard information." optional: true + bytesToClone: + type: long + description: "Number of bytes on this donor shard in the collection being + resharded." + optional: true + documentsToClone: + type: long + description: "Number of documents on this donor shard in the collection being + resharded." + optional: true + + RecipientShardContext: + description: "Mutable state for a recipient shard tracked both locally by a recipient shard + and remotely by the resharding coordinator." + inline_chained_structs: true + chained_structs: + AbortReason: AbortReasonStruct + generate_comparison_operators: false + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false + fields: + state: + type: RecipientState + default: kUnused FetchTimestamp: description: "Not meant to be used directly. Only use internal fields." - strict: true + # Use strict:false to avoid complications around upgrade/downgrade. This setting has no + # practical effect because this type is only meant to be used as a chained struct with + # inline_chained_structs:true and only the strictness setting of the wrapping struct type + # applies. + strict: false fields: fetchTimestamp: type: timestamp @@ -129,39 +196,13 @@ structs: retrieve documents from donor shards." optional: true - StrictConsistencyTimestamp: - description: "Not meant to be used directly. Only use internal fields." - strict: true - fields: - strictConsistencyTimestamp: - type: timestamp - description: "Timestamp after which all writes to a particular recipient shard for - the original resharding collection will be executed as distributed - transactions." 
- optional: true - - AbortReason: - description: "Not meant to be used directly. Only use internal fields." - fields: - abortReason: - type: object_owned - description: "The error that caused the node to abort the resharding operation. In - this case, the node may be a donor shard, recipient shard, or the - coordinator itself." - optional: true - - ReshardingCloneSize: - description: "Not meant to be used directly. Only use internal fields." - fields: - bytesToClone: - type: long - optional: true - documentsToClone: - type: long - optional: true - ReshardingApproxCopySize: description: "Not meant to be used directly. Only use internal fields." + # Use strict:false to avoid complications around upgrade/downgrade. This setting has no + # practical effect because this type is only meant to be used as a chained struct with + # inline_chained_structs:true and only the strictness setting of the wrapping struct type + # applies. + strict: false fields: approxDocumentsToCopy: type: long @@ -172,6 +213,9 @@ structs: ReshardingSourceId: description: "Identifier for a shard involved in a particular resharding operation." + # Use strict:true because this type is used as the structure for the _id value in documents + # and requires being an exact match. + strict: true fields: reshardingUUID: type: uuid diff --git a/src/mongo/s/resharding/type_collection_fields.idl b/src/mongo/s/resharding/type_collection_fields.idl index 274e60f3f19..cae01ed0106 100644 --- a/src/mongo/s/resharding/type_collection_fields.idl +++ b/src/mongo/s/resharding/type_collection_fields.idl @@ -38,7 +38,16 @@ imports: structs: TypeCollectionDonorFields: description: "Resharding-related fields specific to donor shards." + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: + tempNs: + cpp_name: tempReshardingNss + type: namespacestring + description: "The namespace of the temporary resharding collection that exists on + recipient shards." reshardingKey: type: KeyPattern @@ -47,23 +56,37 @@ structs: inline_chained_structs: true chained_structs: FetchTimestamp: FetchTimestampStruct + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: donorShardIds: type: array<shard_id> - existingUUID: + sourceUUID: + cpp_name: sourceUUID type: uuid - originalNamespace: + description: "The UUID of the original collection being resharded." + sourceNs: + cpp_name: sourceNss type: namespacestring + description: "The namespace of the collection being resharded." TypeCollectionReshardingFields: description: "Resharding-related fields meant to be stored in a config.collections document." inline_chained_structs: true chained_structs: - AbortReason: AbortReasonStruct + AbortReason: AbortReasonStruct + # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically + # required for resharding because durable state from all resharding operations is cleaned up + # before the upgrade or downgrade can complete. + strict: false fields: uuid: + cpp_name: reshardingUUID type: uuid + description: "A unique identifier for the resharding operation." state: type: CoordinatorState default: kUnused |
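Taken together, the common_types.idl and resharding_util.cpp changes above split each participant entry into an immutable shard id plus a mutable context struct (DonorShardContext / RecipientShardContext) that the coordinator reads back through getMutableState(). A minimal C++ sketch of assembling a donor entry under the new layout follows; it is not part of this patch, the shard id and timestamp are placeholder values, and it assumes the IDL-generated types are reachable through resharding_util.h, as they are for makeDonorShard() above.

// Sketch only -- not part of this patch. Placeholder shard id and timestamp values.
#include "mongo/db/s/resharding_util.h"

namespace mongo {

DonorShardEntry exampleDonorEntry() {
    // Mutable, per-shard progress now lives in the context struct...
    DonorShardContext donorCtx;
    donorCtx.setState(DonorStateEnum::kUnused);      // kUnused is the IDL default above
    donorCtx.setMinFetchTimestamp(Timestamp(1, 0));  // optional field on DonorShardContext

    // ...while the entry itself only pairs the shard id with that context. Callers read
    // the progress back through the nested context, e.g.
    // entry.getMutableState().getMinFetchTimestamp().
    return DonorShardEntry{ShardId("exampleShard"), std::move(donorCtx)};
}

}  // namespace mongo

makeDonorShard() in resharding_util.cpp is the in-tree equivalent of this construction; the recipient side mirrors it with RecipientShardContext, which is why makeRecipientShard() in resharding_util.h no longer takes a strictConsistencyTimestamp argument.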