diff options
34 files changed, 194 insertions, 164 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index fcd7c965b37..bc1707620e4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1421,7 +1421,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) { TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUpLegacyFormat) { auto existingUuid = UUID::gen(); auto reshardingUuid = UUID::gen(); - auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid); + auto temporaryNs = resharding::constructTemporaryReshardingNss(nss.db(), existingUuid); const auto o2FieldInLegacyFormat = BSON("type" << "reshardDoneCatchUp" @@ -1460,7 +1460,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUpLegacyFormat) { TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) { auto existingUuid = UUID::gen(); auto reshardingUuid = UUID::gen(); - auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid); + auto temporaryNs = resharding::constructTemporaryReshardingNss(nss.db(), existingUuid); ReshardDoneCatchUpChangeEventO2Field o2Field{temporaryNs, reshardingUuid}; auto reshardDoneCatchUp = makeOplogEntry(OpTypeEnum::kNoop, diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index 8f789549796..c9ed5d77272 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -125,7 +125,7 @@ protected: reshardingFields.setRecipientFields(std::move(recipientFields)); } else if (state == CoordinatorStateEnum::kBlockingWrites) { TypeCollectionDonorFields donorFields{ - constructTemporaryReshardingNss(kNss.db(), existingUuid), + resharding::constructTemporaryReshardingNss(kNss.db(), existingUuid), KeyPattern{BSON("newKey" << 1)}, {kThisShard, kOtherShard}}; reshardingFields.setDonorFields(std::move(donorFields)); diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 7f284e2c642..1a094c7db5f 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -63,8 +63,9 @@ getExistingInstanceToJoin(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& newShardKey) { auto instances = - getReshardingStateMachines<ReshardingCoordinatorService, - ReshardingCoordinatorService::ReshardingCoordinator>(opCtx, nss); + resharding::getReshardingStateMachines<ReshardingCoordinatorService, + ReshardingCoordinatorService::ReshardingCoordinator>( + opCtx, nss); for (const auto& instance : instances) { if (SimpleBSONObjComparator::kInstance.evaluate( instance->getMetadata().getReshardingKey().toBSON() == newShardKey)) { @@ -139,7 +140,7 @@ public: "Must specify only one of _presetReshardedChunks or numInitialChunks", !(bool(request().getNumInitialChunks()))); - validateReshardedChunks( + resharding::validateReshardedChunks( *presetChunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern()); } @@ -183,11 +184,12 @@ public: return boost::none; } - auto tempReshardingNss = constructTemporaryReshardingNss(nss.db(), cm.getUUID()); + auto tempReshardingNss = + resharding::constructTemporaryReshardingNss(nss.db(), cm.getUUID()); if (auto zones = request().getZones()) { - checkForOverlappingZones(*zones); + resharding::checkForOverlappingZones(*zones); } auto coordinatorDoc = diff --git a/src/mongo/db/s/flush_resharding_state_change_command.cpp b/src/mongo/db/s/flush_resharding_state_change_command.cpp index 85f0c42cff0..95439564643 100644 --- a/src/mongo/db/s/flush_resharding_state_change_command.cpp +++ b/src/mongo/db/s/flush_resharding_state_change_command.cpp @@ -131,7 +131,7 @@ public: .getAsync([](auto) {}); // Ensure the command isn't run on a stale primary. - doNoopWrite(opCtx, "_flushReshardingStateChange no-op", ns()); + resharding::doNoopWrite(opCtx, "_flushReshardingStateChange no-op", ns()); } }; } _flushReshardingStateChange; diff --git a/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp index aaeb6180654..dc3176cf3e8 100644 --- a/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp +++ b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp @@ -117,7 +117,8 @@ DocumentSource::GetModPathsReturn DocumentSourceReshardingOwnershipMatch::getMod DocumentSource::GetNextResult DocumentSourceReshardingOwnershipMatch::doGetNext() { if (!_tempReshardingChunkMgr) { // TODO: Actually propagate the temporary resharding namespace from the recipient. - auto tempReshardingNss = constructTemporaryReshardingNss(pExpCtx->ns.db(), *pExpCtx->uuid); + auto tempReshardingNss = + resharding::constructTemporaryReshardingNss(pExpCtx->ns.db(), *pExpCtx->uuid); auto* catalogCache = Grid::get(pExpCtx->opCtx)->catalogCache(); _tempReshardingChunkMgr = diff --git a/src/mongo/db/s/resharding/resharding_agg_test.cpp b/src/mongo/db/s/resharding/resharding_agg_test.cpp index ce8d110e5ab..c49467f79f9 100644 --- a/src/mongo/db/s/resharding/resharding_agg_test.cpp +++ b/src/mongo/db/s/resharding/resharding_agg_test.cpp @@ -362,7 +362,7 @@ protected: expCtx->ns = kRemoteOplogNss; expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(pipelineSource); - auto pipeline = createOplogFetchingPipelineForResharding( + auto pipeline = resharding::createOplogFetchingPipelineForResharding( expCtx, ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()), _reshardingCollUUID, @@ -524,13 +524,14 @@ TEST_F(ReshardingAggTest, VerifyPipelineOutputHasOplogSchema) { expCtx->ns = kRemoteOplogNss; expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(pipelineSource); - std::unique_ptr<Pipeline, PipelineDeleter> pipeline = createOplogFetchingPipelineForResharding( - expCtx, - // Use the test to also exercise the stages for resuming. The timestamp passed in is - // excluded from the results. - ReshardingDonorOplogId(insertOplog.getTimestamp(), insertOplog.getTimestamp()), - _reshardingCollUUID, - {_destinedRecipient}); + std::unique_ptr<Pipeline, PipelineDeleter> pipeline = + resharding::createOplogFetchingPipelineForResharding( + expCtx, + // Use the test to also exercise the stages for resuming. The timestamp passed in is + // excluded from the results. + ReshardingDonorOplogId(insertOplog.getTimestamp(), insertOplog.getTimestamp()), + _reshardingCollUUID, + {_destinedRecipient}); auto bsonPipeline = pipeline->serializeToBson(); if (debug) { std::cout << "Pipeline stages:" << std::endl; @@ -624,11 +625,12 @@ TEST_F(ReshardingAggTest, VerifyPipelinePreparedTxn) { expCtx->ns = kRemoteOplogNss; expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(pipelineSource); - std::unique_ptr<Pipeline, PipelineDeleter> pipeline = createOplogFetchingPipelineForResharding( - expCtx, - ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()), - _reshardingCollUUID, - {_destinedRecipient}); + std::unique_ptr<Pipeline, PipelineDeleter> pipeline = + resharding::createOplogFetchingPipelineForResharding( + expCtx, + ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()), + _reshardingCollUUID, + {_destinedRecipient}); if (debug) { std::cout << "Pipeline stages:" << std::endl; // This is can be changed to process a prefix of the pipeline for debugging. @@ -1476,7 +1478,7 @@ TEST_F(ReshardingAggWithStorageTest, RetryableFindAndModifyWithImageLookup) { expCtx->mongoProcessInterface = std::move(mockMongoInterface); } - auto pipeline = createOplogFetchingPipelineForResharding( + auto pipeline = resharding::createOplogFetchingPipelineForResharding( expCtx, ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()), kCrudUUID, kMyShardId); pipeline->addInitialSource(DocumentSourceMock::createForTest(pipelineSource, expCtx)); @@ -1578,8 +1580,8 @@ TEST_F(ReshardingAggWithStorageTest, expCtx->mongoProcessInterface = std::move(mockMongoInterface); } - auto pipeline = - createOplogFetchingPipelineForResharding(expCtx, startAt, kCrudUUID, kMyShardId); + auto pipeline = resharding::createOplogFetchingPipelineForResharding( + expCtx, startAt, kCrudUUID, kMyShardId); pipeline->addInitialSource(DocumentSourceMock::createForTest(pipelineSource, expCtx)); return pipeline; }; diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 610580612d6..8bd04ebfe37 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -109,7 +109,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}}; // Assume that the config.cache.chunks collection isn't a view either. - auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); + auto tempNss = resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); auto tempCacheChunksNss = NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns()); resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}}; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp index 5f78cac592c..da457d8eab3 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp @@ -112,7 +112,7 @@ bool stateTransistionsComplete(WithLock lk, template <class TParticipant> Status getStatusFromAbortReasonWithShardInfo(const TParticipant& participant, StringData participantType) { - return getStatusFromAbortReason(participant.getMutableState()) + return resharding::getStatusFromAbortReason(participant.getMutableState()) .withContext("{} shard {} reached an unrecoverable error"_format( participantType, participant.getId().toString())); } @@ -128,7 +128,7 @@ boost::optional<Status> getAbortReasonIfExists( if (updatedStateDoc.getAbortReason()) { // Note: the absence of context specifying which shard the abortReason originates from // implies the abortReason originates from the coordinator. - return getStatusFromAbortReason(updatedStateDoc); + return resharding::getStatusFromAbortReason(updatedStateDoc); } for (const auto& donorShard : updatedStateDoc.getDonorShards()) { diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp index 0f3803ab04e..bd893bd6dee 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp @@ -53,7 +53,7 @@ protected: auto coordinatorDoc = ReshardingCoordinatorDocument(); coordinatorDoc.setRecipientShards(std::move(recipients)); coordinatorDoc.setDonorShards(std::move(donors)); - emplaceTruncatedAbortReasonIfExists(coordinatorDoc, abortReason); + resharding::emplaceTruncatedAbortReasonIfExists(coordinatorDoc, abortReason); return coordinatorDoc; } @@ -62,9 +62,9 @@ protected: boost::optional<Timestamp> timestamp = boost::none, boost::optional<Status> abortReason = boost::none) { // The mock state here is simulating only one donor shard having errored locally. - return {makeDonorShard(ShardId{"s1"}, donorState, timestamp), - makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason), - makeDonorShard(ShardId{"s3"}, donorState, timestamp)}; + return {resharding::makeDonorShard(ShardId{"s1"}, donorState, timestamp), + resharding::makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason), + resharding::makeDonorShard(ShardId{"s3"}, donorState, timestamp)}; } std::vector<RecipientShardEntry> makeMockRecipientsInState( @@ -72,9 +72,9 @@ protected: boost::optional<Timestamp> timestamp = boost::none, boost::optional<Status> abortReason = boost::none) { // The mock state here is simulating only one donor shard having errored locally. - return {makeRecipientShard(ShardId{"s1"}, recipientState), - makeRecipientShard(ShardId{"s2"}, recipientState, abortReason), - makeRecipientShard(ShardId{"s3"}, recipientState)}; + return {resharding::makeRecipientShard(ShardId{"s1"}, recipientState), + resharding::makeRecipientShard(ShardId{"s2"}, recipientState, abortReason), + resharding::makeRecipientShard(ShardId{"s3"}, recipientState)}; } }; @@ -85,15 +85,15 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionSucce auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1)); std::vector<RecipientShardEntry> recipientShards0{ - makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning), - makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}; + resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning), + resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}; auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); ASSERT_FALSE(fut.isReady()); std::vector<RecipientShardEntry> recipientShards1{ - makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying), - makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}; + resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying), + resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}; auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); ASSERT_TRUE(fut.isReady()); @@ -110,25 +110,25 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOu auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1)); std::vector<RecipientShardEntry> recipientShards0{ - {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, - {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}, - {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; + {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, + {resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}, + {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); ASSERT_FALSE(fut.isReady()); std::vector<RecipientShardEntry> recipientShards1{ - {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, - {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}, - {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kCloning)}}; + {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, + {resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}, + {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kCloning)}}; auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); ASSERT_FALSE(fut.isReady()); std::vector<RecipientShardEntry> recipientShards2{ - {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying)}, - {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}, - {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; + {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying)}, + {resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)}, + {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; auto coordinatorDoc2 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards2, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc2); ASSERT_TRUE(fut.isReady()); @@ -145,11 +145,11 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) { auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1)); std::vector<RecipientShardEntry> recipientShards{ - {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, - {makeRecipientShard(ShardId{"s2"}, - RecipientStateEnum::kError, - Status{ErrorCodes::InternalError, "We gotta abort"})}, - {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; + {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)}, + {resharding::makeRecipientShard(ShardId{"s2"}, + RecipientStateEnum::kError, + Status{ErrorCodes::InternalError, "We gotta abort"})}, + {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}}; auto coordinatorDoc = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc); auto resp = fut.getNoThrow(); @@ -173,9 +173,11 @@ TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) { // donor who hasn't seen there was an error yet. auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kDone, Timestamp(1, 1)); std::vector<DonorShardEntry> donorShards0{ - {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)}, - {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDonatingOplogEntries, Timestamp(1, 1))}, - {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}}; + {resharding::makeDonorShard( + ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)}, + {resharding::makeDonorShard( + ShardId{"s2"}, DonorStateEnum::kDonatingOplogEntries, Timestamp(1, 1))}, + {resharding::makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}}; auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards0, abortReason); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); @@ -183,9 +185,10 @@ TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) { // All participants are done. std::vector<DonorShardEntry> donorShards1{ - {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)}, - {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDone, Timestamp(1, 1))}, - {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}}; + {resharding::makeDonorShard( + ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)}, + {resharding::makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDone, Timestamp(1, 1))}, + {resharding::makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}}; auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1, abortReason); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); @@ -206,15 +209,15 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingRecipientsOutOfSync) { auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1)); std::vector<RecipientShardEntry> recipientShards0{ - makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kUnused), - makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)}; + resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kUnused), + resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)}; auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); ASSERT_FALSE(fut.isReady()); std::vector<RecipientShardEntry> recipientShards1{ - makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying), - makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)}; + resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying), + resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)}; auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); ASSERT_TRUE(fut.isReady()); @@ -231,15 +234,18 @@ TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) { auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kUnused); std::vector<DonorShardEntry> donorShards0{ - {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}, - {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kPreparingToDonate)}}; + {resharding::makeDonorShard( + ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}, + {resharding::makeDonorShard(ShardId{"s2"}, DonorStateEnum::kPreparingToDonate)}}; auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards0); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); ASSERT_FALSE(fut.isReady()); std::vector<DonorShardEntry> donorShards1{ - {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}, - {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}}; + {resharding::makeDonorShard( + ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}, + {resharding::makeDonorShard( + ShardId{"s2"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}}; auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); ASSERT_TRUE(fut.isReady()); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index d2236e708d1..4f905f80f51 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -295,9 +295,9 @@ TypeCollectionRecipientFields constructRecipientFields( coordinatorDoc.getSourceNss(), resharding::gReshardingMinimumOperationDurationMillis.load()); - emplaceCloneTimestampIfExists(recipientFields, coordinatorDoc.getCloneTimestamp()); - emplaceApproxBytesToCopyIfExists(recipientFields, - coordinatorDoc.getReshardingApproxCopySizeStruct()); + resharding::emplaceCloneTimestampIfExists(recipientFields, coordinatorDoc.getCloneTimestamp()); + resharding::emplaceApproxBytesToCopyIfExists( + recipientFields, coordinatorDoc.getReshardingApproxCopySizeStruct()); return recipientFields; } @@ -323,10 +323,10 @@ BSONObj createReshardingFieldsUpdateForOriginalNss( << CollectionType::kAllowMigrationsFieldName << false)); } case CoordinatorStateEnum::kPreparingToDonate: { - TypeCollectionDonorFields donorFields( - coordinatorDoc.getTempReshardingNss(), - coordinatorDoc.getReshardingKey(), - extractShardIdsFromParticipantEntries(coordinatorDoc.getRecipientShards())); + TypeCollectionDonorFields donorFields(coordinatorDoc.getTempReshardingNss(), + coordinatorDoc.getReshardingKey(), + resharding::extractShardIdsFromParticipantEntries( + coordinatorDoc.getRecipientShards())); BSONObjBuilder updateBuilder; { @@ -394,7 +394,7 @@ BSONObj createReshardingFieldsUpdateForOriginalNss( // If the abortReason exists, include it in the update. setBuilder.append("reshardingFields.abortReason", *abortReason); - auto abortStatus = getStatusFromAbortReason(coordinatorDoc); + auto abortStatus = resharding::getStatusFromAbortReason(coordinatorDoc); setBuilder.append("reshardingFields.userCanceled", abortStatus == ErrorCodes::ReshardCollectionAborted); } @@ -504,7 +504,7 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx, if (auto abortReason = coordinatorDoc.getAbortReason()) { setBuilder.append("reshardingFields.abortReason", *abortReason); - auto abortStatus = getStatusFromAbortReason(coordinatorDoc); + auto abortStatus = resharding::getStatusFromAbortReason(coordinatorDoc); setBuilder.append("reshardingFields.userCanceled", abortStatus == ErrorCodes::ReshardCollectionAborted); } @@ -1592,8 +1592,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: // the possibility of the document reaching the BSONObj size constraint. std::vector<BSONObj> zones; if (updatedCoordinatorDoc.getZones()) { - zones = buildTagsDocsFromZones(updatedCoordinatorDoc.getTempReshardingNss(), - *updatedCoordinatorDoc.getZones()); + zones = resharding::buildTagsDocsFromZones(updatedCoordinatorDoc.getTempReshardingNss(), + *updatedCoordinatorDoc.getZones()); } updatedCoordinatorDoc.setPresetReshardedChunks(boost::none); updatedCoordinatorDoc.setZones(boost::none); @@ -1652,8 +1652,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat opCtx.get(), _ctHolder->getAbortToken()); } - auto highestMinFetchTimestamp = - getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards()); + auto highestMinFetchTimestamp = resharding::getHighestMinFetchTimestamp( + coordinatorDocChangedOnDisk.getDonorShards()); _updateCoordinatorDocStateAndCatalogEntries( CoordinatorStateEnum::kCloning, coordinatorDocChangedOnDisk, @@ -1693,7 +1693,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor( _commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>( _metrics, _coordinatorDoc.getSourceNss(), - extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()), + resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()), **executor, _ctHolder->getCommitMonitorToken()); @@ -1849,7 +1849,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD boost::optional<Status> abortReason; if (coordinatorDoc.getAbortReason()) { - abortReason = getStatusFromAbortReason(coordinatorDoc); + abortReason = resharding::getStatusFromAbortReason(coordinatorDoc); } if (!abortReason) { @@ -1909,9 +1909,9 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: // Build new state doc for coordinator state update ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(nextState); - emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize)); - emplaceCloneTimestampIfExists(updatedCoordinatorDoc, std::move(cloneTimestamp)); - emplaceTruncatedAbortReasonIfExists(updatedCoordinatorDoc, abortReason); + resharding::emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize)); + resharding::emplaceCloneTimestampIfExists(updatedCoordinatorDoc, std::move(cloneTimestamp)); + resharding::emplaceTruncatedAbortReasonIfExists(updatedCoordinatorDoc, abortReason); auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( @@ -1924,9 +1924,10 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllParticipants( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); + auto donorShardIds = + resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); auto recipientShardIds = - extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); + resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()}; participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); @@ -1942,7 +1943,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllRecip const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto recipientShardIds = - extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); + resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); _reshardingCoordinatorExternalState->sendCommandToShards( opCtx.get(), @@ -1955,7 +1956,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllRecip void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllDonors( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); + auto donorShardIds = + resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); _reshardingCoordinatorExternalState->sendCommandToShards( opCtx.get(), diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp index dc16d5fe271..52c52654e89 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp @@ -203,7 +203,7 @@ public: {DonorShardEntry(ShardId("shard0000"), {})}, {RecipientShardEntry(ShardId("shard0001"), {})}); doc.setCommonReshardingMetadata(meta); - emplaceCloneTimestampIfExists(doc, cloneTimestamp); + resharding::emplaceCloneTimestampIfExists(doc, cloneTimestamp); return doc; } @@ -372,10 +372,11 @@ public: TypeCollectionReshardingFields reshardingFields(coordinatorDoc.getReshardingUUID()); reshardingFields.setState(coordinatorDoc.getState()); - reshardingFields.setDonorFields(TypeCollectionDonorFields( - coordinatorDoc.getTempReshardingNss(), - coordinatorDoc.getReshardingKey(), - extractShardIdsFromParticipantEntries(coordinatorDoc.getRecipientShards()))); + reshardingFields.setDonorFields( + TypeCollectionDonorFields(coordinatorDoc.getTempReshardingNss(), + coordinatorDoc.getReshardingKey(), + resharding::extractShardIdsFromParticipantEntries( + coordinatorDoc.getRecipientShards()))); auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry( coordinatorDoc, diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index afd580b0037..ff7bc064939 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -121,7 +121,7 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication:: for (const auto& donor : donorShards) { auto oplogBufferNss = - getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId()); + resharding::getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId()); auto minFetchTimestamp = *donor.getMinFetchTimestamp(); auto idToResumeFrom = getOplogFetcherResumeId( opCtx, metadata.getReshardingUUID(), oplogBufferNss, minFetchTimestamp); @@ -182,7 +182,7 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication:: invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); const auto& oplogBufferNss = - getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardId); + resharding::getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardId); auto applierMetrics = (*applierMetricsMap)[donorShardId].get(); oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>( @@ -456,7 +456,7 @@ ReshardingDonorOplogId ReshardingDataReplication::getOplogFetcherResumeId( if (highestOplogBufferId) { auto oplogEntry = repl::OplogEntry{highestOplogBufferId->toBson()}; - if (isFinalOplog(oplogEntry, reshardingUUID)) { + if (resharding::isFinalOplog(oplogEntry, reshardingUUID)) { return ReshardingOplogFetcher::kFinalOpAlreadyFetched; } diff --git a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp index f71ce9f0356..d02c0babe27 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp @@ -193,7 +193,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { auto opCtx = makeOperationContext(); const auto reshardingUUID = UUID::gen(); - auto oplogBufferNss = getLocalOplogBufferNamespace(reshardingUUID, {"shard0"}); + auto oplogBufferNss = resharding::getLocalOplogBufferNamespace(reshardingUUID, {"shard0"}); const auto minFetchTimestamp = Timestamp{10, 0}; const auto oplogId1 = ReshardingDonorOplogId{{20, 0}, {18, 0}}; diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp index 5213b170753..0a9027deea2 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp @@ -129,7 +129,7 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline& numBytes += obj.objsize(); - if (isFinalOplog(entry)) { + if (resharding::isFinalOplog(entry)) { // The ReshardingOplogFetcher should never insert documents after the reshardFinalOp // entry. We defensively check each oplog entry for being the reshardFinalOp and confirm // the pipeline has been exhausted. @@ -185,7 +185,7 @@ ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getN const auto& lastEntryInBatch = batch.back(); _resumeToken = getId(lastEntryInBatch); - if (isFinalOplog(lastEntryInBatch)) { + if (resharding::isFinalOplog(lastEntryInBatch)) { _hasSeenFinalOplogEntry = true; // Skip returning the final oplog entry because it is known to be a no-op. batch.pop_back(); diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index 26b7646283f..a0491b06e7c 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -95,7 +95,7 @@ public: const BSONObj oField(BSON("msg" << "Created temporary resharding collection")); const BSONObj o2Field( - BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << UUID::gen())); + BSON("type" << resharding::kReshardFinalOpLogType << "reshardingUUID" << UUID::gen())); return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId); } @@ -103,7 +103,7 @@ public: ReshardingDonorOplogId oplogId(ts, ts); const BSONObj oField(BSON("msg" << "Latest oplog ts from donor's cursor response")); - const BSONObj o2Field(BSON("type" << kReshardProgressMark)); + const BSONObj o2Field(BSON("type" << resharding::kReshardProgressMark)); return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId); } diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index d4aae1cd424..bb47fe20b83 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -58,7 +58,7 @@ public: const NamespaceString kOriginalNss = NamespaceString("db", "foo"); const NamespaceString kTemporaryReshardingNss = - constructTemporaryReshardingNss("db", kExistingUUID); + resharding::constructTemporaryReshardingNss("db", kExistingUUID); const std::string kOriginalShardKey = "oldKey"; const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1); const std::string kReshardingKey = "newKey"; @@ -195,7 +195,7 @@ protected: const boost::optional<Timestamp>& cloneTimestamp = boost::none) { auto recipientFields = TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000); - emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp); + resharding::emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp); fields.setRecipientFields(std::move(recipientFields)); } @@ -620,10 +620,10 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta OperationContext* opCtx = operationContext(); NamespaceString sourceNss1 = NamespaceString("db", "one"); NamespaceString tempReshardingNss1 = - constructTemporaryReshardingNss(sourceNss1.db(), UUID::gen()); + resharding::constructTemporaryReshardingNss(sourceNss1.db(), UUID::gen()); NamespaceString sourceNss2 = NamespaceString("db", "two"); NamespaceString tempReshardingNss2 = - constructTemporaryReshardingNss(sourceNss2.db(), UUID::gen()); + resharding::constructTemporaryReshardingNss(sourceNss2.db(), UUID::gen()); ShardId shardId1 = ShardId{"recipient1"}; ShardId shardId2 = ShardId{"recipient2"}; ReshardingDonorDocument doc1 = diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 868e240e5e0..b25b25185ce 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -715,7 +715,7 @@ void ReshardingDonorService::DonorStateMachine:: oplog.setObject( BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", _metadata.getSourceNss().toString()))); - oplog.setObject2(BSON("type" << kReshardFinalOpLogType << "reshardingUUID" + oplog.setObject2(BSON("type" << resharding::kReshardFinalOpLogType << "reshardingUUID" << _metadata.getReshardingUUID())); oplog.setOpTime(OplogSlot()); oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); @@ -856,7 +856,7 @@ void ReshardingDonorService::DonorStateMachine::_transitionToDonatingInitialData void ReshardingDonorService::DonorStateMachine::_transitionToError(Status abortReason) { auto newDonorCtx = _donorCtx; newDonorCtx.setState(DonorStateEnum::kError); - emplaceTruncatedAbortReasonIfExists(newDonorCtx, abortReason); + resharding::emplaceTruncatedAbortReasonIfExists(newDonorCtx, abortReason); _transitionState(std::move(newDonorCtx)); } diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index f0ffd9ef028..4d83cfe5e44 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -150,12 +150,12 @@ public: NamespaceString sourceNss("sourcedb.sourcecollection"); auto sourceUUID = UUID::gen(); - auto commonMetadata = - CommonReshardingMetadata(UUID::gen(), - sourceNss, - sourceUUID, - constructTemporaryReshardingNss(sourceNss.db(), sourceUUID), - BSON("newKey" << 1)); + auto commonMetadata = CommonReshardingMetadata( + UUID::gen(), + sourceNss, + sourceUUID, + resharding::constructTemporaryReshardingNss(sourceNss.db(), sourceUUID), + BSON("newKey" << 1)); commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now()); doc.setCommonReshardingMetadata(std::move(commonMetadata)); @@ -350,7 +350,7 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl DBDirectClient client(opCtx.get()); FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace}; - findRequest.setFilter(BSON("o2.type" << kReshardFinalOpLogType)); + findRequest.setFilter(BSON("o2.type" << resharding::kReshardFinalOpLogType)); auto cursor = client.find(std::move(findRequest)); ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection"; @@ -712,7 +712,7 @@ TEST_F(ReshardingDonorServiceTest, TruncatesXLErrorOnDonorDocument) { // to the primitive truncation algorithm - Check that the total size is less than // kReshardErrorMaxBytes + a couple additional bytes to provide a buffer for the field // name sizes. - int maxReshardErrorBytesCeiling = kReshardErrorMaxBytes + 200; + int maxReshardErrorBytesCeiling = resharding::kReshardErrorMaxBytes + 200; ASSERT_LT(persistedAbortReasonBSON->objsize(), maxReshardErrorBytesCeiling); ASSERT_EQ(persistedAbortReasonBSON->getIntField("code"), ErrorCodes::ReshardCollectionTruncatedError); diff --git a/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp b/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp index 9c2b78385fa..74911c8518f 100644 --- a/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp +++ b/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp @@ -48,8 +48,9 @@ namespace { std::vector<ShardId> getAllParticipantsFromCoordDoc(const ReshardingCoordinatorDocument& doc) { std::vector<ShardId> participants; - auto donorShards = extractShardIdsFromParticipantEntriesAsSet(doc.getDonorShards()); - auto recipientShards = extractShardIdsFromParticipantEntriesAsSet(doc.getRecipientShards()); + auto donorShards = resharding::extractShardIdsFromParticipantEntriesAsSet(doc.getDonorShards()); + auto recipientShards = + resharding::extractShardIdsFromParticipantEntriesAsSet(doc.getRecipientShards()); std::set_union(donorShards.begin(), donorShards.end(), recipientShards.begin(), diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index 48a81ba5183..e57581cf8dd 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -98,12 +98,12 @@ public: } CommonReshardingMetadata createCommonReshardingMetadata(const UUID& operationId) { - CommonReshardingMetadata metadata{ - operationId, - kTestNamespace, - getSourceCollectionId(), - constructTemporaryReshardingNss(kTestNamespace.db(), getSourceCollectionId()), - kShardKey}; + CommonReshardingMetadata metadata{operationId, + kTestNamespace, + getSourceCollectionId(), + resharding::constructTemporaryReshardingNss( + kTestNamespace.db(), getSourceCollectionId()), + kShardKey}; metadata.setStartTime(getClockSource()->now() - kRunningTime); return metadata; } diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index cf449c4c00c..d9edf786371 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -271,7 +271,7 @@ NamespaceString ReshardingOplogApplier::ensureStashCollectionExists( const UUID& existingUUID, const ShardId& donorShardId, const CollectionOptions& options) { - auto nss = getLocalConflictStashNamespace(existingUUID, donorShardId); + auto nss = resharding::getLocalConflictStashNamespace(existingUUID, donorShardId); resharding::data_copy::ensureCollectionExists(opCtx, nss, options); return nss; diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index 5ee0720d57b..c88884ac624 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -356,11 +356,13 @@ private: const ShardId _otherDonorId{"otherDonorId"}; const NamespaceString _outputNss = - constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); - const NamespaceString _myStashNss = getLocalConflictStashNamespace(_sourceUUID, _myDonorId); + resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); + const NamespaceString _myStashNss = + resharding::getLocalConflictStashNamespace(_sourceUUID, _myDonorId); const NamespaceString _otherStashNss = - getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); - const NamespaceString _myOplogBufferNss = getLocalOplogBufferNamespace(_sourceUUID, _myDonorId); + resharding::getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); + const NamespaceString _myOplogBufferNss = + resharding::getLocalOplogBufferNamespace(_sourceUUID, _myDonorId); std::unique_ptr<ReshardingMetrics> _metrics; std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics; diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index fa3c7804902..a9a60cd9aa0 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -335,10 +335,11 @@ private: const ShardId _otherDonorId{"otherDonorId"}; const NamespaceString _outputNss = - constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); - const NamespaceString _myStashNss = getLocalConflictStashNamespace(_sourceUUID, _myDonorId); + resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); + const NamespaceString _myStashNss = + resharding::getLocalConflictStashNamespace(_sourceUUID, _myDonorId); const NamespaceString _otherStashNss = - getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); + resharding::getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); std::unique_ptr<ReshardingOplogApplicationRules> _applier; std::unique_ptr<ReshardingMetrics> _metrics; diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index 40f51e56e60..ac62a1cee4d 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -272,9 +272,9 @@ AggregateCommandRequest ReshardingOplogFetcher::_makeAggregateCommandRequest( auto opCtx = opCtxRaii.get(); auto expCtx = _makeExpressionContext(opCtx); - auto serializedPipeline = - createOplogFetchingPipelineForResharding(expCtx, _startAt, _collUUID, _recipientShard) - ->serializeToBson(); + auto serializedPipeline = resharding::createOplogFetchingPipelineForResharding( + expCtx, _startAt, _collUUID, _recipientShard) + ->serializeToBson(); AggregateCommandRequest aggRequest(NamespaceString::kRsOplogNamespace, std::move(serializedPipeline)); @@ -368,7 +368,7 @@ bool ReshardingOplogFetcher::consume(Client* client, _onInsertFuture = std::move(f); } - if (isFinalOplog(nextOplog, _reshardingUUID)) { + if (resharding::isFinalOplog(nextOplog, _reshardingUUID)) { moreToCome = false; return false; } @@ -392,7 +392,7 @@ bool ReshardingOplogFetcher::consume(Client* client, oplog.set_id(Value(startAt.toBSON())); oplog.setObject(BSON("msg" << "Latest oplog ts from donor's cursor response")); - oplog.setObject2(BSON("type" << kReshardProgressMark)); + oplog.setObject2(BSON("type" << resharding::kReshardProgressMark)); oplog.setOpTime(OplogSlot()); oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index dc6f82ad701..68523519f41 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -298,7 +298,8 @@ public: BSON( "msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", dataColl.getCollection()->ns().toString())), - BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << _reshardingUUID), + BSON("type" << resharding::kReshardFinalOpLogType << "reshardingUUID" + << _reshardingUUID), boost::none, boost::none, boost::none, diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index d64e180304d..f52e3b8a3a7 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -728,8 +728,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: .then([this, &factory] { auto opCtx = factory.makeOperationContext(&cc()); for (const auto& donor : _donorShards) { - auto stashNss = - getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId()); + auto stashNss = resharding::getLocalConflictStashNamespace( + _metadata.getSourceUUID(), donor.getShardId()); AutoGetCollection stashColl(opCtx.get(), stashNss, MODE_IS); uassert(5356800, "Resharding completed with non-empty stash collections", @@ -902,7 +902,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToError( Status abortReason, const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kError); - emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason); + resharding::emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason); _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); } @@ -1168,10 +1168,10 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( progressDocList; for (const auto& donor : _donorShards) { { - AutoGetCollection oplogBufferColl( - opCtx.get(), - getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()), - MODE_IS); + AutoGetCollection oplogBufferColl(opCtx.get(), + resharding::getLocalOplogBufferNamespace( + _metadata.getSourceUUID(), donor.getShardId()), + MODE_IS); if (oplogBufferColl) { oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get()); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 22ae1f61542..b72f0ad34e8 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -252,12 +252,12 @@ public: NamespaceString sourceNss("sourcedb", "sourcecollection"); auto sourceUUID = UUID::gen(); - auto commonMetadata = - CommonReshardingMetadata(UUID::gen(), - sourceNss, - sourceUUID, - constructTemporaryReshardingNss(sourceNss.db(), sourceUUID), - newShardKeyPattern()); + auto commonMetadata = CommonReshardingMetadata( + UUID::gen(), + sourceNss, + sourceUUID, + resharding::constructTemporaryReshardingNss(sourceNss.db(), sourceUUID), + newShardKeyPattern()); commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now()); doc.setCommonReshardingMetadata(std::move(commonMetadata)); @@ -627,7 +627,8 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryOnReshardDoneCatchUp) ErrorCodes::InterruptedDueToReplStateChange); DBDirectClient client(opCtx.get()); - NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID()); + NamespaceString sourceNss = + resharding::constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID()); FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace}; findRequest.setFilter( @@ -673,7 +674,8 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryForImplicitShardColle ErrorCodes::InterruptedDueToReplStateChange); DBDirectClient client(opCtx.get()); - NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID()); + NamespaceString sourceNss = + resharding::constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID()); FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace}; findRequest.setFilter( @@ -741,7 +743,7 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) { // to the primitive truncation algorithm - Check that the total size is less than // kReshardErrorMaxBytes + a couple additional bytes to provide a buffer for the field // name sizes. - int maxReshardErrorBytesCeiling = kReshardErrorMaxBytes + 200; + int maxReshardErrorBytesCeiling = resharding::kReshardErrorMaxBytes + 200; ASSERT_LT(persistedAbortReasonBSON->objsize(), maxReshardErrorBytesCeiling); ASSERT_EQ(persistedAbortReasonBSON->getIntField("code"), ErrorCodes::ReshardCollectionTruncatedError); @@ -817,7 +819,8 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { for (const auto& donor : donorShards) { // Setup oplogBuffer collection. ReshardingDonorOplogId donorOplogId{{20, i}, {19, 0}}; - insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()), + insertFn(resharding::getLocalOplogBufferNamespace(doc.getSourceUUID(), + donor.getShardId()), InsertStatement{BSON("_id" << donorOplogId.toBSON())}); ++i; @@ -925,7 +928,7 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUpWithMissingProgr // Setup oplogBuffer collection. ReshardingDonorOplogId donorOplogId{{20, i}, {19, 0}}; - insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()), + insertFn(resharding::getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()), InsertStatement{BSON("_id" << donorOplogId.toBSON())}); // Setup reshardingApplierProgress collection. diff --git a/src/mongo/db/s/resharding/resharding_util.cpp b/src/mongo/db/s/resharding/resharding_util.cpp index 4febc199126..873fc7ce5d5 100644 --- a/src/mongo/db/s/resharding/resharding_util.cpp +++ b/src/mongo/db/s/resharding/resharding_util.cpp @@ -63,6 +63,7 @@ namespace mongo { +namespace resharding { namespace { /** @@ -414,4 +415,5 @@ boost::optional<Milliseconds> estimateRemainingRecipientTime(bool applyingBegan, return {}; } +} // namespace resharding } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_util.h b/src/mongo/db/s/resharding/resharding_util.h index 194381e7e78..0d8aacbe3f7 100644 --- a/src/mongo/db/s/resharding/resharding_util.h +++ b/src/mongo/db/s/resharding/resharding_util.h @@ -50,6 +50,7 @@ #include "mongo/util/str.h" namespace mongo { +namespace resharding { constexpr auto kReshardFinalOpLogType = "reshardFinalOp"_sd; constexpr auto kReshardProgressMark = "reshardProgressMark"_sd; @@ -324,5 +325,6 @@ std::vector<std::shared_ptr<Instance>> getReshardingStateMachines(OperationConte return result; } +} // namespace resharding } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_util_test.cpp b/src/mongo/db/s/resharding/resharding_util_test.cpp index 5fd40fd86b7..12e5e15ddcd 100644 --- a/src/mongo/db/s/resharding/resharding_util_test.cpp +++ b/src/mongo/db/s/resharding/resharding_util_test.cpp @@ -52,6 +52,7 @@ namespace mongo { +namespace resharding { namespace { class ReshardingUtilTest : public ConfigServerTestFixture { @@ -309,4 +310,7 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) { } } // namespace + +} // namespace resharding + } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp index e74155e374b..c5f6a831b0f 100644 --- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp @@ -118,7 +118,8 @@ ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() { Milliseconds ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const { switch (_role) { case Role::kRecipient: { - auto estimate = estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, + auto estimate = + resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate, _bytesCopied.load(), _approxBytesToCopy.load(), getCopyingElapsedTimeSecs(), diff --git a/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp b/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp index 4e95395faaa..f0918cc5766 100644 --- a/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp @@ -99,7 +99,7 @@ public: // If abort actually went through, the resharding documents should be cleaned up. // If they still exists, it could be because that it was interrupted or it is no // longer primary. - doNoopWrite(opCtx, "_shardsvrAbortReshardCollection no-op", ns()); + resharding::doNoopWrite(opCtx, "_shardsvrAbortReshardCollection no-op", ns()); PersistentTaskStore<CommonReshardingMetadata> donorReshardingOpStore( NamespaceString::kDonorReshardingOperationsNamespace); uassert(5563802, diff --git a/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp b/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp index f4240c1eb0a..3d9be030fcb 100644 --- a/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp @@ -107,7 +107,7 @@ public: // If commit actually went through, the resharding documents will be cleaned up. If // documents still exist, it could be because that commit was interrupted or that the // underlying replica set node is no longer primary. - doNoopWrite(opCtx, "_shardsvrCommitReshardCollection no-op", ns()); + resharding::doNoopWrite(opCtx, "_shardsvrCommitReshardCollection no-op", ns()); PersistentTaskStore<CommonReshardingMetadata> donorReshardingOpStore( NamespaceString::kDonorReshardingOperationsNamespace); uassert(5795302, diff --git a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp index 4c3e05a7879..56bf7b644f3 100644 --- a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp +++ b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp @@ -108,10 +108,9 @@ public: } Response typedRun(OperationContext* opCtx) { - auto instances = - getReshardingStateMachines<ReshardingRecipientService, - ReshardingRecipientService::RecipientStateMachine>(opCtx, - ns()); + auto instances = resharding::getReshardingStateMachines< + ReshardingRecipientService, + ReshardingRecipientService::RecipientStateMachine>(opCtx, ns()); if (instances.empty()) { return Response{boost::none, boost::none}; } |