author     Max Hirschhorn <max.hirschhorn@mongodb.com>      2021-03-08 22:37:40 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-03-09 00:03:51 +0000
commit     db2e46857ce1c2bf90fda9842287cc5940fce81c (patch)
tree       28d1ddbd9376da377ff37cdc466a51b9550d577e /src/mongo/db
parent     421ddfa11b6eb4b38f676a0c4da3560fcf63713d (diff)
SERVER-54980 Split donor and recipient documents into im/mutable state.
Removes the unused strictConsistencyTimestamp field from the recipient document. Also explicitly sets strict:true and strict:false for all of resharding's IDL types.
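As the IDL comments added below explain, each participant's mutable fields are nested under a single mutableState subobject so the coordinator can replace them with one $set instead of updating every field individually. The following is a minimal sketch of what such an update document could look like, using the field paths from coordinator_document.idl; the helper name is hypothetical and the snippet is an illustration rather than code from this commit.

#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"

namespace mongo {

// Builds {$set: {"donorShards.$.mutableState": <newDonorCtx>}}, which replaces
// the matched donor entry's entire nested mutable state (state,
// minFetchTimestamp, bytesToClone, ...) in a single update.
BSONObj makeDonorMutableStateUpdate(const BSONObj& newDonorCtx) {
    return BSON("$set" << BSON("donorShards.$.mutableState" << newDonorCtx));
}

}  // namespace mongo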
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/s/collection_metadata.cpp | 4
-rw-r--r--  src/mongo/db/s/collection_metadata_test.cpp | 5
-rw-r--r--  src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp | 10
-rw-r--r--  src/mongo/db/s/resharding/coordinator_document.idl | 47
-rw-r--r--  src/mongo/db/s/resharding/donor_document.idl | 15
-rw-r--r--  src/mongo/db/s/resharding/donor_oplog_id.idl | 3
-rw-r--r--  src/mongo/db/s/resharding/recipient_document.idl | 15
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_observer.cpp | 11
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp | 22
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 132
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_test.cpp | 69
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp | 45
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common.h | 3
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp | 2
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h | 23
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service.cpp | 260
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service.h | 41
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service_test.cpp | 15
-rw-r--r--  src/mongo/db/s/resharding/resharding_op_observer.cpp | 8
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl | 5
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp | 231
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h | 38
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service_test.cpp | 6
-rw-r--r--  src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl | 5
-rw-r--r--  src/mongo/db/s/resharding_destined_recipient_test.cpp | 4
-rw-r--r--  src/mongo/db/s/resharding_util.cpp | 24
-rw-r--r--  src/mongo/db/s/resharding_util.h | 35
-rw-r--r--  src/mongo/db/s/type_shard_collection_test.cpp | 6
28 files changed, 572 insertions(+), 512 deletions(-)
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index 1590131e02a..86744c1594f 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -134,8 +134,8 @@ bool CollectionMetadata::disallowWritesForResharding(const UUID& currentCollecti
"decision persisted",
recipientFields);
- const auto& originalUUID = recipientFields->getExistingUUID();
- const auto& reshardingUUID = reshardingFields->getUuid();
+ const auto& originalUUID = recipientFields->getSourceUUID();
+ const auto& reshardingUUID = reshardingFields->getReshardingUUID();
if (currentCollectionUUID == originalUUID) {
// This shard must be both a donor and recipient. Neither the drop or renameCollection have
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 4152bf1241b..d3b585f9d90 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/base/status.h"
#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/resharding_util.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/sharding_test_fixture_common.h"
@@ -120,7 +121,9 @@ protected:
{kThisShard, kOtherShard}, existingUuid, kNss};
reshardingFields.setRecipientFields(std::move(recipientFields));
} else if (state == CoordinatorStateEnum::kBlockingWrites) {
- TypeCollectionDonorFields donorFields{KeyPattern{BSON("newKey" << 1)}};
+ TypeCollectionDonorFields donorFields{
+ constructTemporaryReshardingNss(kNss.db(), existingUuid),
+ KeyPattern{BSON("newKey" << 1)}};
reshardingFields.setDonorFields(std::move(donorFields));
}
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index ea757e7666f..49b57f258b4 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -119,16 +119,18 @@ public:
nss.db(), getCollectionUUIDFromChunkManger(nss, cm));
auto coordinatorDoc =
- ReshardingCoordinatorDocument(std::move(tempReshardingNss),
- std::move(CoordinatorStateEnum::kUnused),
+ ReshardingCoordinatorDocument(std::move(CoordinatorStateEnum::kUnused),
{}, // donorShards
{}); // recipientShards
// Generate the resharding metadata for the ReshardingCoordinatorDocument.
auto reshardingUUID = UUID::gen();
auto existingUUID = getCollectionUUIDFromChunkManger(ns(), cm);
- auto commonMetadata = CommonReshardingMetadata(
- std::move(reshardingUUID), ns(), std::move(existingUUID), request().getKey());
+ auto commonMetadata = CommonReshardingMetadata(std::move(reshardingUUID),
+ ns(),
+ std::move(existingUUID),
+ std::move(tempReshardingNss),
+ request().getKey());
coordinatorDoc.setCommonReshardingMetadata(std::move(commonMetadata));
coordinatorDoc.setZones(request().getZones());
coordinatorDoc.setPresetReshardedChunks(request().get_presetReshardedChunks());
diff --git a/src/mongo/db/s/resharding/coordinator_document.idl b/src/mongo/db/s/resharding/coordinator_document.idl
index b3110f0a391..f52169f9aea 100644
--- a/src/mongo/db/s/resharding/coordinator_document.idl
+++ b/src/mongo/db/s/resharding/coordinator_document.idl
@@ -39,35 +39,30 @@ structs:
DonorShardEntry:
description: "Represents a donor shard for a particular resharding operation on the
coordinator."
- inline_chained_structs: true
- chained_structs:
- MinFetchTimestamp: MinFetchTimestampStruct
- AbortReason: AbortReasonStruct
generate_comparison_operators: false
- strict: true
+ # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
+ # required for resharding because durable state from all resharding operations is cleaned up
+ # before the upgrade or downgrade can complete.
+ strict: false
fields:
id: shard_id
- state:
- type: DonorState
- optional: true
- cloneSizeInfo:
- type: ReshardingCloneSize
- optional: true
+ # We intentionally have the mutable state nested in a subobject to make it easy to
+ # overwrite with a single $set.
+ mutableState: DonorShardContext
RecipientShardEntry:
description: "Represents a recipient shard for a particular resharding operation on the
coordinator."
- inline_chained_structs: true
- chained_structs:
- StrictConsistencyTimestamp: StrictConsistencyTimestampStruct
- AbortReason: AbortReasonStruct
generate_comparison_operators: false
- strict: true
+ # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
+ # required for resharding because durable state from all resharding operations is cleaned up
+ # before the upgrade or downgrade can complete.
+ strict: false
fields:
id: shard_id
- state:
- type: RecipientState
- optional: true
+ # We intentionally have the mutable state nested in a subobject to make it easy to
+ # overwrite with a single $set.
+ mutableState: RecipientShardContext
ReshardingCoordinatorDocument:
description: "Represents a resharding operation on the coordinator."
@@ -75,14 +70,14 @@ structs:
chained_structs:
CommonReshardingMetadata: CommonReshardingMetadata
FetchTimestamp: FetchTimestampStruct
+ ReshardingApproxCopySize: ReshardingApproxCopySizeStruct
AbortReason: AbortReasonStruct
generate_comparison_operators: false
- strict: true
+ # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
+ # required for resharding because durable state from all resharding operations is cleaned up
+ # before the upgrade or downgrade can complete.
+ strict: false
fields:
- tempReshardingNss:
- type: namespacestring
- description: "The namespace of the temporary resharding collection that exists on
- recipient shards."
state: CoordinatorState
donorShards:
type: array<DonorShardEntry>
@@ -94,10 +89,6 @@ structs:
type: array<object_owned>
description: "The zones associated with the new shard key."
optional: true
- approxCopySize:
- type: ReshardingApproxCopySize
- description: "The approximate data that each recipient clones for this operation."
- optional: true
presetReshardedChunks:
type: array<object_owned>
description: >-
diff --git a/src/mongo/db/s/resharding/donor_document.idl b/src/mongo/db/s/resharding/donor_document.idl
index edb2269a2b8..c282f93874c 100644
--- a/src/mongo/db/s/resharding/donor_document.idl
+++ b/src/mongo/db/s/resharding/donor_document.idl
@@ -26,8 +26,8 @@
# it in the license file.
#
-# This file defines the format of documents stored in config.localReshardingOperations on the donor
-# shard for a resharding operation.
+# This file defines the format of documents stored in config.localReshardingOperations.donor on the
+# donor shard for a resharding operation.
global:
cpp_namespace: "mongo"
@@ -41,9 +41,12 @@ structs:
inline_chained_structs: true
chained_structs:
CommonReshardingMetadata: CommonReshardingMetadata
- MinFetchTimestamp: MinFetchTimestampStruct
- AbortReason: AbortReasonStruct
generate_comparison_operators: false
- strict: true
+ # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
+ # required for resharding because durable state from all resharding operations is cleaned up
+ # before the upgrade or downgrade can complete.
+ strict: false
fields:
- state: DonorState
+ # We intentionally have the mutable state nested in a subobject to make it easy to
+ # overwrite with a single $set.
+ mutableState: DonorShardContext
diff --git a/src/mongo/db/s/resharding/donor_oplog_id.idl b/src/mongo/db/s/resharding/donor_oplog_id.idl
index 6ab3fbc2913..0c4fdeecc7f 100644
--- a/src/mongo/db/s/resharding/donor_oplog_id.idl
+++ b/src/mongo/db/s/resharding/donor_oplog_id.idl
@@ -39,6 +39,9 @@ structs:
description: >-
Represents the set of timestamps that belong to an operation from the donor shard.
generate_comparison_operators: true
+ # Use strict:true because this type is used as the structure for the _id value in documents
+ # and requires being an exact match.
+ strict: true
fields:
clusterTime:
type: timestamp
diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl
index f9d98bb207e..4c56423307e 100644
--- a/src/mongo/db/s/resharding/recipient_document.idl
+++ b/src/mongo/db/s/resharding/recipient_document.idl
@@ -26,8 +26,8 @@
# it in the license file.
#
-# This file defines the format of documents stored in config.localReshardingOperations on the
-# recipient shard for a resharding operation.
+# This file defines the format of documents stored in config.localReshardingOperations.recipient on
+# the recipient shard for a resharding operation.
global:
cpp_namespace: "mongo"
@@ -42,12 +42,15 @@ structs:
chained_structs:
CommonReshardingMetadata: CommonReshardingMetadata
FetchTimestamp: FetchTimestampStruct
- StrictConsistencyTimestamp: StrictConsistencyTimestampStruct
- AbortReason: AbortReasonStruct
generate_comparison_operators: false
- strict: true
+ # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
+ # required for resharding because durable state from all resharding operations is cleaned up
+ # before the upgrade or downgrade can complete.
+ strict: false
fields:
- state: RecipientState
+ # We intentionally have the mutable state nested in a subobject to make it easy to
+ # overwrite with a single $set.
+ mutableState: RecipientShardContext
donorShards:
type: array<shard_id>
description: "The list of donor shards that report to this recipient."
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
index 7329a5cea7d..7dbc37576ba 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -67,7 +67,7 @@ bool allParticipantsInStateGTE(WithLock lk,
TState expectedState,
const std::vector<TParticipant>& participants) {
for (const auto& shard : participants) {
- if (shard.getState() < expectedState) {
+ if (shard.getMutableState().getState() < expectedState) {
return false;
}
}
@@ -103,7 +103,7 @@ bool stateTransistionsComplete(WithLock lk,
template <class TParticipant>
Status getStatusFromAbortReasonWithShardInfo(const TParticipant& participant,
StringData participantType) {
- return getStatusFromAbortReason(participant)
+ return getStatusFromAbortReason(participant.getMutableState())
.withContext("{} shard {} reached an unrecoverable error"_format(
participantType, participant.getId().toString()));
}
@@ -123,13 +123,13 @@ boost::optional<Status> getAbortReasonIfExists(
}
for (const auto& donorShard : updatedStateDoc.getDonorShards()) {
- if (donorShard.getState() == DonorStateEnum::kError) {
+ if (donorShard.getMutableState().getState() == DonorStateEnum::kError) {
return getStatusFromAbortReasonWithShardInfo(donorShard, "Donor"_sd);
}
}
for (const auto& recipientShard : updatedStateDoc.getRecipientShards()) {
- if (recipientShard.getState() == RecipientStateEnum::kError) {
+ if (recipientShard.getMutableState().getState() == RecipientStateEnum::kError) {
return getStatusFromAbortReasonWithShardInfo(recipientShard, "Recipient"_sd);
}
}
@@ -142,7 +142,8 @@ bool allParticipantsDoneWithAbortReason(WithLock lk,
TState expectedState,
const std::vector<TParticipant>& participants) {
for (const auto& shard : participants) {
- if (!(shard.getState() == expectedState && shard.getAbortReason().is_initialized())) {
+ if (!(shard.getMutableState().getState() == expectedState &&
+ shard.getMutableState().getAbortReason().is_initialized())) {
return false;
}
}
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
index 3f6c3c19272..ffb44e7abdd 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
@@ -68,9 +68,9 @@ protected:
RecipientStateEnum recipientState,
boost::optional<Timestamp> timestamp = boost::none,
boost::optional<Status> abortReason = boost::none) {
- return {makeRecipientShard(ShardId{"s1"}, recipientState, timestamp, abortReason),
- makeRecipientShard(ShardId{"s2"}, recipientState, timestamp, abortReason),
- makeRecipientShard(ShardId{"s3"}, recipientState, timestamp, abortReason)};
+ return {makeRecipientShard(ShardId{"s1"}, recipientState, abortReason),
+ makeRecipientShard(ShardId{"s2"}, recipientState, abortReason),
+ makeRecipientShard(ShardId{"s3"}, recipientState, abortReason)};
}
};
@@ -144,7 +144,6 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
{makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
{makeRecipientShard(ShardId{"s2"},
RecipientStateEnum::kError,
- boost::none, // timestamp
Status{ErrorCodes::InternalError, "We gotta abort"})},
{makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
auto coordinatorDoc = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards);
@@ -169,18 +168,9 @@ TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) {
// All participants have an abortReason, but not all are in state kDone yet.
auto donorShards = makeMockDonorsInState(DonorStateEnum::kDone, Timestamp(1, 1), abortReason);
std::vector<RecipientShardEntry> recipientShards0{
- {makeRecipientShard(ShardId{"s1"},
- RecipientStateEnum::kError,
- boost::none, // timestamp
- abortReason)},
- {makeRecipientShard(ShardId{"s2"},
- RecipientStateEnum::kDone,
- boost::none, // timestamp
- abortReason)},
- {makeRecipientShard(ShardId{"s3"},
- RecipientStateEnum::kDone,
- boost::none, // timestamp
- abortReason)}};
+ {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kError, abortReason)},
+ {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kDone, abortReason)},
+ {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kDone, abortReason)}};
auto coordinatorDoc0 =
makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards, abortReason);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 8c3ec48a49f..e20b8f447ec 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -123,8 +123,8 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
// Remove the coordinator document.
return BatchedCommandRequest::buildDeleteOp(
NamespaceString::kConfigReshardingOperationsNamespace,
- BSON("_id" << coordinatorDoc.get_id()), // query
- false // multi
+ BSON("_id" << coordinatorDoc.getReshardingUUID()), // query
+ false // multi
);
default: {
// Partially update the coordinator document.
@@ -148,10 +148,18 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
*abortReason);
}
- if (auto approxCopySize = coordinatorDoc.getApproxCopySize()) {
- // If the approxCopySize exists, include it in the update.
- setBuilder.append(ReshardingCoordinatorDocument::kApproxCopySizeFieldName,
- approxCopySize->toBSON());
+ if (auto approxBytesToCopy = coordinatorDoc.getApproxBytesToCopy()) {
+ // If the approxBytesToCopy exists, include it in the update.
+ setBuilder.append(
+ ReshardingCoordinatorDocument::kApproxBytesToCopyFieldName,
+ *approxBytesToCopy);
+ }
+
+ if (auto approxDocumentsToCopy = coordinatorDoc.getApproxDocumentsToCopy()) {
+ // If the approxDocumentsToCopy exists, include it in the update.
+ setBuilder.append(
+ ReshardingCoordinatorDocument::kApproxDocumentsToCopyFieldName,
+ *approxDocumentsToCopy);
}
if (nextState == CoordinatorStateEnum::kPreparingToDonate) {
@@ -163,7 +171,7 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
return BatchedCommandRequest::buildUpdateOp(
NamespaceString::kConfigReshardingOperationsNamespace,
- BSON("_id" << coordinatorDoc.get_id()),
+ BSON("_id" << coordinatorDoc.getReshardingUUID()),
updateBuilder.obj(),
false, // upsert
false // multi
@@ -204,7 +212,7 @@ TypeCollectionRecipientFields constructRecipientFields(
const ReshardingCoordinatorDocument& coordinatorDoc) {
auto donorShardIds = extractShardIds(coordinatorDoc.getDonorShards());
TypeCollectionRecipientFields recipientFields(
- std::move(donorShardIds), coordinatorDoc.getExistingUUID(), coordinatorDoc.getNss());
+ std::move(donorShardIds), coordinatorDoc.getSourceUUID(), coordinatorDoc.getSourceNss());
emplaceFetchTimestampIfExists(recipientFields, coordinatorDoc.getFetchTimestamp());
return recipientFields;
}
@@ -218,9 +226,11 @@ BSONObj createReshardingFieldsUpdateForOriginalNss(
switch (nextState) {
case CoordinatorStateEnum::kInitializing: {
// Append 'reshardingFields' to the config.collections entry for the original nss
- TypeCollectionReshardingFields originalEntryReshardingFields(coordinatorDoc.get_id());
+ TypeCollectionReshardingFields originalEntryReshardingFields(
+ coordinatorDoc.getReshardingUUID());
originalEntryReshardingFields.setState(coordinatorDoc.getState());
- TypeCollectionDonorFields donorField(coordinatorDoc.getReshardingKey());
+ TypeCollectionDonorFields donorField(coordinatorDoc.getTempReshardingNss(),
+ coordinatorDoc.getReshardingKey());
originalEntryReshardingFields.setDonorFields(donorField);
return BSON("$set" << BSON(CollectionType::kReshardingFieldsFieldName
@@ -238,7 +248,7 @@ BSONObj createReshardingFieldsUpdateForOriginalNss(
// 'state' field and add the 'recipientFields' to the 'reshardingFields' section.
auto recipientFields = constructRecipientFields(coordinatorDoc);
BSONObj setFields =
- BSON("uuid" << coordinatorDoc.get_id() << "key"
+ BSON("uuid" << coordinatorDoc.getReshardingUUID() << "key"
<< coordinatorDoc.getReshardingKey().toBSON() << "lastmodEpoch"
<< newCollectionEpoch.get() << "lastmod"
<< opCtx->getServiceContext()->getPreciseClockSource()->now()
@@ -292,7 +302,7 @@ void updateConfigCollectionsForOriginalNss(OperationContext* opCtx,
auto request = BatchedCommandRequest::buildUpdateOp(
CollectionType::ConfigNS,
- BSON(CollectionType::kNssFieldName << coordinatorDoc.getNss().ns()), // query
+ BSON(CollectionType::kNssFieldName << coordinatorDoc.getSourceNss().ns()), // query
writeOp,
false, // upsert
false // multi
@@ -410,9 +420,9 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx,
// exist, so cannot pass a value for expectedNumModified
const auto chunksQuery = [&]() {
if (newCollectionTimestamp) {
- return BSON(ChunkType::collectionUUID() << coordinatorDoc.getExistingUUID());
+ return BSON(ChunkType::collectionUUID() << coordinatorDoc.getSourceUUID());
} else {
- return BSON(ChunkType::ns(coordinatorDoc.getNss().ns()));
+ return BSON(ChunkType::ns(coordinatorDoc.getSourceNss().ns()));
}
}();
ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
@@ -431,8 +441,8 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx,
TagsType::ConfigNS,
BatchedCommandRequest::buildDeleteOp(
TagsType::ConfigNS,
- BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
- true // multi
+ BSON(ChunkType::ns(coordinatorDoc.getSourceNss().ns())), // query
+ true // multi
),
txnNumber);
}
@@ -447,7 +457,7 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
// newCollectionEpoch.
const auto chunksQuery = [&]() {
if (newCollectionTimestamp) {
- return BSON(ChunkType::collectionUUID() << coordinatorDoc.get_id());
+ return BSON(ChunkType::collectionUUID() << coordinatorDoc.getReshardingUUID());
} else {
return BSON(ChunkType::ns(coordinatorDoc.getTempReshardingNss().ns()));
}
@@ -456,7 +466,7 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
if (newCollectionTimestamp) {
return BSON("$set" << BSON("lastmodEpoch" << newCollectionEpoch));
} else {
- return BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns() << "lastmodEpoch"
+ return BSON("$set" << BSON("ns" << coordinatorDoc.getSourceNss().ns() << "lastmodEpoch"
<< newCollectionEpoch));
}
}();
@@ -472,10 +482,10 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
auto tagsRequest = BatchedCommandRequest::buildUpdateOp(
TagsType::ConfigNS,
- BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query
- BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns())), // update
- false, // upsert
- true // multi
+ BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query
+ BSON("$set" << BSON("ns" << coordinatorDoc.getSourceNss().ns())), // update
+ false, // upsert
+ true // multi
);
// Update the 'ns' field to be the original collection namespace for all tags documents that
@@ -577,7 +587,7 @@ void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
// metadata.
ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
opCtx,
- updatedCoordinatorDoc.getNss(),
+ updatedCoordinatorDoc.getSourceNss(),
extractShardIds(updatedCoordinatorDoc.getDonorShards()),
std::move(changeMetadataFunc));
} else if (participantsToNotify == ParticipantsToNotifyEnum::kRecipients) {
@@ -596,7 +606,7 @@ void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
// shards would not apply.
ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
opCtx,
- updatedCoordinatorDoc.getNss(),
+ updatedCoordinatorDoc.getSourceNss(),
extractShardIds(updatedCoordinatorDoc.getRecipientShards()),
std::move(changeMetadataFunc));
}
@@ -612,13 +622,13 @@ CollectionType createTempReshardingCollectionType(
CollectionType collType(coordinatorDoc.getTempReshardingNss(),
chunkVersion.epoch(),
opCtx->getServiceContext()->getPreciseClockSource()->now(),
- coordinatorDoc.get_id());
+ coordinatorDoc.getReshardingUUID());
collType.setKeyPattern(coordinatorDoc.getReshardingKey());
collType.setDefaultCollation(collation);
collType.setUnique(false);
collType.setTimestamp(chunkVersion.getTimestamp());
- TypeCollectionReshardingFields tempEntryReshardingFields(coordinatorDoc.get_id());
+ TypeCollectionReshardingFields tempEntryReshardingFields(coordinatorDoc.getReshardingUUID());
tempEntryReshardingFields.setState(coordinatorDoc.getState());
auto recipientFields = constructRecipientFields(coordinatorDoc);
@@ -631,7 +641,7 @@ CollectionType createTempReshardingCollectionType(
void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc) {
auto originalCollType = Grid::get(opCtx)->catalogClient()->getCollection(
- opCtx, coordinatorDoc.getNss(), repl::ReadConcernLevel::kMajorityReadConcern);
+ opCtx, coordinatorDoc.getSourceNss(), repl::ReadConcernLevel::kMajorityReadConcern);
const auto collation = originalCollType.getDefaultCollation();
executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) {
@@ -646,7 +656,7 @@ void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx,
str::stream()
<< "Only one resharding operation is allowed to be active at a "
"time, aborting resharding op for "
- << coordinatorDoc.getNss());
+ << coordinatorDoc.getSourceNss());
}
throw;
@@ -676,9 +686,9 @@ std::vector<DonorShardEntry> constructDonorShardEntries(const std::set<ShardId>&
donorShardIds.end(),
std::back_inserter(donorShards),
[](const ShardId& shardId) -> DonorShardEntry {
- DonorShardEntry entry{shardId};
- entry.setState(DonorStateEnum::kUnused);
- return entry;
+ DonorShardContext donorCtx;
+ donorCtx.setState(DonorStateEnum::kUnused);
+ return DonorShardEntry{shardId, std::move(donorCtx)};
});
return donorShards;
}
@@ -690,9 +700,9 @@ std::vector<RecipientShardEntry> constructRecipientShardEntries(
recipientShardIds.end(),
std::back_inserter(recipientShards),
[](const ShardId& shardId) -> RecipientShardEntry {
- RecipientShardEntry entry{shardId};
- entry.setState(RecipientStateEnum::kUnused);
- return entry;
+ RecipientShardContext recipientCtx;
+ recipientCtx.setState(RecipientStateEnum::kUnused);
+ return RecipientShardEntry{shardId, std::move(recipientCtx)};
});
return recipientShards;
}
@@ -701,7 +711,7 @@ ParticipantShardsAndChunks calculateParticipantShardsAndChunks(
OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) {
const auto cm = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
- opCtx, coordinatorDoc.getNss()));
+ opCtx, coordinatorDoc.getSourceNss()));
std::set<ShardId> donorShardIds;
cm.getAllShardIds(&donorShardIds);
@@ -722,7 +732,7 @@ ParticipantShardsAndChunks calculateParticipantShardsAndChunks(
ReshardedChunk::parse(IDLParserErrorContext("ReshardedChunk"), obj);
if (version.getTimestamp()) {
initialChunks.emplace_back(
- coordinatorDoc.get_id(),
+ coordinatorDoc.getReshardingUUID(),
ChunkRange{reshardedChunk.getMin(), reshardedChunk.getMax()},
version,
reshardedChunk.getRecipientShardId());
@@ -743,7 +753,7 @@ ParticipantShardsAndChunks calculateParticipantShardsAndChunks(
cm.forEachChunk([&](const auto& chunk) {
// TODO SERVER-49526 Change the range to refer to the new shard key pattern.
if (version.getTimestamp()) {
- initialChunks.emplace_back(coordinatorDoc.get_id(),
+ initialChunks.emplace_back(coordinatorDoc.getReshardingUUID(),
ChunkRange{chunk.getMin(), chunk.getMax()},
version,
chunk.getShardId());
@@ -910,15 +920,15 @@ ReshardingCoordinatorService::ReshardingCoordinator::~ReshardingCoordinator() {
void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
const ReshardingCoordinatorDocument& doc) {
- invariant(doc.get_id() == _coordinatorDoc.get_id());
+ invariant(doc.getReshardingUUID() == _coordinatorDoc.getReshardingUUID());
LOGV2_INFO(5343001,
"Transitioned resharding coordinator state",
"newState"_attr = CoordinatorState_serializer(doc.getState()),
"oldState"_attr = CoordinatorState_serializer(_coordinatorDoc.getState()),
- "ns"_attr = doc.getNss(),
- "collectionUUID"_attr = doc.getCommonReshardingMetadata().getExistingUUID(),
- "reshardingUUID"_attr = doc.get_id());
+ "namespace"_attr = doc.getSourceNss(),
+ "collectionUUID"_attr = doc.getSourceUUID(),
+ "reshardingUUID"_attr = doc.getReshardingUUID());
_coordinatorDoc = doc;
}
@@ -997,7 +1007,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
})
.then([this, executor] {
_tellAllParticipantsToRefresh(
- createFinishReshardCollectionCommand(_coordinatorDoc.getNss()), executor);
+ createFinishReshardCollectionCommand(_coordinatorDoc.getSourceNss()), executor);
})
.then([this, self = shared_from_this(), executor] {
// The shared_ptr maintaining the ReshardingCoordinatorService Instance object gets
@@ -1014,7 +1024,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
}
}
- auto nss = _coordinatorDoc.getNss();
+ auto nss = _coordinatorDoc.getSourceNss();
LOGV2(4956902,
"Resharding failed",
@@ -1082,8 +1092,8 @@ boost::optional<BSONObj> ReshardingCoordinatorService::ReshardingCoordinator::re
MongoProcessInterface::CurrentOpSessionsMode) noexcept {
ReshardingMetrics::ReporterOptions options(
ReshardingMetrics::ReporterOptions::Role::kCoordinator,
- _coordinatorDoc.get_id(),
- _coordinatorDoc.getNss(),
+ _coordinatorDoc.getReshardingUUID(),
+ _coordinatorDoc.getSourceNss(),
_coordinatorDoc.getReshardingKey().toBSON(),
false);
return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options);
@@ -1149,12 +1159,23 @@ void emplaceApproxBytesToCopyIfExists(ReshardingCoordinatorDocument& coordinator
return;
}
- if (auto alreadyExistingApproxCopySize = coordinatorDoc.getApproxCopySize()) {
- invariant(approxCopySize->toBSON().woCompare(alreadyExistingApproxCopySize->toBSON()) == 0,
- "Expected the existing and the new values for approxCopySize to be equal");
+ invariant(bool(coordinatorDoc.getApproxBytesToCopy()) ==
+ bool(coordinatorDoc.getApproxDocumentsToCopy()),
+ "Expected approxBytesToCopy and approxDocumentsToCopy to either both be set or to"
+ " both be unset");
+
+ if (auto alreadyExistingApproxBytesToCopy = coordinatorDoc.getApproxBytesToCopy()) {
+ invariant(approxCopySize->getApproxBytesToCopy() == *alreadyExistingApproxBytesToCopy,
+ "Expected the existing and the new values for approxBytesToCopy to be equal");
}
- coordinatorDoc.setApproxCopySize(std::move(approxCopySize));
+ if (auto alreadyExistingApproxDocumentsToCopy = coordinatorDoc.getApproxDocumentsToCopy()) {
+ invariant(approxCopySize->getApproxDocumentsToCopy() ==
+ *alreadyExistingApproxDocumentsToCopy,
+ "Expected the existing and the new values for approxDocumentsToCopy to be equal");
+ }
+
+ coordinatorDoc.setReshardingApproxCopySizeStruct(std::move(*approxCopySize));
}
ReshardingApproxCopySize computeApproxCopySize(ReshardingCoordinatorDocument& coordinatorDoc) {
@@ -1166,9 +1187,12 @@ ReshardingApproxCopySize computeApproxCopySize(ReshardingCoordinatorDocument& co
// Compute the aggregate for the number of documents and bytes to copy.
long aggBytesToCopy = 0, aggDocumentsToCopy = 0;
for (auto donor : coordinatorDoc.getDonorShards()) {
- if (const auto cloneSizeInfo = donor.getCloneSizeInfo(); cloneSizeInfo) {
- aggBytesToCopy += cloneSizeInfo->getBytesToClone().get_value_or(0);
- aggDocumentsToCopy += cloneSizeInfo->getDocumentsToClone().get_value_or(0);
+ if (const auto bytesToClone = donor.getMutableState().getBytesToClone()) {
+ aggBytesToCopy += *bytesToClone;
+ }
+
+ if (const auto documentsToClone = donor.getMutableState().getDocumentsToClone()) {
+ aggDocumentsToCopy += *documentsToClone;
}
}
@@ -1341,7 +1365,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe
_coordinatorDoc.getState() == CoordinatorStateEnum::kError) {
nssToRefresh = _coordinatorDoc.getTempReshardingNss();
} else {
- nssToRefresh = _coordinatorDoc.getNss();
+ nssToRefresh = _coordinatorDoc.getSourceNss();
}
sharding_util::tellShardsToRefreshCollection(
@@ -1353,7 +1377,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefres
auto opCtx = cc().makeOperationContext();
auto donorIds = extractShardIds(_coordinatorDoc.getDonorShards());
sharding_util::tellShardsToRefreshCollection(
- opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor);
+ opCtx.get(), donorIds, _coordinatorDoc.getSourceNss(), **executor);
}
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh(
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 3dc4417f02e..99f8285d2b8 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -85,11 +85,10 @@ protected:
ReshardingCoordinatorDocument makeCoordinatorDoc(
CoordinatorStateEnum state, boost::optional<Timestamp> fetchTimestamp = boost::none) {
CommonReshardingMetadata meta(
- _reshardingUUID, _originalNss, UUID::gen(), _newShardKey.toBSON());
- ReshardingCoordinatorDocument doc(_tempNss,
- state,
- {DonorShardEntry(ShardId("shard0000"))},
- {RecipientShardEntry(ShardId("shard0001"))});
+ _reshardingUUID, _originalNss, UUID::gen(), _tempNss, _newShardKey.toBSON());
+ ReshardingCoordinatorDocument doc(state,
+ {DonorShardEntry(ShardId("shard0000"), {})},
+ {RecipientShardEntry(ShardId("shard0001"), {})});
doc.setCommonReshardingMetadata(meta);
emplaceFetchTimestampIfExists(doc, std::move(fetchTimestamp));
return doc;
@@ -112,7 +111,7 @@ protected:
}
CollectionType collType(
- coordinatorDoc.getNss(), std::move(epoch), lastUpdated, std::move(uuid));
+ coordinatorDoc.getSourceNss(), std::move(epoch), lastUpdated, std::move(uuid));
collType.setKeyPattern(shardKey);
collType.setUnique(false);
if (reshardingFields)
@@ -196,10 +195,10 @@ protected:
client.insert(NamespaceString::kConfigReshardingOperationsNamespace.ns(),
coordinatorDoc.toBSON());
- TypeCollectionReshardingFields reshardingFields(coordinatorDoc.get_id());
+ TypeCollectionReshardingFields reshardingFields(coordinatorDoc.getReshardingUUID());
reshardingFields.setState(coordinatorDoc.getState());
- reshardingFields.setDonorFields(
- TypeCollectionDonorFields(coordinatorDoc.getReshardingKey()));
+ reshardingFields.setDonorFields(TypeCollectionDonorFields(
+ coordinatorDoc.getTempReshardingNss(), coordinatorDoc.getReshardingKey()));
auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry(
coordinatorDoc,
@@ -235,13 +234,14 @@ protected:
OperationContext* opCtx, ReshardingCoordinatorDocument expectedCoordinatorDoc) {
DBDirectClient client(opCtx);
auto doc = client.findOne(NamespaceString::kConfigReshardingOperationsNamespace.ns(),
- Query(BSON("nss" << expectedCoordinatorDoc.getNss().ns())));
+ Query(BSON("ns" << expectedCoordinatorDoc.getSourceNss().ns())));
auto coordinatorDoc = ReshardingCoordinatorDocument::parse(
IDLParserErrorContext("ReshardingCoordinatorTest"), doc);
- ASSERT_EQUALS(coordinatorDoc.get_id(), expectedCoordinatorDoc.get_id());
- ASSERT_EQUALS(coordinatorDoc.getNss(), expectedCoordinatorDoc.getNss());
+ ASSERT_EQUALS(coordinatorDoc.getReshardingUUID(),
+ expectedCoordinatorDoc.getReshardingUUID());
+ ASSERT_EQUALS(coordinatorDoc.getSourceNss(), expectedCoordinatorDoc.getSourceNss());
ASSERT_EQUALS(coordinatorDoc.getTempReshardingNss(),
expectedCoordinatorDoc.getTempReshardingNss());
ASSERT_EQUALS(coordinatorDoc.getReshardingKey().toBSON().woCompare(
@@ -284,14 +284,14 @@ protected:
onDiskDonorShards.end(),
[shardId](DonorShardEntry d) { return d.getId() == shardId; });
ASSERT(onDiskIt != onDiskDonorShards.end());
- if (it->getMinFetchTimestamp()) {
- ASSERT(onDiskIt->getMinFetchTimestamp());
- ASSERT_EQUALS(onDiskIt->getMinFetchTimestamp().get(),
- it->getMinFetchTimestamp().get());
+ if (it->getMutableState().getMinFetchTimestamp()) {
+ ASSERT(onDiskIt->getMutableState().getMinFetchTimestamp());
+ ASSERT_EQUALS(onDiskIt->getMutableState().getMinFetchTimestamp().get(),
+ it->getMutableState().getMinFetchTimestamp().get());
} else {
- ASSERT(!onDiskIt->getMinFetchTimestamp());
+ ASSERT(!onDiskIt->getMutableState().getMinFetchTimestamp());
}
- ASSERT(onDiskIt->getState() == it->getState());
+ ASSERT(onDiskIt->getMutableState().getState() == it->getMutableState().getState());
}
auto expectedRecipientShards = expectedCoordinatorDoc.getRecipientShards();
@@ -304,14 +304,7 @@ protected:
onDiskRecipientShards.end(),
[shardId](RecipientShardEntry r) { return r.getId() == shardId; });
ASSERT(onDiskIt != onDiskRecipientShards.end());
- if (it->getStrictConsistencyTimestamp()) {
- ASSERT(onDiskIt->getStrictConsistencyTimestamp());
- ASSERT_EQUALS(onDiskIt->getStrictConsistencyTimestamp().get(),
- it->getStrictConsistencyTimestamp().get());
- } else {
- ASSERT(!onDiskIt->getStrictConsistencyTimestamp());
- }
- ASSERT(onDiskIt->getState() == it->getState());
+ ASSERT(onDiskIt->getMutableState().getState() == it->getMutableState().getState());
}
}
@@ -344,7 +337,8 @@ protected:
ASSERT(onDiskEntry.getReshardingFields());
auto onDiskReshardingFields = onDiskEntry.getReshardingFields().get();
- ASSERT(onDiskReshardingFields.getUuid() == expectedReshardingFields->getUuid());
+ ASSERT(onDiskReshardingFields.getReshardingUUID() ==
+ expectedReshardingFields->getReshardingUUID());
ASSERT(onDiskReshardingFields.getState() == expectedReshardingFields->getState());
ASSERT(onDiskReshardingFields.getDonorFields());
@@ -394,12 +388,13 @@ protected:
ASSERT(onDiskEntry.getReshardingFields());
auto onDiskReshardingFields = onDiskEntry.getReshardingFields().get();
- ASSERT_EQUALS(onDiskReshardingFields.getUuid(), expectedReshardingFields.getUuid());
+ ASSERT_EQUALS(onDiskReshardingFields.getReshardingUUID(),
+ expectedReshardingFields.getReshardingUUID());
ASSERT(onDiskReshardingFields.getState() == expectedReshardingFields.getState());
ASSERT(onDiskReshardingFields.getRecipientFields());
- ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getOriginalNamespace(),
- expectedReshardingFields.getRecipientFields()->getOriginalNamespace());
+ ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getSourceNss(),
+ expectedReshardingFields.getRecipientFields()->getSourceNss());
if (expectedReshardingFields.getRecipientFields()->getFetchTimestamp()) {
ASSERT(onDiskReshardingFields.getRecipientFields()->getFetchTimestamp());
@@ -472,9 +467,11 @@ protected:
// Check the resharding fields and allowMigrations in the config.collections entry for the
// original collection
- TypeCollectionReshardingFields expectedReshardingFields(expectedCoordinatorDoc.get_id());
+ TypeCollectionReshardingFields expectedReshardingFields(
+ expectedCoordinatorDoc.getReshardingUUID());
expectedReshardingFields.setState(expectedCoordinatorDoc.getState());
- TypeCollectionDonorFields donorField(expectedCoordinatorDoc.getReshardingKey());
+ TypeCollectionDonorFields donorField(expectedCoordinatorDoc.getTempReshardingNss(),
+ expectedCoordinatorDoc.getReshardingKey());
expectedReshardingFields.setDonorFields(donorField);
if (auto abortReason = expectedCoordinatorDoc.getAbortReason()) {
AbortReason abortReasonStruct;
@@ -518,10 +515,12 @@ protected:
auto opCtx = operationContext();
DBDirectClient client(opCtx);
- TypeCollectionReshardingFields reshardingFields(expectedCoordinatorDoc.get_id());
+ TypeCollectionReshardingFields reshardingFields(
+ expectedCoordinatorDoc.getReshardingUUID());
reshardingFields.setState(expectedCoordinatorDoc.getState());
reshardingFields.setDonorFields(
- TypeCollectionDonorFields(expectedCoordinatorDoc.getReshardingKey()));
+ TypeCollectionDonorFields(expectedCoordinatorDoc.getTempReshardingNss(),
+ expectedCoordinatorDoc.getReshardingKey()));
auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry(
expectedCoordinatorDoc,
@@ -618,7 +617,7 @@ protected:
// Check that the entry is removed from config.reshardingOperations
DBDirectClient client(opCtx);
auto doc = client.findOne(NamespaceString::kConfigReshardingOperationsNamespace.ns(),
- Query(BSON("nss" << expectedCoordinatorDoc.getNss().ns())));
+ Query(BSON("ns" << expectedCoordinatorDoc.getSourceNss().ns())));
ASSERT(doc.isEmpty());
// Check that the resharding fields are removed from the config.collections entry and
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 4367fa45891..0ad26e7eae1 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -86,7 +86,8 @@ void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocum
// exception. This is safe because PrimaryOnlyService::onStepUp() will have constructed a
// new instance of the resharding state machine.
auto dupeKeyInfo = ex.extraInfo<DuplicateKeyErrorInfo>();
- invariant(dupeKeyInfo->getDuplicatedKeyValue().binaryEqual(BSON("_id" << doc.get_id())));
+ invariant(dupeKeyInfo->getDuplicatedKeyValue().binaryEqual(
+ BSON("_id" << doc.getReshardingUUID())));
}
}
@@ -118,7 +119,7 @@ void processAbortReasonNoDonorMachine(OperationContext* opCtx,
uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument(
opCtx,
NamespaceString::kConfigReshardingOperationsNamespace,
- BSON("_id" << reshardingFields.getUuid() << "donorShards.id" << shardId),
+ BSON("_id" << reshardingFields.getReshardingUUID() << "donorShards.id" << shardId),
BSON("$set" << updateBuilder.done()),
false /* upsert */,
ShardingCatalogClient::kMajorityWriteConcern));
@@ -154,7 +155,7 @@ void processAbortReasonNoRecipientMachine(OperationContext* opCtx,
uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument(
opCtx,
NamespaceString::kConfigReshardingOperationsNamespace,
- BSON("_id" << reshardingFields.getUuid() << "recipientShards.id" << shardId),
+ BSON("_id" << reshardingFields.getReshardingUUID() << "recipientShards.id" << shardId),
BSON("$set" << updateBuilder.done()),
false /* upsert */,
ShardingCatalogClient::kMajorityWriteConcern));
@@ -171,7 +172,7 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx,
if (auto donorStateMachine = tryGetReshardingStateMachine<ReshardingDonorService,
DonorStateMachine,
ReshardingDonorDocument>(
- opCtx, reshardingFields.getUuid())) {
+ opCtx, reshardingFields.getReshardingUUID())) {
donorStateMachine->get()->onReshardingFieldsChanges(opCtx, reshardingFields);
return;
}
@@ -218,7 +219,7 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx,
if (auto recipientStateMachine = tryGetReshardingStateMachine<ReshardingRecipientService,
RecipientStateMachine,
ReshardingRecipientDocument>(
- opCtx, reshardingFields.getUuid())) {
+ opCtx, reshardingFields.getReshardingUUID())) {
recipientStateMachine->get()->onReshardingFieldsChanges(opCtx, reshardingFields);
return;
}
@@ -247,7 +248,7 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx,
}
auto recipientDoc =
- constructRecipientDocumentFromReshardingFields(opCtx, metadata, reshardingFields);
+ constructRecipientDocumentFromReshardingFields(opCtx, nss, metadata, reshardingFields);
createReshardingStateMachine<ReshardingRecipientService,
RecipientStateMachine,
ReshardingRecipientDocument>(opCtx, recipientDoc);
@@ -291,12 +292,17 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields(
const NamespaceString& nss,
const CollectionMetadata& metadata,
const ReshardingFields& reshardingFields) {
- auto donorDoc = ReshardingDonorDocument(DonorStateEnum::kPreparingToDonate);
+ DonorShardContext donorCtx;
+ donorCtx.setState(DonorStateEnum::kPreparingToDonate);
+ auto donorDoc = ReshardingDonorDocument{std::move(donorCtx)};
+
+ auto sourceUUID = getCollectionUUIDFromChunkManger(nss, *metadata.getChunkManager());
auto commonMetadata =
- CommonReshardingMetadata(reshardingFields.getUuid(),
+ CommonReshardingMetadata(reshardingFields.getReshardingUUID(),
nss,
- getCollectionUUIDFromChunkManger(nss, *metadata.getChunkManager()),
+ sourceUUID,
+ reshardingFields.getDonorFields()->getTempReshardingNss(),
reshardingFields.getDonorFields()->getReshardingKey().toBSON());
donorDoc.setCommonReshardingMetadata(std::move(commonMetadata));
@@ -305,21 +311,26 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields(
ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
OperationContext* opCtx,
+ const NamespaceString& nss,
const CollectionMetadata& metadata,
const ReshardingFields& reshardingFields) {
// The recipient state machines are created before the donor shards are prepared to donate but
// will remain idle until the donor shards are prepared to donate.
invariant(!reshardingFields.getRecipientFields()->getFetchTimestamp());
- auto recipientDoc =
- ReshardingRecipientDocument(RecipientStateEnum::kAwaitingFetchTimestamp,
- reshardingFields.getRecipientFields()->getDonorShardIds());
+ RecipientShardContext recipientCtx;
+ recipientCtx.setState(RecipientStateEnum::kAwaitingFetchTimestamp);
- auto commonMetadata =
- CommonReshardingMetadata(reshardingFields.getUuid(),
- reshardingFields.getRecipientFields()->getOriginalNamespace(),
- reshardingFields.getRecipientFields()->getExistingUUID(),
- metadata.getShardKeyPattern().toBSON());
+ auto recipientDoc = ReshardingRecipientDocument{
+ std::move(recipientCtx), reshardingFields.getRecipientFields()->getDonorShardIds()};
+
+ auto sourceNss = reshardingFields.getRecipientFields()->getSourceNss();
+ auto sourceUUID = reshardingFields.getRecipientFields()->getSourceUUID();
+ auto commonMetadata = CommonReshardingMetadata(reshardingFields.getReshardingUUID(),
+ sourceNss,
+ sourceUUID,
+ nss,
+ metadata.getShardKeyPattern().toBSON());
recipientDoc.setCommonReshardingMetadata(std::move(commonMetadata));
return recipientDoc;
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
index 309be7d5230..8930ddd3fe0 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
@@ -44,7 +44,7 @@ using ReshardingFields = TypeCollectionReshardingFields;
template <class Service, class StateMachine, class ReshardingDocument>
boost::optional<std::shared_ptr<StateMachine>> tryGetReshardingStateMachine(
OperationContext* opCtx, const UUID& reshardingUUID) {
- auto instanceId = BSON(ReshardingDocument::k_idFieldName << reshardingUUID);
+ auto instanceId = BSON(ReshardingDocument::kReshardingUUIDFieldName << reshardingUUID);
auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
auto service = registry->lookupServiceByName(Service::kServiceName);
return StateMachine::lookup(opCtx, service, instanceId);
@@ -60,6 +60,7 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields(
ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
OperationContext* opCtx,
+ const NamespaceString& nss,
const CollectionMetadata& metadata,
const ReshardingFields& reshardingFields);
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index 0da9f7f56b1..e5193d93374 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -72,7 +72,7 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest,
reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
auto recipientDoc = resharding::constructRecipientDocumentFromReshardingFields(
- opCtx, metadata, reshardingFields);
+ opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
assertRecipientDocMatchesReshardingFields(metadata, reshardingFields, recipientDoc);
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
index 86178f5dc09..790f6eb5ddb 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -135,7 +135,7 @@ protected:
void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
const BSONObj& reshardingKey) {
- fields.setDonorFields(TypeCollectionDonorFields(reshardingKey));
+ fields.setDonorFields(TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey));
}
void appendRecipientFieldsToReshardingFields(
@@ -156,9 +156,9 @@ protected:
const UUID& existingUUID,
const BSONObj& reshardingKey,
const ReshardingDocument& reshardingDoc) {
- ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID);
- ASSERT_EQ(reshardingDoc.getNss(), nss);
- ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID);
+ ASSERT_EQ(reshardingDoc.getReshardingUUID(), reshardingUUID);
+ ASSERT_EQ(reshardingDoc.getSourceNss(), nss);
+ ASSERT_EQ(reshardingDoc.getSourceUUID(), existingUUID);
ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey);
}
@@ -168,12 +168,12 @@ protected:
const ReshardingDonorDocument& donorDoc) {
assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>(
nss,
- reshardingFields.getUuid(),
+ reshardingFields.getReshardingUUID(),
existingUUID,
reshardingFields.getDonorFields()->getReshardingKey().toBSON(),
donorDoc);
- ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate);
- ASSERT(donorDoc.getMinFetchTimestamp() == boost::none);
+ ASSERT(donorDoc.getMutableState().getState() == DonorStateEnum::kPreparingToDonate);
+ ASSERT(donorDoc.getMutableState().getMinFetchTimestamp() == boost::none);
}
void assertRecipientDocMatchesReshardingFields(
@@ -181,13 +181,14 @@ protected:
const ReshardingFields& reshardingFields,
const ReshardingRecipientDocument& recipientDoc) {
assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>(
- reshardingFields.getRecipientFields()->getOriginalNamespace(),
- reshardingFields.getUuid(),
- reshardingFields.getRecipientFields()->getExistingUUID(),
+ reshardingFields.getRecipientFields()->getSourceNss(),
+ reshardingFields.getReshardingUUID(),
+ reshardingFields.getRecipientFields()->getSourceUUID(),
metadata.getShardKeyPattern().toBSON(),
recipientDoc);
- ASSERT(recipientDoc.getState() == RecipientStateEnum::kAwaitingFetchTimestamp);
+ ASSERT(recipientDoc.getMutableState().getState() ==
+ RecipientStateEnum::kAwaitingFetchTimestamp);
ASSERT(!recipientDoc.getFetchTimestamp());
auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index edc2ac24298..79c16d97ddf 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -74,13 +74,8 @@ ChunkManager getShardedCollectionRoutingInfoWithRefreshAndFlush(const NamespaceS
return routingInfo;
}
-void refreshTemporaryReshardingCollection(const ReshardingDonorDocument& donorDoc) {
- auto tempNss =
- constructTemporaryReshardingNss(donorDoc.getNss().db(), donorDoc.getExistingUUID());
- std::ignore = getShardedCollectionRoutingInfoWithRefreshAndFlush(tempNss);
-}
-
-Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) {
+Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss,
+ const CollectionUUID& sourceUUID) {
auto opCtx = cc().makeOperationContext();
// Do a no-op write and use the OpTime as the minFetchTimestamp
@@ -89,19 +84,19 @@ Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) {
"resharding donor minFetchTimestamp",
NamespaceString::kRsOplogNamespace.ns(),
[&] {
- AutoGetDb db(opCtx.get(), donorDoc.getNss().db(), MODE_IX);
- Lock::CollectionLock collLock(opCtx.get(), donorDoc.getNss(), MODE_S);
+ AutoGetDb db(opCtx.get(), sourceNss.db(), MODE_IX);
+ Lock::CollectionLock collLock(opCtx.get(), sourceNss, MODE_S);
AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite);
const std::string msg = str::stream()
- << "All future oplog entries on the namespace " << donorDoc.getNss().ns()
+ << "All future oplog entries on the namespace " << sourceNss.ns()
<< " must include a 'destinedRecipient' field";
WriteUnitOfWork wuow(opCtx.get());
opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
opCtx.get(),
- donorDoc.getNss(),
- donorDoc.getExistingUUID(),
+ sourceNss,
+ sourceUUID,
{},
BSON("msg" << msg),
boost::none,
@@ -141,10 +136,13 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::cons
}
ReshardingDonorService::DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc)
+ : DonorStateMachine(ReshardingDonorDocument::parse({"DonorStateMachine"}, donorDoc)) {}
+
+ReshardingDonorService::DonorStateMachine::DonorStateMachine(
+ const ReshardingDonorDocument& donorDoc)
: repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(),
- _donorDoc(ReshardingDonorDocument::parse(IDLParserErrorContext("ReshardingDonorDocument"),
- donorDoc)),
- _id(_donorDoc.getCommonReshardingMetadata().get_id()) {}
+ _metadata{donorDoc.getCommonReshardingMetadata()},
+ _donorCtx{donorDoc.getMutableState()} {}
ReshardingDonorService::DonorStateMachine::~DonorStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
@@ -174,15 +172,18 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
.onError([this](Status status) {
LOGV2(4956400,
"Resharding operation donor state machine failed",
- "namespace"_attr = _donorDoc.getNss().ns(),
- "reshardingId"_attr = _id,
+ "namespace"_attr = _metadata.getSourceNss(),
+ "reshardingUUID"_attr = _metadata.getReshardingUUID(),
"error"_attr = status);
- _transitionStateAndUpdateCoordinator(DonorStateEnum::kError, boost::none, status);
+
+ _transitionToError(status);
+ _updateCoordinator();
// TODO SERVER-52838: Ensure all local collections that may have been created for
// resharding are removed, with the exception of the ReshardingDonorDocument, before
// transitioning to kDone.
- _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone, boost::none, status);
+ _transitionState(DonorStateEnum::kDone);
+ _updateCoordinator();
return status;
})
.onCompletion([this, self = shared_from_this()](Status status) {
@@ -227,9 +228,9 @@ boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCur
MongoProcessInterface::CurrentOpConnectionsMode connMode,
MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
ReshardingMetrics::ReporterOptions options(ReshardingMetrics::ReporterOptions::Role::kDonor,
- _id,
- _donorDoc.getNss(),
- _donorDoc.getReshardingKey().toBSON(),
+ _metadata.getReshardingUUID(),
+ _metadata.getSourceNss(),
+ _metadata.getReshardingKey().toBSON(),
false);
return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options);
}
@@ -249,7 +250,7 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
}
if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) {
- _critSec.emplace(opCtx->getServiceContext(), _donorDoc.getNss());
+ _critSec.emplace(opCtx->getServiceContext(), _metadata.getSourceNss());
ensureFulfilledPromise(lk, _allRecipientsDoneApplying);
}
@@ -265,58 +266,59 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
void ReshardingDonorService::DonorStateMachine::
_onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() {
- if (_donorDoc.getState() > DonorStateEnum::kPreparingToDonate) {
- invariant(_donorDoc.getMinFetchTimestamp());
+ if (_donorCtx.getState() > DonorStateEnum::kPreparingToDonate) {
+ invariant(_donorCtx.getMinFetchTimestamp());
+ invariant(_donorCtx.getBytesToClone());
+ invariant(_donorCtx.getDocumentsToClone());
return;
}
- ReshardingCloneSize cloneSizeEstimate;
+ int64_t bytesToClone = 0;
+ int64_t documentsToClone = 0;
+
{
auto opCtx = cc().makeOperationContext();
auto rawOpCtx = opCtx.get();
- const auto shardId = ShardingState::get(rawOpCtx)->shardId();
-
- const auto& nss = _donorDoc.getNss();
- const auto& nssUUID = _donorDoc.getExistingUUID();
- const auto& reshardingUUID = _donorDoc.get_id();
-
- AutoGetCollectionForRead coll(rawOpCtx, _donorDoc.getNss());
- if (!coll) {
- cloneSizeEstimate.setBytesToClone(0);
- cloneSizeEstimate.setDocumentsToClone(0);
- } else {
- cloneSizeEstimate.setBytesToClone(coll->dataSize(rawOpCtx));
- cloneSizeEstimate.setDocumentsToClone(coll->numRecords(rawOpCtx));
- }
- LOGV2_DEBUG(5390702,
- 2,
- "Resharding estimated size",
- "reshardingUUID"_attr = reshardingUUID,
- "namespace"_attr = nss,
- "donorShardId"_attr = shardId,
- "sizeInfo"_attr = cloneSizeEstimate);
+ AutoGetCollection coll(rawOpCtx, _metadata.getSourceNss(), MODE_IS);
+ if (coll) {
+ IndexBuildsCoordinator::get(rawOpCtx)->assertNoIndexBuildInProgForCollection(
+ coll->uuid());
 
-        IndexBuildsCoordinator::get(rawOpCtx)->assertNoIndexBuildInProgForCollection(nssUUID);
+ bytesToClone = coll->dataSize(rawOpCtx);
+ documentsToClone = coll->numRecords(rawOpCtx);
+ }
}
- // Recipient shards expect to read from the donor shard's existing sharded collection
- // and the config.cache.chunks collection of the temporary resharding collection using
- // {atClusterTime: <fetchTimestamp>}. Refreshing the temporary resharding collection on
- // the donor shards causes them to create the config.cache.chunks collection. Without
- // this refresh, the {atClusterTime: <fetchTimestamp>} read on the config.cache.chunks
- // namespace would fail with a SnapshotUnavailable error response.
- refreshTemporaryReshardingCollection(_donorDoc);
-
- auto minFetchTimestamp = generateMinFetchTimestamp(_donorDoc);
- _transitionStateAndUpdateCoordinator(
- DonorStateEnum::kDonatingInitialData, minFetchTimestamp, boost::none, cloneSizeEstimate);
+ // Recipient shards expect to read from the donor shard's existing sharded collection and the
+ // config.cache.chunks collection of the temporary resharding collection using
+ // {atClusterTime: <fetchTimestamp>}. Refreshing the temporary resharding collection on the
+ // donor shards causes them to create the config.cache.chunks collection. Without this refresh,
+ // the {atClusterTime: <fetchTimestamp>} read on the config.cache.chunks namespace would fail
+ // with a SnapshotUnavailable error response.
+ std::ignore =
+ getShardedCollectionRoutingInfoWithRefreshAndFlush(_metadata.getTempReshardingNss());
+
+ Timestamp minFetchTimestamp =
+ generateMinFetchTimestamp(_metadata.getSourceNss(), _metadata.getSourceUUID());
+
+ LOGV2_DEBUG(5390702,
+ 2,
+ "Collection being resharded now ready for recipients to begin cloning",
+ "namespace"_attr = _metadata.getSourceNss(),
+ "minFetchTimestamp"_attr = minFetchTimestamp,
+ "bytesToClone"_attr = bytesToClone,
+ "documentsToClone"_attr = documentsToClone,
+ "reshardingUUID"_attr = _metadata.getReshardingUUID());
+
+ _transitionToDonatingInitialData(minFetchTimestamp, bytesToClone, documentsToClone);
+ _updateCoordinator();
}
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
_awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_donorDoc.getState() > DonorStateEnum::kDonatingInitialData) {
+ if (_donorCtx.getState() > DonorStateEnum::kDonatingInitialData) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -333,7 +335,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
_awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_donorDoc.getState() > DonorStateEnum::kDonatingOplogEntries) {
+ if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -344,28 +346,25 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
void ReshardingDonorService::DonorStateMachine::
_writeTransactionOplogEntryThenTransitionToBlockingWrites() {
- if (_donorDoc.getState() > DonorStateEnum::kPreparingToBlockWrites) {
+ if (_donorCtx.getState() > DonorStateEnum::kPreparingToBlockWrites) {
return;
}
{
- const auto& nss = _donorDoc.getNss();
- const auto& nssUUID = _donorDoc.getExistingUUID();
- const auto& reshardingUUID = _donorDoc.get_id();
auto opCtx = cc().makeOperationContext();
auto rawOpCtx = opCtx.get();
auto generateOplogEntry = [&](ShardId destinedRecipient) {
repl::MutableOplogEntry oplog;
- oplog.setNss(nss);
+ oplog.setNss(_metadata.getSourceNss());
oplog.setOpType(repl::OpTypeEnum::kNoop);
- oplog.setUuid(nssUUID);
+ oplog.setUuid(_metadata.getSourceUUID());
oplog.setDestinedRecipient(destinedRecipient);
oplog.setObject(
BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
- nss.toString())));
- oplog.setObject2(
- BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << reshardingUUID));
+ _metadata.getSourceNss().toString())));
+ oplog.setObject2(BSON("type" << kReshardFinalOpLogType << "reshardingUUID"
+ << _metadata.getReshardingUUID()));
oplog.setOpTime(OplogSlot());
oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
return oplog;
@@ -374,7 +373,8 @@ void ReshardingDonorService::DonorStateMachine::
try {
Timer latency;
- const auto recipients = getRecipientShards(rawOpCtx, nss, nssUUID);
+ const auto recipients =
+ getRecipientShards(rawOpCtx, _metadata.getSourceNss(), _metadata.getSourceUUID());
for (const auto& recipient : recipients) {
auto oplog = generateOplogEntry(recipient);
@@ -401,8 +401,8 @@ void ReshardingDonorService::DonorStateMachine::
LOGV2_DEBUG(5279504,
0,
"Committed oplog entries to temporarily block writes for resharding",
- "namespace"_attr = nss,
- "reshardingUUID"_attr = reshardingUUID,
+ "namespace"_attr = _metadata.getSourceNss(),
+ "reshardingUUID"_attr = _metadata.getReshardingUUID(),
"numRecipients"_attr = recipients.size(),
"duration"_attr = duration_cast<Milliseconds>(latency.elapsed()));
ensureFulfilledPromise(lg, _finalOplogEntriesWritten);
@@ -412,7 +412,7 @@ void ReshardingDonorService::DonorStateMachine::
stdx::lock_guard<Latch> lg(_mutex);
LOGV2_ERROR(5279508,
"Exception while writing resharding final oplog entries",
- "reshardingUUID"_attr = reshardingUUID,
+ "reshardingUUID"_attr = _metadata.getReshardingUUID(),
"error"_attr = status);
ensureFulfilledPromise(lg, _finalOplogEntriesWritten, status);
uassertStatusOK(status);
@@ -429,7 +429,7 @@ SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitFinalOplo
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
_awaitCoordinatorHasDecisionPersistedThenTransitionToDropping(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_donorDoc.getState() > DonorStateEnum::kBlockingWrites) {
+ if (_donorCtx.getState() > DonorStateEnum::kBlockingWrites) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -439,80 +439,76 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
}
void ReshardingDonorService::DonorStateMachine::_dropOriginalCollection() {
- if (_donorDoc.getState() > DonorStateEnum::kDropping) {
+ if (_donorCtx.getState() > DonorStateEnum::kDropping) {
return;
}
{
auto opCtx = cc().makeOperationContext();
resharding::data_copy::ensureCollectionDropped(
- opCtx.get(), _donorDoc.getNss(), _donorDoc.getExistingUUID());
+ opCtx.get(), _metadata.getSourceNss(), _metadata.getSourceUUID());
}
- _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone);
+ _transitionState(DonorStateEnum::kDone);
+ _updateCoordinator();
}
-void ReshardingDonorService::DonorStateMachine::_transitionState(
- DonorStateEnum endState,
- boost::optional<Timestamp> minFetchTimestamp,
- boost::optional<Status> abortReason) {
- ReshardingDonorDocument replacementDoc(_donorDoc);
- replacementDoc.setState(endState);
+void ReshardingDonorService::DonorStateMachine::_transitionState(DonorStateEnum newState) {
+ invariant(newState != DonorStateEnum::kDonatingInitialData &&
+ newState != DonorStateEnum::kError);
 
-    emplaceMinFetchTimestampIfExists(replacementDoc, minFetchTimestamp);
- emplaceAbortReasonIfExists(replacementDoc, abortReason);
+ auto newDonorCtx = _donorCtx;
+ newDonorCtx.setState(newState);
+ _transitionState(std::move(newDonorCtx));
+}
+void ReshardingDonorService::DonorStateMachine::_transitionState(DonorShardContext&& newDonorCtx) {
// For logging purposes.
- auto oldState = _donorDoc.getState();
- auto newState = replacementDoc.getState();
+ auto oldState = _donorCtx.getState();
+ auto newState = newDonorCtx.getState();
 
-    _updateDonorDocument(std::move(replacementDoc));
+ _updateDonorDocument(std::move(newDonorCtx));
LOGV2_INFO(5279505,
"Transitioned resharding donor state",
"newState"_attr = DonorState_serializer(newState),
"oldState"_attr = DonorState_serializer(oldState),
- "ns"_attr = _donorDoc.getNss(),
- "collectionUUID"_attr = _donorDoc.getExistingUUID(),
- "reshardingUUID"_attr = _donorDoc.get_id());
+ "namespace"_attr = _metadata.getSourceNss(),
+ "collectionUUID"_attr = _metadata.getSourceUUID(),
+ "reshardingUUID"_attr = _metadata.getReshardingUUID());
}
-void ReshardingDonorService::DonorStateMachine::_transitionStateAndUpdateCoordinator(
- DonorStateEnum endState,
- boost::optional<Timestamp> minFetchTimestamp,
- boost::optional<Status> abortReason,
- boost::optional<ReshardingCloneSize> cloneSizeEstimate) {
- _transitionState(endState, minFetchTimestamp, abortReason);
+void ReshardingDonorService::DonorStateMachine::_transitionToDonatingInitialData(
+ Timestamp minFetchTimestamp, int64_t bytesToClone, int64_t documentsToClone) {
+ auto newDonorCtx = _donorCtx;
+ newDonorCtx.setState(DonorStateEnum::kDonatingInitialData);
+ newDonorCtx.setMinFetchTimestamp(minFetchTimestamp);
+ newDonorCtx.setBytesToClone(bytesToClone);
+ newDonorCtx.setDocumentsToClone(documentsToClone);
+ _transitionState(std::move(newDonorCtx));
+}
+
+void ReshardingDonorService::DonorStateMachine::_transitionToError(Status abortReason) {
+ auto newDonorCtx = _donorCtx;
+ newDonorCtx.setState(DonorStateEnum::kError);
+ emplaceAbortReasonIfExists(newDonorCtx, abortReason);
+ _transitionState(std::move(newDonorCtx));
+}
+void ReshardingDonorService::DonorStateMachine::_updateCoordinator() {
auto opCtx = cc().makeOperationContext();
auto shardId = ShardingState::get(opCtx.get())->shardId();
- BSONObjBuilder updateBuilder;
- updateBuilder.append("donorShards.$.state", DonorState_serializer(endState));
-
- if (minFetchTimestamp) {
- updateBuilder.append("donorShards.$.minFetchTimestamp", minFetchTimestamp.get());
- }
-
- if (abortReason) {
- BSONObjBuilder abortReasonBuilder;
- abortReason.get().serializeErrorToBSON(&abortReasonBuilder);
- updateBuilder.append("donorShards.$.abortReason", abortReasonBuilder.obj());
- }
-
- if (cloneSizeEstimate) {
- updateBuilder.append("donorShards.$.cloneSizeInfo", cloneSizeEstimate.get().toBSON());
- }
-
uassertStatusOK(
Grid::get(opCtx.get())
->catalogClient()
- ->updateConfigDocument(opCtx.get(),
- NamespaceString::kConfigReshardingOperationsNamespace,
- BSON("_id" << _donorDoc.get_id() << "donorShards.id" << shardId),
- BSON("$set" << updateBuilder.done()),
- false /* upsert */,
- ShardingCatalogClient::kMajorityWriteConcern));
+ ->updateConfigDocument(
+ opCtx.get(),
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ BSON("_id" << _metadata.getReshardingUUID() << "donorShards.id" << shardId),
+ BSON("$set" << BSON("donorShards.$.mutableState" << _donorCtx.toBSON())),
+ false /* upsert */,
+ ShardingCatalogClient::kMajorityWriteConcern));
}
void ReshardingDonorService::DonorStateMachine::insertStateDocument(
@@ -523,26 +519,28 @@ void ReshardingDonorService::DonorStateMachine::insertStateDocument(
}
void ReshardingDonorService::DonorStateMachine::_updateDonorDocument(
- ReshardingDonorDocument&& replacementDoc) {
+ DonorShardContext&& newDonorCtx) {
auto opCtx = cc().makeOperationContext();
PersistentTaskStore<ReshardingDonorDocument> store(
NamespaceString::kDonorReshardingOperationsNamespace);
- store.update(opCtx.get(),
- BSON(ReshardingDonorDocument::k_idFieldName << _id),
- replacementDoc.toBSON(),
- WriteConcerns::kMajorityWriteConcern);
+ store.update(
+ opCtx.get(),
+ BSON(ReshardingDonorDocument::kReshardingUUIDFieldName << _metadata.getReshardingUUID()),
+ BSON("$set" << BSON(ReshardingDonorDocument::kMutableStateFieldName
+ << newDonorCtx.toBSON())),
+ WriteConcerns::kMajorityWriteConcern);
 
-    _donorDoc = replacementDoc;
+ _donorCtx = newDonorCtx;
}
void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() {
auto opCtx = cc().makeOperationContext();
PersistentTaskStore<ReshardingDonorDocument> store(
NamespaceString::kDonorReshardingOperationsNamespace);
- store.remove(opCtx.get(),
- BSON(ReshardingDonorDocument::k_idFieldName << _id),
- WriteConcerns::kMajorityWriteConcern);
- _donorDoc = {};
+ store.remove(
+ opCtx.get(),
+ BSON(ReshardingDonorDocument::kReshardingUUIDFieldName << _metadata.getReshardingUUID()),
+ WriteConcerns::kMajorityWriteConcern);
}
void ReshardingDonorService::DonorStateMachine::_onAbortOrStepdown(WithLock, Status status) {
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 6e772b886ec..1f73febf005 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -101,6 +101,8 @@ public:
const ReshardingDonorDocument& donorDoc);
private:
+ DonorStateMachine(const ReshardingDonorDocument& donorDoc);
+
// The following functions correspond to the actions to take at a particular donor state.
void _transitionToPreparingToDonate();
@@ -121,36 +123,41 @@ private:
// or NamespaceNotFound.
void _dropOriginalCollection();
- // Transitions the state on-disk and in-memory to 'endState'.
- void _transitionState(DonorStateEnum endState,
- boost::optional<Timestamp> minFetchTimestamp = boost::none,
- boost::optional<Status> abortReason = boost::none);
+ // Transitions the on-disk and in-memory state to 'newState'.
+ void _transitionState(DonorStateEnum newState);
+
+ void _transitionState(DonorShardContext&& newDonorCtx);
+
+ // Transitions the on-disk and in-memory state to DonorStateEnum::kDonatingInitialData.
+ void _transitionToDonatingInitialData(Timestamp minFetchTimestamp,
+ int64_t bytesToClone,
+ int64_t documentsToClone);
 
-    void _transitionStateAndUpdateCoordinator(
- DonorStateEnum endState,
- boost::optional<Timestamp> minFetchTimestamp = boost::none,
- boost::optional<Status> abortReason = boost::none,
- boost::optional<ReshardingCloneSize> cloneSizeEstimate = boost::none);
+ // Transitions the on-disk and in-memory state to DonorStateEnum::kError.
+ void _transitionToError(Status abortReason);
 
-    // Updates the donor document on-disk and in-memory with the 'replacementDoc.'
- void _updateDonorDocument(ReshardingDonorDocument&& replacementDoc);
+ void _updateCoordinator();
 
-    // Removes the local donor document from disk and clears the in-memory state.
+ // Updates the mutable portion of the on-disk and in-memory donor document with 'newDonorCtx'.
+ void _updateDonorDocument(DonorShardContext&& newDonorCtx);
+
+ // Removes the local donor document from disk.
void _removeDonorDocument();
// Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors
// (abort resharding).
void _onAbortOrStepdown(WithLock lk, Status status);
- // The in-memory representation of the underlying document in
+ // The in-memory representation of the immutable portion of the document in
// config.localReshardingOperations.donor.
- ReshardingDonorDocument _donorDoc;
+ const CommonReshardingMetadata _metadata;
 
-    // The id both for the resharding operation and for the primary-only-service instance.
- const UUID _id;
+ // The in-memory representation of the mutable portion of the document in
+ // config.localReshardingOperations.donor.
+ DonorShardContext _donorCtx;
// Protects the promises below
- Mutex _mutex = MONGO_MAKE_LATCH("ReshardingDonor::_mutex");
+ Mutex _mutex = MONGO_MAKE_LATCH("DonorStateMachine::_mutex");
boost::optional<ReshardingCriticalSection> _critSec;
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index 415fa8c2247..6f686c59731 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -142,7 +142,6 @@ protected:
std::shared_ptr<ReshardingDonorService::DonorStateMachine> getStateMachineInstace(
OperationContext* opCtx, ReshardingDonorDocument initialState) {
- auto instanceId = BSON(ReshardingDonorDocument::k_idFieldName << initialState.get_id());
auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
auto service = registry->lookupServiceByName(ReshardingDonorService::kServiceName);
return ReshardingDonorService::DonorStateMachine::getOrCreate(
@@ -150,7 +149,7 @@ protected:
}
std::vector<BSONObj> getOplogWritesForDonorDocument(const ReshardingDonorDocument& doc) {
- auto reshardNs = doc.getNss().toString();
+ auto reshardNs = doc.getSourceNss().toString();
DBDirectClient client(operationContext());
auto result = client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()),
BSON("ns" << reshardNs));
@@ -191,13 +190,19 @@ protected:
TEST_F(ReshardingDonorServiceTest,
ShouldWriteFinalOpLogEntryAfterTransitionToPreparingToBlockWrites) {
- ReshardingDonorDocument doc(DonorStateEnum::kPreparingToBlockWrites);
+ DonorShardContext donorCtx;
+ donorCtx.setState(DonorStateEnum::kPreparingToBlockWrites);
+
+ ReshardingDonorDocument doc(std::move(donorCtx));
CommonReshardingMetadata metadata(kReshardingUUID,
mongo::NamespaceString(kReshardNs),
kExistingUUID,
+ kTemporaryReshardingNss,
KeyPattern(kReshardingKeyPattern));
doc.setCommonReshardingMetadata(metadata);
- doc.getMinFetchTimestampStruct().setMinFetchTimestamp(Timestamp{0xf00});
+ doc.getMutableState().setMinFetchTimestamp(Timestamp(10, 1));
+ doc.getMutableState().setBytesToClone(1000);
+ doc.getMutableState().setDocumentsToClone(5);
auto donorStateMachine = getStateMachineInstace(operationContext(), doc);
ASSERT(donorStateMachine);
@@ -227,7 +232,7 @@ TEST_F(ReshardingDonorServiceTest,
ASSERT(o2.hasField("reshardingUUID"));
auto actualReshardingUUIDBson = o2.getField("reshardingUUID");
auto actualReshardingUUID = UUID::parse(actualReshardingUUIDBson);
- ASSERT_EQUALS(doc.get_id(), actualReshardingUUID);
+ ASSERT_EQUALS(doc.getReshardingUUID(), actualReshardingUUID);
ASSERT(oplog.hasField("ui"));
auto actualUiBson = oplog.getField("ui");
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp
index 026d16587e6..54cf5b5ecd0 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp
@@ -59,8 +59,8 @@ std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver(
boost::optional<Timestamp> parseNewMinFetchTimestampValue(const BSONObj& obj) {
auto doc = ReshardingDonorDocument::parse(IDLParserErrorContext("Resharding"), obj);
- if (doc.getState() == DonorStateEnum::kDonatingInitialData) {
- return doc.getMinFetchTimestamp().get();
+ if (doc.getMutableState().getState() == DonorStateEnum::kDonatingInitialData) {
+ return doc.getMutableState().getMinFetchTimestamp().get();
} else {
return boost::none;
}
@@ -208,8 +208,8 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn
if (args.nss == NamespaceString::kConfigReshardingOperationsNamespace) {
auto newCoordinatorDoc = ReshardingCoordinatorDocument::parse(
IDLParserErrorContext("reshardingCoordinatorDoc"), args.updateArgs.updatedDoc);
- auto reshardingId =
- BSON(ReshardingCoordinatorDocument::k_idFieldName << newCoordinatorDoc.get_id());
+ auto reshardingId = BSON(ReshardingCoordinatorDocument::kReshardingUUIDFieldName
+ << newCoordinatorDoc.getReshardingUUID());
auto observer = getReshardingCoordinatorObserver(opCtx, reshardingId);
opCtx->recoveryUnit()->onCommit(
[observer = std::move(observer), newCoordinatorDoc = std::move(newCoordinatorDoc)](
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl
index 03b997873b4..6ba2c9a0305 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl
@@ -40,6 +40,10 @@ imports:
structs:
ReshardingOplogApplierProgress:
description: "Used for storing the progress made by the resharding oplog applier."
+ # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
+ # required for resharding because durable state from all resharding operations is cleaned up
+ # before the upgrade or downgrade can complete.
+ strict: false
fields:
_id:
type: ReshardingSourceId
@@ -50,4 +54,3 @@ structs:
description: >-
The minimum point where the resharding oplog applier can start without missing
any oplog it needs to process.
-
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 54259a87643..117d0fffea4 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -217,10 +217,16 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::
ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
const BSONObj& recipientDoc)
+ : RecipientStateMachine(
+ ReshardingRecipientDocument::parse({"RecipientStateMachine"}, recipientDoc)) {}
+
+ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
+ const ReshardingRecipientDocument& recipientDoc)
: repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
- _recipientDoc(ReshardingRecipientDocument::parse(
- IDLParserErrorContext("ReshardingRecipientDocument"), recipientDoc)),
- _id(_recipientDoc.getCommonReshardingMetadata().get_id()) {}
+ _metadata{recipientDoc.getCommonReshardingMetadata()},
+ _donorShardIds{recipientDoc.getDonorShards()},
+ _recipientCtx{recipientDoc.getMutableState()},
+ _fetchTimestamp{recipientDoc.getFetchTimestamp()} {}
ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
@@ -252,16 +258,17 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
.onError([this](Status status) {
LOGV2(4956500,
"Resharding operation recipient state machine failed",
- "namespace"_attr = _recipientDoc.getNss().ns(),
- "reshardingId"_attr = _id,
+ "namespace"_attr = _metadata.getSourceNss(),
+ "reshardingUUID"_attr = _metadata.getReshardingUUID(),
"error"_attr = status);
- _transitionState(RecipientStateEnum::kError, boost::none, status);
+
+ _transitionToError(status);
_updateCoordinator();
// TODO SERVER-52838: Ensure all local collections that may have been created for
// resharding are removed, with the exception of the ReshardingRecipientDocument, before
// transitioning to kDone.
- _transitionState(RecipientStateEnum::kDone, boost::none, status);
+ _transitionState(RecipientStateEnum::kDone);
_updateCoordinator();
return status;
})
@@ -319,9 +326,9 @@ boost::optional<BSONObj> ReshardingRecipientService::RecipientStateMachine::repo
MongoProcessInterface::CurrentOpConnectionsMode,
MongoProcessInterface::CurrentOpSessionsMode) noexcept {
ReshardingMetrics::ReporterOptions options(ReshardingMetrics::ReporterOptions::Role::kRecipient,
- _id,
- _recipientDoc.getNss(),
- _recipientDoc.getReshardingKey().toBSON(),
+ _metadata.getReshardingUUID(),
+ _metadata.getSourceNss(),
+ _metadata.getReshardingKey().toBSON(),
false);
return _metrics()->reportForCurrentOp(options);
}
@@ -351,47 +358,44 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_recipientDoc.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
- invariant(_recipientDoc.getFetchTimestamp());
+ if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
+ invariant(_fetchTimestamp);
return ExecutorFuture(**executor);
}
return _allDonorsPreparedToDonate.getFuture()
.thenRunOn(**executor)
- .then([this](Timestamp fetchTimestamp) {
- _transitionState(RecipientStateEnum::kCreatingCollection, fetchTimestamp);
- });
+ .then(
+ [this](Timestamp fetchTimestamp) { _transitionToCreatingCollection(fetchTimestamp); });
}
void ReshardingRecipientService::RecipientStateMachine::
_createTemporaryReshardingCollectionThenTransitionToCloning() {
- if (_recipientDoc.getState() > RecipientStateEnum::kCreatingCollection) {
+ if (_recipientCtx.getState() > RecipientStateEnum::kCreatingCollection) {
return;
}
{
auto opCtx = cc().makeOperationContext();
- auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
- _recipientDoc.getExistingUUID());
resharding::createTemporaryReshardingCollectionLocally(opCtx.get(),
- _recipientDoc.getNss(),
- tempNss,
- _recipientDoc.get_id(),
- _recipientDoc.getExistingUUID(),
- *_recipientDoc.getFetchTimestamp());
+ _metadata.getSourceNss(),
+ _metadata.getTempReshardingNss(),
+ _metadata.getReshardingUUID(),
+ _metadata.getSourceUUID(),
+ *_fetchTimestamp);
 
-        ShardKeyPattern shardKeyPattern(_recipientDoc.getReshardingKey());
+ ShardKeyPattern shardKeyPattern{_metadata.getReshardingKey()};
auto catalogCache = Grid::get(opCtx.get())->catalogCache();
shardVersionRetry(opCtx.get(),
catalogCache,
- tempNss,
+ _metadata.getTempReshardingNss(),
"validating shard key index for reshardCollection"_sd,
[&] {
shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
opCtx.get(),
- tempNss,
+ _metadata.getTempReshardingNss(),
shardKeyPattern.toBSON(),
shardKeyPattern,
CollationSpec::kSimpleSpec,
@@ -406,7 +410,8 @@ void ReshardingRecipientService::RecipientStateMachine::
void ReshardingRecipientService::RecipientStateMachine::_initTxnCloner(
OperationContext* opCtx, const Timestamp& fetchTimestamp) {
auto catalogCache = Grid::get(opCtx)->catalogCache();
- auto routingInfo = catalogCache->getShardedCollectionRoutingInfo(opCtx, _recipientDoc.getNss());
+ auto routingInfo =
+ catalogCache->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss());
std::set<ShardId> shardList;
const auto myShardId = ShardingState::get(opCtx)->shardId();
@@ -414,8 +419,8 @@ void ReshardingRecipientService::RecipientStateMachine::_initTxnCloner(
shardList.erase(myShardId);
for (const auto& shard : shardList) {
- _txnCloners.push_back(
- std::make_unique<ReshardingTxnCloner>(ReshardingSourceId(_id, shard), fetchTimestamp));
+ _txnCloners.push_back(std::make_unique<ReshardingTxnCloner>(
+ ReshardingSourceId(_metadata.getReshardingUUID(), shard), fetchTimestamp));
}
}
@@ -423,32 +428,30 @@ ExecutorFuture<void>
ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancelationToken& cancelToken) {
- if (_recipientDoc.getState() > RecipientStateEnum::kCloning) {
+ if (_recipientCtx.getState() > RecipientStateEnum::kCloning) {
return ExecutorFuture(**executor);
}
auto* serviceContext = Client::getCurrent()->getServiceContext();
- auto fetchTimestamp = *_recipientDoc.getFetchTimestamp();
- auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
- _recipientDoc.getExistingUUID());
+ auto fetchTimestamp = *_fetchTimestamp;
_collectionCloner = std::make_unique<ReshardingCollectionCloner>(
std::make_unique<ReshardingCollectionCloner::Env>(_metrics()),
- ShardKeyPattern(_recipientDoc.getReshardingKey()),
- _recipientDoc.getNss(),
- _recipientDoc.getExistingUUID(),
+ ShardKeyPattern{_metadata.getReshardingKey()},
+ _metadata.getSourceNss(),
+ _metadata.getSourceUUID(),
ShardingState::get(serviceContext)->shardId(),
fetchTimestamp,
- std::move(tempNss));
+ _metadata.getTempReshardingNss());
{
auto scopedOpCtx = cc().makeOperationContext();
auto opCtx = scopedOpCtx.get();
- _initTxnCloner(opCtx, *_recipientDoc.getFetchTimestamp());
+ _initTxnCloner(opCtx, *_fetchTimestamp);
}
- auto numDonors = _recipientDoc.getDonorShards().size();
+ auto numDonors = _donorShardIds.size();
_oplogFetchers.reserve(numDonors);
_oplogFetcherFutures.reserve(numDonors);
@@ -458,8 +461,8 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}
const auto& recipientId = ShardingState::get(serviceContext)->shardId();
- for (const auto& donor : _recipientDoc.getDonorShards()) {
- auto oplogBufferNss = getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor);
+ for (const auto& donor : _donorShardIds) {
+ auto oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor);
auto opCtx = cc().makeOperationContext();
auto idToResumeFrom =
resharding::getFetcherIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp);
@@ -468,8 +471,8 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
stdx::lock_guard<Latch> lk(_mutex);
_oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
std::make_unique<ReshardingOplogFetcher::Env>(getGlobalServiceContext(), _metrics()),
- _recipientDoc.get_id(),
- _recipientDoc.getExistingUUID(),
+ _metadata.getReshardingUUID(),
+ _metadata.getSourceUUID(),
// The recipient fetches oplog entries from the donor starting from the largest _id
// value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
// to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
@@ -511,7 +514,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}
void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSteadyState() {
- if (_recipientDoc.getState() > RecipientStateEnum::kApplying) {
+ if (_recipientCtx.getState() > RecipientStateEnum::kApplying) {
return;
}
@@ -532,31 +535,29 @@ void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSt
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_recipientDoc.getState() > RecipientStateEnum::kSteadyState) {
+ if (_recipientCtx.getState() > RecipientStateEnum::kSteadyState) {
return ExecutorFuture<void>(**executor, Status::OK());
}
- auto numDonors = _recipientDoc.getDonorShards().size();
+ auto numDonors = _donorShardIds.size();
_oplogAppliers.reserve(numDonors);
_oplogApplierWorkers.reserve(numDonors);
const auto& sourceChunkMgr = [&] {
auto opCtx = cc().makeOperationContext();
auto catalogCache = Grid::get(opCtx.get())->catalogCache();
- return catalogCache->getShardedCollectionRoutingInfo(opCtx.get(), _recipientDoc.getNss());
+ return catalogCache->getShardedCollectionRoutingInfo(opCtx.get(), _metadata.getSourceNss());
}();
auto stashCollections = [&] {
auto opCtx = cc().makeOperationContext();
- return resharding::ensureStashCollectionsExist(opCtx.get(),
- sourceChunkMgr,
- _recipientDoc.getExistingUUID(),
- _recipientDoc.getDonorShards());
+ return resharding::ensureStashCollectionsExist(
+ opCtx.get(), sourceChunkMgr, _metadata.getSourceUUID(), _donorShardIds);
}();
auto futuresToWaitOn = std::move(_oplogFetcherFutures);
- for (size_t donorIdx = 0; donorIdx < _recipientDoc.getDonorShards().size(); ++donorIdx) {
- const auto& donor = _recipientDoc.getDonorShards()[donorIdx];
+ for (size_t donorIdx = 0; donorIdx < _donorShardIds.size(); ++donorIdx) {
+ const auto& donor = _donorShardIds[donorIdx];
{
stdx::lock_guard<Latch> lk(_mutex);
_oplogApplierWorkers.emplace_back(
@@ -565,10 +566,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
true /* isKillableByStepdown */));
}
- auto sourceId = ReshardingSourceId{_recipientDoc.get_id(), donor};
- const auto& oplogBufferNss =
- getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor);
- auto fetchTimestamp = *_recipientDoc.getFetchTimestamp();
+ auto sourceId = ReshardingSourceId{_metadata.getReshardingUUID(), donor};
+ const auto& oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor);
+ auto fetchTimestamp = *_fetchTimestamp;
auto idToResumeFrom = [&] {
auto opCtx = cc().makeOperationContext();
return resharding::getApplierIdToResumeFrom(opCtx.get(), sourceId, fetchTimestamp);
@@ -580,8 +580,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_metrics()),
std::move(sourceId),
oplogBufferNss,
- _recipientDoc.getNss(),
- _recipientDoc.getExistingUUID(),
+ _metadata.getSourceNss(),
+ _metadata.getSourceUUID(),
stashCollections,
donorIdx,
fetchTimestamp,
@@ -606,7 +606,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
return whenAllSucceed(std::move(futuresToWaitOn))
.thenRunOn(**executor)
- .then([this, stashCollections] {
+ .then([stashCollections] {
auto opCtxRaii = cc().makeOperationContext();
for (auto&& stashNss : stashCollections) {
@@ -619,7 +619,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
.then([this] {
_transitionState(RecipientStateEnum::kStrictConsistency);
- bool isDonor = [& id = _recipientDoc.get_id()] {
+ bool isDonor = [& id = _metadata.getReshardingUUID()] {
auto opCtx = cc().makeOperationContext();
auto instance = resharding::tryGetReshardingStateMachine<
ReshardingDonorService,
@@ -630,7 +630,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
}();
if (!isDonor) {
- _critSec.emplace(cc().getServiceContext(), _recipientDoc.getNss());
+ _critSec.emplace(cc().getServiceContext(), _metadata.getSourceNss());
}
_updateCoordinator();
@@ -640,7 +640,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_recipientDoc.getState() > RecipientStateEnum::kStrictConsistency) {
+ if (_recipientCtx.getState() > RecipientStateEnum::kStrictConsistency) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -650,20 +650,17 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
}
void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection() {
- if (_recipientDoc.getState() > RecipientStateEnum::kRenaming) {
+ if (_recipientCtx.getState() > RecipientStateEnum::kRenaming) {
return;
}
{
auto opCtx = cc().makeOperationContext();
- auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
- _recipientDoc.getExistingUUID());
-
RenameCollectionOptions options;
options.dropTarget = true;
- uassertStatusOK(
- renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options));
+ uassertStatusOK(renameCollection(
+ opCtx.get(), _metadata.getTempReshardingNss(), _metadata.getSourceNss(), options));
_dropOplogCollections(opCtx.get());
@@ -675,30 +672,47 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi
}
void ReshardingRecipientService::RecipientStateMachine::_transitionState(
- RecipientStateEnum endState,
- boost::optional<Timestamp> fetchTimestamp,
- boost::optional<Status> abortReason) {
- invariant(endState != RecipientStateEnum::kAwaitingFetchTimestamp);
- ReshardingRecipientDocument replacementDoc(_recipientDoc);
- replacementDoc.setState(endState);
+ RecipientStateEnum newState) {
+ invariant(newState != RecipientStateEnum::kCreatingCollection &&
+ newState != RecipientStateEnum::kError);
+
+ auto newRecipientCtx = _recipientCtx;
+ newRecipientCtx.setState(newState);
+ _transitionState(std::move(newRecipientCtx), boost::none);
+}
 
-    emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp));
- emplaceAbortReasonIfExists(replacementDoc, std::move(abortReason));
+void ReshardingRecipientService::RecipientStateMachine::_transitionState(
+ RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) {
+ invariant(newRecipientCtx.getState() != RecipientStateEnum::kAwaitingFetchTimestamp);
// For logging purposes.
- auto oldState = _recipientDoc.getState();
- auto newState = replacementDoc.getState();
+ auto oldState = _recipientCtx.getState();
+ auto newState = newRecipientCtx.getState();
 
-    _updateRecipientDocument(std::move(replacementDoc));
- _metrics()->setRecipientState(endState);
+ _updateRecipientDocument(std::move(newRecipientCtx), std::move(fetchTimestamp));
+ _metrics()->setRecipientState(newState);
LOGV2_INFO(5279506,
"Transitioned resharding recipient state",
"newState"_attr = RecipientState_serializer(newState),
"oldState"_attr = RecipientState_serializer(oldState),
- "ns"_attr = _recipientDoc.getNss(),
- "collectionUUID"_attr = _recipientDoc.getExistingUUID(),
- "reshardingUUID"_attr = _recipientDoc.get_id());
+ "namespace"_attr = _metadata.getSourceNss(),
+ "collectionUUID"_attr = _metadata.getSourceUUID(),
+ "reshardingUUID"_attr = _metadata.getReshardingUUID());
+}
+
+void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCollection(
+ Timestamp fetchTimestamp) {
+ auto newRecipientCtx = _recipientCtx;
+ newRecipientCtx.setState(RecipientStateEnum::kCreatingCollection);
+ _transitionState(std::move(newRecipientCtx), fetchTimestamp);
+}
+
+void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) {
+ auto newRecipientCtx = _recipientCtx;
+ newRecipientCtx.setState(RecipientStateEnum::kError);
+ emplaceAbortReasonIfExists(newRecipientCtx, abortReason);
+ _transitionState(std::move(newRecipientCtx), boost::none);
}
void ReshardingRecipientService::RecipientStateMachine::_updateCoordinator() {
@@ -707,21 +721,14 @@ void ReshardingRecipientService::RecipientStateMachine::_updateCoordinator() {
auto shardId = ShardingState::get(opCtx.get())->shardId();
- BSONObjBuilder updateBuilder;
- updateBuilder.append("recipientShards.$.state",
- RecipientState_serializer(_recipientDoc.getState()));
- if (_recipientDoc.getAbortReason()) {
- updateBuilder.append("recipientShards.$.abortReason", _recipientDoc.getAbortReason().get());
- }
-
uassertStatusOK(
Grid::get(opCtx.get())
->catalogClient()
->updateConfigDocument(
opCtx.get(),
NamespaceString::kConfigReshardingOperationsNamespace,
- BSON("_id" << _recipientDoc.get_id() << "recipientShards.id" << shardId),
- BSON("$set" << updateBuilder.done()),
+ BSON("_id" << _metadata.getReshardingUUID() << "recipientShards.id" << shardId),
+ BSON("$set" << BSON("recipientShards.$.mutableState" << _recipientCtx.toBSON())),
false /* upsert */,
ShardingCatalogClient::kMajorityWriteConcern));
}
@@ -734,16 +741,34 @@ void ReshardingRecipientService::RecipientStateMachine::insertStateDocument(
}
void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument(
- ReshardingRecipientDocument&& replacementDoc) {
+ RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) {
auto opCtx = cc().makeOperationContext();
PersistentTaskStore<ReshardingRecipientDocument> store(
NamespaceString::kRecipientReshardingOperationsNamespace);
+
+ BSONObjBuilder updateBuilder;
+ {
+ BSONObjBuilder setBuilder(updateBuilder.subobjStart("$set"));
+ setBuilder.append(ReshardingRecipientDocument::kMutableStateFieldName,
+ newRecipientCtx.toBSON());
+
+ if (fetchTimestamp) {
+ setBuilder.append(ReshardingRecipientDocument::kFetchTimestampFieldName,
+ *fetchTimestamp);
+ }
+ }
+
store.update(opCtx.get(),
- BSON(ReshardingRecipientDocument::k_idFieldName << _id),
- replacementDoc.toBSON(),
+ BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName
+ << _metadata.getReshardingUUID()),
+ updateBuilder.done(),
WriteConcerns::kMajorityWriteConcern);
- _recipientDoc = replacementDoc;
+ _recipientCtx = newRecipientCtx;
+
+ if (fetchTimestamp) {
+ _fetchTimestamp = fetchTimestamp;
+ }
}
void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument() {
@@ -751,15 +776,15 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
PersistentTaskStore<ReshardingRecipientDocument> store(
NamespaceString::kRecipientReshardingOperationsNamespace);
store.remove(opCtx.get(),
- BSON(ReshardingRecipientDocument::k_idFieldName << _id),
+ BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName
+ << _metadata.getReshardingUUID()),
WriteConcerns::kMajorityWriteConcern);
- _recipientDoc = {};
}
void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections(
OperationContext* opCtx) {
- for (const auto& donor : _recipientDoc.getDonorShards()) {
- auto reshardingSourceId = ReshardingSourceId{_recipientDoc.get_id(), donor};
+ for (const auto& donor : _donorShardIds) {
+ auto reshardingSourceId = ReshardingSourceId{_metadata.getReshardingUUID(), donor};
// Remove the oplog applier progress doc for this donor.
PersistentTaskStore<ReshardingOplogApplierProgress> oplogApplierProgressStore(
@@ -779,11 +804,11 @@ void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections(
WriteConcernOptions());
// Drop the conflict stash collection for this donor.
- auto stashNss = getLocalConflictStashNamespace(_recipientDoc.getExistingUUID(), donor);
+ auto stashNss = getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor);
resharding::data_copy::ensureCollectionDropped(opCtx, stashNss);
// Drop the oplog buffer collection for this donor.
- auto oplogBufferNss = getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor);
+ auto oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor);
resharding::data_copy::ensureCollectionDropped(opCtx, oplogBufferNss);
}
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 59e41df7b63..a5fab606b8b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -133,6 +133,8 @@ public:
const ReshardingRecipientDocument& recipientDoc);
private:
+ RecipientStateMachine(const ReshardingRecipientDocument& recipientDoc);
+
// The following functions correspond to the actions to take at a particular recipient state.
ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
@@ -153,17 +155,26 @@ private:
void _renameTemporaryReshardingCollection();
- // Transitions the state on-disk and in-memory to 'endState'.
- void _transitionState(RecipientStateEnum endState,
- boost::optional<Timestamp> fetchTimestamp = boost::none,
- boost::optional<Status> abortReason = boost::none);
+ // Transitions the on-disk and in-memory state to 'newState'.
+ void _transitionState(RecipientStateEnum newState);
+
+ void _transitionState(RecipientShardContext&& newRecipientCtx,
+ boost::optional<Timestamp>&& fetchTimestamp);
+
+ // Transitions the on-disk and in-memory state to RecipientStateEnum::kCreatingCollection.
+ void _transitionToCreatingCollection(Timestamp fetchTimestamp);
+
+ // Transitions the on-disk and in-memory state to RecipientStateEnum::kError.
+ void _transitionToError(Status abortReason);
void _updateCoordinator();
- // Updates the recipient document on-disk and in-memory with the 'replacementDoc.'
- void _updateRecipientDocument(ReshardingRecipientDocument&& replacementDoc);
+ // Updates the mutable portion of the on-disk and in-memory recipient document with
+ // 'newRecipientCtx' and 'fetchTimestamp'.
+ void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx,
+ boost::optional<Timestamp>&& fetchTimestamp);
 
-    // Removes the local recipient document from disk and clears the in-memory state.
+ // Removes the local recipient document from disk.
void _removeRecipientDocument();
// Removes any docs from the oplog applier progress and txn applier progress collections that
@@ -180,12 +191,15 @@ private:
// (abort resharding).
void _onAbortOrStepdown(WithLock, Status status);
- // The in-memory representation of the underlying document in
+ // The in-memory representation of the immutable portion of the document in
// config.localReshardingOperations.recipient.
- ReshardingRecipientDocument _recipientDoc;
+ const CommonReshardingMetadata _metadata;
+ const std::vector<ShardId> _donorShardIds;
 
-    // The id both for the resharding operation and for the primary-only-service instance.
- const UUID _id;
+ // The in-memory representation of the mutable portion of the document in
+ // config.localReshardingOperations.recipient.
+ RecipientShardContext _recipientCtx;
+ boost::optional<Timestamp> _fetchTimestamp;
std::unique_ptr<ReshardingCollectionCloner> _collectionCloner;
std::vector<std::unique_ptr<ReshardingTxnCloner>> _txnCloners;
@@ -200,7 +214,7 @@ private:
std::vector<ExecutorFuture<void>> _oplogFetcherFutures;
// Protects the promises below
- Mutex _mutex = MONGO_MAKE_LATCH("ReshardingRecipient::_mutex");
+ Mutex _mutex = MONGO_MAKE_LATCH("RecipientStateMachine::_mutex");
boost::optional<ReshardingCriticalSection> _critSec;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 178e71158cc..5d73cd21ddd 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -147,10 +147,10 @@ public:
coll.setDefaultCollation(collation);
TypeCollectionReshardingFields reshardingFields;
- reshardingFields.setUuid(uuid);
+ reshardingFields.setReshardingUUID(uuid);
TypeCollectionRecipientFields recipientFields;
- recipientFields.setOriginalNamespace(origNss);
- recipientFields.setExistingUUID(uuid);
+ recipientFields.setSourceNss(origNss);
+ recipientFields.setSourceUUID(uuid);
// Populating the set of donor shard ids isn't necessary to test the functionality of
// creating the temporary resharding collection.
recipientFields.setDonorShardIds({});
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl b/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl
index 9c546b6a597..640dda5f651 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner_progress.idl
@@ -41,6 +41,10 @@ imports:
structs:
ReshardingTxnClonerProgress:
description: "Used for storing the progress made by the resharding transaction cloner."
+ # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
+ # required for resharding because durable state from all resharding operations is cleaned up
+ # before the upgrade or downgrade can complete.
+ strict: false
fields:
_id:
type: ReshardingSourceId
@@ -51,4 +55,3 @@ structs:
description: >-
The minimum point where the resharding transaction cloner can start without
missing any config.transactions entries it needs to process.
-
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp
index d625242aa0d..7d932c4eb0e 100644
--- a/src/mongo/db/s/resharding_destined_recipient_test.cpp
+++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp
@@ -200,8 +200,8 @@ protected:
DatabaseType db(kNss.db().toString(), kShardList[0].getName(), true, env.dbVersion);
TypeCollectionReshardingFields reshardingFields;
- reshardingFields.setUuid(UUID::gen());
- reshardingFields.setDonorFields(TypeCollectionDonorFields{BSON("y" << 1)});
+ reshardingFields.setReshardingUUID(UUID::gen());
+ reshardingFields.setDonorFields(TypeCollectionDonorFields{env.tempNss, BSON("y" << 1)});
reshardingFields.setState(CoordinatorStateEnum::kPreparingToDonate);
CollectionType coll(kNss, env.version.epoch(), Date_t::now(), UUID::gen());
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index c383bf5beab..1aeb6e36014 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -90,22 +90,22 @@ DonorShardEntry makeDonorShard(ShardId shardId,
DonorStateEnum donorState,
boost::optional<Timestamp> minFetchTimestamp,
boost::optional<Status> abortReason) {
- DonorShardEntry entry(shardId);
- entry.setState(donorState);
- emplaceMinFetchTimestampIfExists(entry, minFetchTimestamp);
- emplaceAbortReasonIfExists(entry, abortReason);
- return entry;
+ DonorShardContext donorCtx;
+ donorCtx.setState(donorState);
+ emplaceMinFetchTimestampIfExists(donorCtx, minFetchTimestamp);
+ emplaceAbortReasonIfExists(donorCtx, abortReason);
+
+ return DonorShardEntry{std::move(shardId), std::move(donorCtx)};
}
RecipientShardEntry makeRecipientShard(ShardId shardId,
RecipientStateEnum recipientState,
- boost::optional<Timestamp> strictConsistencyTimestamp,
boost::optional<Status> abortReason) {
- RecipientShardEntry entry(shardId);
- entry.setState(recipientState);
- emplaceStrictConsistencyTimestampIfExists(entry, strictConsistencyTimestamp);
- emplaceAbortReasonIfExists(entry, abortReason);
- return entry;
+ RecipientShardContext recipientCtx;
+ recipientCtx.setState(recipientState);
+ emplaceAbortReasonIfExists(recipientCtx, abortReason);
+
+ return RecipientShardEntry{std::move(shardId), std::move(recipientCtx)};
}
UUID getCollectionUUIDFromChunkManger(const NamespaceString& originalNss, const ChunkManager& cm) {
@@ -185,7 +185,7 @@ Timestamp getHighestMinFetchTimestamp(const std::vector<DonorShardEntry>& donorS
auto maxMinFetchTimestamp = Timestamp::min();
for (auto& donor : donorShards) {
- auto donorFetchTimestamp = donor.getMinFetchTimestamp();
+ auto donorFetchTimestamp = donor.getMutableState().getMinFetchTimestamp();
uassert(4957300,
"All donors must have a minFetchTimestamp, but donor {} does not."_format(
donor.getId()),
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 007202ca4ab..f9f0109d05f 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -91,32 +91,7 @@ void emplaceMinFetchTimestampIfExists(ClassWithMinFetchTimestamp& c,
invariant(minFetchTimestamp == alreadyExistingMinFetchTimestamp);
}
- MinFetchTimestamp minFetchTimestampStruct;
- minFetchTimestampStruct.setMinFetchTimestamp(std::move(minFetchTimestamp));
- c.setMinFetchTimestampStruct(std::move(minFetchTimestampStruct));
-}
-
-/**
- * Emplaces the 'strictConsistencyTimestamp' onto the ClassWithStrictConsistencyTimestamp if the
- * timestamp has been emplaced inside the boost::optional.
- */
-template <class ClassWithStrictConsistencyTimestamp>
-void emplaceStrictConsistencyTimestampIfExists(
- ClassWithStrictConsistencyTimestamp& c, boost::optional<Timestamp> strictConsistencyTimestamp) {
- if (!strictConsistencyTimestamp) {
- return;
- }
-
- invariant(!strictConsistencyTimestamp->isNull());
-
- if (auto alreadyExistingStrictConsistencyTimestamp = c.getStrictConsistencyTimestamp()) {
- invariant(strictConsistencyTimestamp == alreadyExistingStrictConsistencyTimestamp);
- }
-
- StrictConsistencyTimestamp strictConsistencyTimestampStruct;
- strictConsistencyTimestampStruct.setStrictConsistencyTimestamp(
- std::move(strictConsistencyTimestamp));
- c.setStrictConsistencyTimestampStruct(std::move(strictConsistencyTimestampStruct));
+ c.setMinFetchTimestamp(std::move(minFetchTimestamp));
}
/**
@@ -173,11 +148,9 @@ DonorShardEntry makeDonorShard(ShardId shardId,
/**
* Helper method to construct a RecipientShardEntry with the fields specified.
*/
-RecipientShardEntry makeRecipientShard(
- ShardId shardId,
- RecipientStateEnum recipientState,
- boost::optional<Timestamp> strictConsistencyTimestamp = boost::none,
- boost::optional<Status> abortReason = boost::none);
+RecipientShardEntry makeRecipientShard(ShardId shardId,
+ RecipientStateEnum recipientState,
+ boost::optional<Status> abortReason = boost::none);
/**
* Gets the UUID for 'nss' from the 'cm'
diff --git a/src/mongo/db/s/type_shard_collection_test.cpp b/src/mongo/db/s/type_shard_collection_test.cpp
index aa6fefd5214..3a62e513af3 100644
--- a/src/mongo/db/s/type_shard_collection_test.cpp
+++ b/src/mongo/db/s/type_shard_collection_test.cpp
@@ -97,8 +97,8 @@ TEST(ShardCollectionType, ReshardingFieldsIncluded) {
ShardCollectionType shardCollType(kNss, OID::gen(), UUID::gen(), kKeyPattern, true);
TypeCollectionReshardingFields reshardingFields;
- const auto reshardingUuid = UUID::gen();
- reshardingFields.setUuid(reshardingUuid);
+ const auto reshardingUUID = UUID::gen();
+ reshardingFields.setReshardingUUID(reshardingUUID);
shardCollType.setReshardingFields(std::move(reshardingFields));
BSONObj obj = shardCollType.toBSON();
@@ -106,7 +106,7 @@ TEST(ShardCollectionType, ReshardingFieldsIncluded) {
ShardCollectionType shardCollTypeFromBSON(obj);
ASSERT(shardCollType.getReshardingFields());
- ASSERT_EQ(reshardingUuid, shardCollType.getReshardingFields()->getUuid());
+ ASSERT_EQ(reshardingUUID, shardCollType.getReshardingFields()->getReshardingUUID());
}
TEST(ShardCollectionType, AllowMigrationsFieldBackwardsCompatibility) {