-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp  |  4
-rw-r--r--  src/mongo/db/s/collection_metadata_test.cpp  |  2
-rw-r--r--  src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp  |  12
-rw-r--r--  src/mongo/db/s/flush_resharding_state_change_command.cpp  |  2
-rw-r--r--  src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp  |  3
-rw-r--r--  src/mongo/db/s/resharding/resharding_agg_test.cpp  |  34
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp  |  2
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_observer.cpp  |  4
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp  |  84
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp  |  46
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp  |  11
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_replication.cpp  |  6
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_replication_test.cpp  |  2
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp  |  4
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp  |  4
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp  |  8
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service.cpp  |  4
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service_test.cpp  |  16
-rw-r--r--  src/mongo/db/s/resharding/resharding_manual_cleanup.cpp  |  5
-rw-r--r--  src/mongo/db/s/resharding/resharding_metrics_test.cpp  |  12
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier.cpp  |  2
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp  |  10
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp  |  7
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp  |  10
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp  |  3
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp  |  14
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service_test.cpp  |  25
-rw-r--r--  src/mongo/db/s/resharding/resharding_util.cpp  |  2
-rw-r--r--  src/mongo/db/s/resharding/resharding_util.h  |  2
-rw-r--r--  src/mongo/db/s/resharding/resharding_util_test.cpp  |  4
-rw-r--r--  src/mongo/db/s/sharding_data_transform_instance_metrics.cpp  |  3
-rw-r--r--  src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp  |  2
-rw-r--r--  src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp  |  2
-rw-r--r--  src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp  |  7
34 files changed, 194 insertions(+), 164 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index fcd7c965b37..bc1707620e4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1421,7 +1421,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUpLegacyFormat) {
auto existingUuid = UUID::gen();
auto reshardingUuid = UUID::gen();
- auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid);
+ auto temporaryNs = resharding::constructTemporaryReshardingNss(nss.db(), existingUuid);
const auto o2FieldInLegacyFormat = BSON("type"
<< "reshardDoneCatchUp"
@@ -1460,7 +1460,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUpLegacyFormat) {
TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
auto existingUuid = UUID::gen();
auto reshardingUuid = UUID::gen();
- auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid);
+ auto temporaryNs = resharding::constructTemporaryReshardingNss(nss.db(), existingUuid);
ReshardDoneCatchUpChangeEventO2Field o2Field{temporaryNs, reshardingUuid};
auto reshardDoneCatchUp = makeOplogEntry(OpTypeEnum::kNoop,
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 8f789549796..c9ed5d77272 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -125,7 +125,7 @@ protected:
reshardingFields.setRecipientFields(std::move(recipientFields));
} else if (state == CoordinatorStateEnum::kBlockingWrites) {
TypeCollectionDonorFields donorFields{
- constructTemporaryReshardingNss(kNss.db(), existingUuid),
+ resharding::constructTemporaryReshardingNss(kNss.db(), existingUuid),
KeyPattern{BSON("newKey" << 1)},
{kThisShard, kOtherShard}};
reshardingFields.setDonorFields(std::move(donorFields));
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index 7f284e2c642..1a094c7db5f 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -63,8 +63,9 @@ getExistingInstanceToJoin(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& newShardKey) {
auto instances =
- getReshardingStateMachines<ReshardingCoordinatorService,
- ReshardingCoordinatorService::ReshardingCoordinator>(opCtx, nss);
+ resharding::getReshardingStateMachines<ReshardingCoordinatorService,
+ ReshardingCoordinatorService::ReshardingCoordinator>(
+ opCtx, nss);
for (const auto& instance : instances) {
if (SimpleBSONObjComparator::kInstance.evaluate(
instance->getMetadata().getReshardingKey().toBSON() == newShardKey)) {
@@ -139,7 +140,7 @@ public:
"Must specify only one of _presetReshardedChunks or numInitialChunks",
!(bool(request().getNumInitialChunks())));
- validateReshardedChunks(
+ resharding::validateReshardedChunks(
*presetChunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern());
}
@@ -183,11 +184,12 @@ public:
return boost::none;
}
- auto tempReshardingNss = constructTemporaryReshardingNss(nss.db(), cm.getUUID());
+ auto tempReshardingNss =
+ resharding::constructTemporaryReshardingNss(nss.db(), cm.getUUID());
if (auto zones = request().getZones()) {
- checkForOverlappingZones(*zones);
+ resharding::checkForOverlappingZones(*zones);
}
auto coordinatorDoc =
diff --git a/src/mongo/db/s/flush_resharding_state_change_command.cpp b/src/mongo/db/s/flush_resharding_state_change_command.cpp
index 85f0c42cff0..95439564643 100644
--- a/src/mongo/db/s/flush_resharding_state_change_command.cpp
+++ b/src/mongo/db/s/flush_resharding_state_change_command.cpp
@@ -131,7 +131,7 @@ public:
.getAsync([](auto) {});
// Ensure the command isn't run on a stale primary.
- doNoopWrite(opCtx, "_flushReshardingStateChange no-op", ns());
+ resharding::doNoopWrite(opCtx, "_flushReshardingStateChange no-op", ns());
}
};
} _flushReshardingStateChange;
diff --git a/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp
index aaeb6180654..dc3176cf3e8 100644
--- a/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp
+++ b/src/mongo/db/s/resharding/document_source_resharding_ownership_match.cpp
@@ -117,7 +117,8 @@ DocumentSource::GetModPathsReturn DocumentSourceReshardingOwnershipMatch::getMod
DocumentSource::GetNextResult DocumentSourceReshardingOwnershipMatch::doGetNext() {
if (!_tempReshardingChunkMgr) {
// TODO: Actually propagate the temporary resharding namespace from the recipient.
- auto tempReshardingNss = constructTemporaryReshardingNss(pExpCtx->ns.db(), *pExpCtx->uuid);
+ auto tempReshardingNss =
+ resharding::constructTemporaryReshardingNss(pExpCtx->ns.db(), *pExpCtx->uuid);
auto* catalogCache = Grid::get(pExpCtx->opCtx)->catalogCache();
_tempReshardingChunkMgr =
diff --git a/src/mongo/db/s/resharding/resharding_agg_test.cpp b/src/mongo/db/s/resharding/resharding_agg_test.cpp
index ce8d110e5ab..c49467f79f9 100644
--- a/src/mongo/db/s/resharding/resharding_agg_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_agg_test.cpp
@@ -362,7 +362,7 @@ protected:
expCtx->ns = kRemoteOplogNss;
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(pipelineSource);
- auto pipeline = createOplogFetchingPipelineForResharding(
+ auto pipeline = resharding::createOplogFetchingPipelineForResharding(
expCtx,
ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()),
_reshardingCollUUID,
@@ -524,13 +524,14 @@ TEST_F(ReshardingAggTest, VerifyPipelineOutputHasOplogSchema) {
expCtx->ns = kRemoteOplogNss;
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(pipelineSource);
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline = createOplogFetchingPipelineForResharding(
- expCtx,
- // Use the test to also exercise the stages for resuming. The timestamp passed in is
- // excluded from the results.
- ReshardingDonorOplogId(insertOplog.getTimestamp(), insertOplog.getTimestamp()),
- _reshardingCollUUID,
- {_destinedRecipient});
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline =
+ resharding::createOplogFetchingPipelineForResharding(
+ expCtx,
+ // Use the test to also exercise the stages for resuming. The timestamp passed in is
+ // excluded from the results.
+ ReshardingDonorOplogId(insertOplog.getTimestamp(), insertOplog.getTimestamp()),
+ _reshardingCollUUID,
+ {_destinedRecipient});
auto bsonPipeline = pipeline->serializeToBson();
if (debug) {
std::cout << "Pipeline stages:" << std::endl;
@@ -624,11 +625,12 @@ TEST_F(ReshardingAggTest, VerifyPipelinePreparedTxn) {
expCtx->ns = kRemoteOplogNss;
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(pipelineSource);
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline = createOplogFetchingPipelineForResharding(
- expCtx,
- ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()),
- _reshardingCollUUID,
- {_destinedRecipient});
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline =
+ resharding::createOplogFetchingPipelineForResharding(
+ expCtx,
+ ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()),
+ _reshardingCollUUID,
+ {_destinedRecipient});
if (debug) {
std::cout << "Pipeline stages:" << std::endl;
// This is can be changed to process a prefix of the pipeline for debugging.
@@ -1476,7 +1478,7 @@ TEST_F(ReshardingAggWithStorageTest, RetryableFindAndModifyWithImageLookup) {
expCtx->mongoProcessInterface = std::move(mockMongoInterface);
}
- auto pipeline = createOplogFetchingPipelineForResharding(
+ auto pipeline = resharding::createOplogFetchingPipelineForResharding(
expCtx, ReshardingDonorOplogId(Timestamp::min(), Timestamp::min()), kCrudUUID, kMyShardId);
pipeline->addInitialSource(DocumentSourceMock::createForTest(pipelineSource, expCtx));
@@ -1578,8 +1580,8 @@ TEST_F(ReshardingAggWithStorageTest,
expCtx->mongoProcessInterface = std::move(mockMongoInterface);
}
- auto pipeline =
- createOplogFetchingPipelineForResharding(expCtx, startAt, kCrudUUID, kMyShardId);
+ auto pipeline = resharding::createOplogFetchingPipelineForResharding(
+ expCtx, startAt, kCrudUUID, kMyShardId);
pipeline->addInitialSource(DocumentSourceMock::createForTest(pipelineSource, expCtx));
return pipeline;
};
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 610580612d6..8bd04ebfe37 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -109,7 +109,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}};
// Assume that the config.cache.chunks collection isn't a view either.
- auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
+ auto tempNss = resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
auto tempCacheChunksNss =
NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns());
resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}};
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
index 5f78cac592c..da457d8eab3 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -112,7 +112,7 @@ bool stateTransistionsComplete(WithLock lk,
template <class TParticipant>
Status getStatusFromAbortReasonWithShardInfo(const TParticipant& participant,
StringData participantType) {
- return getStatusFromAbortReason(participant.getMutableState())
+ return resharding::getStatusFromAbortReason(participant.getMutableState())
.withContext("{} shard {} reached an unrecoverable error"_format(
participantType, participant.getId().toString()));
}
@@ -128,7 +128,7 @@ boost::optional<Status> getAbortReasonIfExists(
if (updatedStateDoc.getAbortReason()) {
// Note: the absence of context specifying which shard the abortReason originates from
// implies the abortReason originates from the coordinator.
- return getStatusFromAbortReason(updatedStateDoc);
+ return resharding::getStatusFromAbortReason(updatedStateDoc);
}
for (const auto& donorShard : updatedStateDoc.getDonorShards()) {
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
index 0f3803ab04e..bd893bd6dee 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
@@ -53,7 +53,7 @@ protected:
auto coordinatorDoc = ReshardingCoordinatorDocument();
coordinatorDoc.setRecipientShards(std::move(recipients));
coordinatorDoc.setDonorShards(std::move(donors));
- emplaceTruncatedAbortReasonIfExists(coordinatorDoc, abortReason);
+ resharding::emplaceTruncatedAbortReasonIfExists(coordinatorDoc, abortReason);
return coordinatorDoc;
}
@@ -62,9 +62,9 @@ protected:
boost::optional<Timestamp> timestamp = boost::none,
boost::optional<Status> abortReason = boost::none) {
// The mock state here is simulating only one donor shard having errored locally.
- return {makeDonorShard(ShardId{"s1"}, donorState, timestamp),
- makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason),
- makeDonorShard(ShardId{"s3"}, donorState, timestamp)};
+ return {resharding::makeDonorShard(ShardId{"s1"}, donorState, timestamp),
+ resharding::makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason),
+ resharding::makeDonorShard(ShardId{"s3"}, donorState, timestamp)};
}
std::vector<RecipientShardEntry> makeMockRecipientsInState(
@@ -72,9 +72,9 @@ protected:
boost::optional<Timestamp> timestamp = boost::none,
boost::optional<Status> abortReason = boost::none) {
// The mock state here is simulating only one donor shard having errored locally.
- return {makeRecipientShard(ShardId{"s1"}, recipientState),
- makeRecipientShard(ShardId{"s2"}, recipientState, abortReason),
- makeRecipientShard(ShardId{"s3"}, recipientState)};
+ return {resharding::makeRecipientShard(ShardId{"s1"}, recipientState),
+ resharding::makeRecipientShard(ShardId{"s2"}, recipientState, abortReason),
+ resharding::makeRecipientShard(ShardId{"s3"}, recipientState)};
}
};
@@ -85,15 +85,15 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionSucce
auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1));
std::vector<RecipientShardEntry> recipientShards0{
- makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning),
- makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)};
+ resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning),
+ resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)};
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
std::vector<RecipientShardEntry> recipientShards1{
- makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying),
- makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)};
+ resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying),
+ resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)};
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_TRUE(fut.isReady());
@@ -110,25 +110,25 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOu
auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1));
std::vector<RecipientShardEntry> recipientShards0{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
+ {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
+ {resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)},
+ {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
std::vector<RecipientShardEntry> recipientShards1{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kCloning)}};
+ {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
+ {resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)},
+ {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kCloning)}};
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_FALSE(fut.isReady());
std::vector<RecipientShardEntry> recipientShards2{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
+ {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying)},
+ {resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kApplying)},
+ {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
auto coordinatorDoc2 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards2, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc2);
ASSERT_TRUE(fut.isReady());
@@ -145,11 +145,11 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1));
std::vector<RecipientShardEntry> recipientShards{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
- {makeRecipientShard(ShardId{"s2"},
- RecipientStateEnum::kError,
- Status{ErrorCodes::InternalError, "We gotta abort"})},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
+ {resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
+ {resharding::makeRecipientShard(ShardId{"s2"},
+ RecipientStateEnum::kError,
+ Status{ErrorCodes::InternalError, "We gotta abort"})},
+ {resharding::makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kApplying)}};
auto coordinatorDoc = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc);
auto resp = fut.getNoThrow();
@@ -173,9 +173,11 @@ TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) {
// donor who hasn't seen there was an error yet.
auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kDone, Timestamp(1, 1));
std::vector<DonorShardEntry> donorShards0{
- {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)},
- {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDonatingOplogEntries, Timestamp(1, 1))},
- {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}};
+ {resharding::makeDonorShard(
+ ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)},
+ {resharding::makeDonorShard(
+ ShardId{"s2"}, DonorStateEnum::kDonatingOplogEntries, Timestamp(1, 1))},
+ {resharding::makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}};
auto coordinatorDoc0 =
makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards0, abortReason);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
@@ -183,9 +185,10 @@ TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) {
// All participants are done.
std::vector<DonorShardEntry> donorShards1{
- {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)},
- {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDone, Timestamp(1, 1))},
- {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}};
+ {resharding::makeDonorShard(
+ ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)},
+ {resharding::makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDone, Timestamp(1, 1))},
+ {resharding::makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}};
auto coordinatorDoc1 =
makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1, abortReason);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
@@ -206,15 +209,15 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingRecipientsOutOfSync) {
auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonatingInitialData, Timestamp(1, 1));
std::vector<RecipientShardEntry> recipientShards0{
- makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kUnused),
- makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)};
+ resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kUnused),
+ resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)};
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
std::vector<RecipientShardEntry> recipientShards1{
- makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying),
- makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)};
+ resharding::makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kApplying),
+ resharding::makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kStrictConsistency)};
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_TRUE(fut.isReady());
@@ -231,15 +234,18 @@ TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) {
auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kUnused);
std::vector<DonorShardEntry> donorShards0{
- {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))},
- {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kPreparingToDonate)}};
+ {resharding::makeDonorShard(
+ ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))},
+ {resharding::makeDonorShard(ShardId{"s2"}, DonorStateEnum::kPreparingToDonate)}};
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards0);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
std::vector<DonorShardEntry> donorShards1{
- {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))},
- {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}};
+ {resharding::makeDonorShard(
+ ShardId{"s1"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))},
+ {resharding::makeDonorShard(
+ ShardId{"s2"}, DonorStateEnum::kDonatingInitialData, Timestamp(1, 1))}};
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_TRUE(fut.isReady());
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index d2236e708d1..4f905f80f51 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -295,9 +295,9 @@ TypeCollectionRecipientFields constructRecipientFields(
coordinatorDoc.getSourceNss(),
resharding::gReshardingMinimumOperationDurationMillis.load());
- emplaceCloneTimestampIfExists(recipientFields, coordinatorDoc.getCloneTimestamp());
- emplaceApproxBytesToCopyIfExists(recipientFields,
- coordinatorDoc.getReshardingApproxCopySizeStruct());
+ resharding::emplaceCloneTimestampIfExists(recipientFields, coordinatorDoc.getCloneTimestamp());
+ resharding::emplaceApproxBytesToCopyIfExists(
+ recipientFields, coordinatorDoc.getReshardingApproxCopySizeStruct());
return recipientFields;
}
@@ -323,10 +323,10 @@ BSONObj createReshardingFieldsUpdateForOriginalNss(
<< CollectionType::kAllowMigrationsFieldName << false));
}
case CoordinatorStateEnum::kPreparingToDonate: {
- TypeCollectionDonorFields donorFields(
- coordinatorDoc.getTempReshardingNss(),
- coordinatorDoc.getReshardingKey(),
- extractShardIdsFromParticipantEntries(coordinatorDoc.getRecipientShards()));
+ TypeCollectionDonorFields donorFields(coordinatorDoc.getTempReshardingNss(),
+ coordinatorDoc.getReshardingKey(),
+ resharding::extractShardIdsFromParticipantEntries(
+ coordinatorDoc.getRecipientShards()));
BSONObjBuilder updateBuilder;
{
@@ -394,7 +394,7 @@ BSONObj createReshardingFieldsUpdateForOriginalNss(
// If the abortReason exists, include it in the update.
setBuilder.append("reshardingFields.abortReason", *abortReason);
- auto abortStatus = getStatusFromAbortReason(coordinatorDoc);
+ auto abortStatus = resharding::getStatusFromAbortReason(coordinatorDoc);
setBuilder.append("reshardingFields.userCanceled",
abortStatus == ErrorCodes::ReshardCollectionAborted);
}
@@ -504,7 +504,7 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
if (auto abortReason = coordinatorDoc.getAbortReason()) {
setBuilder.append("reshardingFields.abortReason", *abortReason);
- auto abortStatus = getStatusFromAbortReason(coordinatorDoc);
+ auto abortStatus = resharding::getStatusFromAbortReason(coordinatorDoc);
setBuilder.append("reshardingFields.userCanceled",
abortStatus == ErrorCodes::ReshardCollectionAborted);
}
@@ -1592,8 +1592,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
// the possibility of the document reaching the BSONObj size constraint.
std::vector<BSONObj> zones;
if (updatedCoordinatorDoc.getZones()) {
- zones = buildTagsDocsFromZones(updatedCoordinatorDoc.getTempReshardingNss(),
- *updatedCoordinatorDoc.getZones());
+ zones = resharding::buildTagsDocsFromZones(updatedCoordinatorDoc.getTempReshardingNss(),
+ *updatedCoordinatorDoc.getZones());
}
updatedCoordinatorDoc.setPresetReshardedChunks(boost::none);
updatedCoordinatorDoc.setZones(boost::none);
@@ -1652,8 +1652,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
opCtx.get(), _ctHolder->getAbortToken());
}
- auto highestMinFetchTimestamp =
- getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards());
+ auto highestMinFetchTimestamp = resharding::getHighestMinFetchTimestamp(
+ coordinatorDocChangedOnDisk.getDonorShards());
_updateCoordinatorDocStateAndCatalogEntries(
CoordinatorStateEnum::kCloning,
coordinatorDocChangedOnDisk,
@@ -1693,7 +1693,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_startCommitMonitor(
_commitMonitor = std::make_shared<resharding::CoordinatorCommitMonitor>(
_metrics,
_coordinatorDoc.getSourceNss(),
- extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()),
+ resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()),
**executor,
_ctHolder->getCommitMonitorToken());
@@ -1849,7 +1849,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD
boost::optional<Status> abortReason;
if (coordinatorDoc.getAbortReason()) {
- abortReason = getStatusFromAbortReason(coordinatorDoc);
+ abortReason = resharding::getStatusFromAbortReason(coordinatorDoc);
}
if (!abortReason) {
@@ -1909,9 +1909,9 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
// Build new state doc for coordinator state update
ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(nextState);
- emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize));
- emplaceCloneTimestampIfExists(updatedCoordinatorDoc, std::move(cloneTimestamp));
- emplaceTruncatedAbortReasonIfExists(updatedCoordinatorDoc, abortReason);
+ resharding::emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize));
+ resharding::emplaceCloneTimestampIfExists(updatedCoordinatorDoc, std::move(cloneTimestamp));
+ resharding::emplaceTruncatedAbortReasonIfExists(updatedCoordinatorDoc, abortReason);
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(
@@ -1924,9 +1924,10 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllParticipants(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
+ auto donorShardIds =
+ resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
auto recipientShardIds =
- extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
+ resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()};
participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
@@ -1942,7 +1943,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllRecip
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto recipientShardIds =
- extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
+ resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
_reshardingCoordinatorExternalState->sendCommandToShards(
opCtx.get(),
@@ -1955,7 +1956,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllRecip
void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllDonors(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
+ auto donorShardIds =
+ resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
_reshardingCoordinatorExternalState->sendCommandToShards(
opCtx.get(),
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
index dc16d5fe271..52c52654e89 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
@@ -203,7 +203,7 @@ public:
{DonorShardEntry(ShardId("shard0000"), {})},
{RecipientShardEntry(ShardId("shard0001"), {})});
doc.setCommonReshardingMetadata(meta);
- emplaceCloneTimestampIfExists(doc, cloneTimestamp);
+ resharding::emplaceCloneTimestampIfExists(doc, cloneTimestamp);
return doc;
}
@@ -372,10 +372,11 @@ public:
TypeCollectionReshardingFields reshardingFields(coordinatorDoc.getReshardingUUID());
reshardingFields.setState(coordinatorDoc.getState());
- reshardingFields.setDonorFields(TypeCollectionDonorFields(
- coordinatorDoc.getTempReshardingNss(),
- coordinatorDoc.getReshardingKey(),
- extractShardIdsFromParticipantEntries(coordinatorDoc.getRecipientShards())));
+ reshardingFields.setDonorFields(
+ TypeCollectionDonorFields(coordinatorDoc.getTempReshardingNss(),
+ coordinatorDoc.getReshardingKey(),
+ resharding::extractShardIdsFromParticipantEntries(
+ coordinatorDoc.getRecipientShards())));
auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry(
coordinatorDoc,
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index afd580b0037..ff7bc064939 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -121,7 +121,7 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
for (const auto& donor : donorShards) {
auto oplogBufferNss =
- getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId());
+ resharding::getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId());
auto minFetchTimestamp = *donor.getMinFetchTimestamp();
auto idToResumeFrom = getOplogFetcherResumeId(
opCtx, metadata.getReshardingUUID(), oplogBufferNss, minFetchTimestamp);
@@ -182,7 +182,7 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}));
const auto& oplogBufferNss =
- getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardId);
+ resharding::getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardId);
auto applierMetrics = (*applierMetricsMap)[donorShardId].get();
oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>(
@@ -456,7 +456,7 @@ ReshardingDonorOplogId ReshardingDataReplication::getOplogFetcherResumeId(
if (highestOplogBufferId) {
auto oplogEntry = repl::OplogEntry{highestOplogBufferId->toBson()};
- if (isFinalOplog(oplogEntry, reshardingUUID)) {
+ if (resharding::isFinalOplog(oplogEntry, reshardingUUID)) {
return ReshardingOplogFetcher::kFinalOpAlreadyFetched;
}
diff --git a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
index f71ce9f0356..d02c0babe27 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
@@ -193,7 +193,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) {
auto opCtx = makeOperationContext();
const auto reshardingUUID = UUID::gen();
- auto oplogBufferNss = getLocalOplogBufferNamespace(reshardingUUID, {"shard0"});
+ auto oplogBufferNss = resharding::getLocalOplogBufferNamespace(reshardingUUID, {"shard0"});
const auto minFetchTimestamp = Timestamp{10, 0};
const auto oplogId1 = ReshardingDonorOplogId{{20, 0}, {18, 0}};
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
index 5213b170753..0a9027deea2 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
@@ -129,7 +129,7 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline&
numBytes += obj.objsize();
- if (isFinalOplog(entry)) {
+ if (resharding::isFinalOplog(entry)) {
// The ReshardingOplogFetcher should never insert documents after the reshardFinalOp
// entry. We defensively check each oplog entry for being the reshardFinalOp and confirm
// the pipeline has been exhausted.
@@ -185,7 +185,7 @@ ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getN
const auto& lastEntryInBatch = batch.back();
_resumeToken = getId(lastEntryInBatch);
- if (isFinalOplog(lastEntryInBatch)) {
+ if (resharding::isFinalOplog(lastEntryInBatch)) {
_hasSeenFinalOplogEntry = true;
// Skip returning the final oplog entry because it is known to be a no-op.
batch.pop_back();
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index 26b7646283f..a0491b06e7c 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -95,7 +95,7 @@ public:
const BSONObj oField(BSON("msg"
<< "Created temporary resharding collection"));
const BSONObj o2Field(
- BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << UUID::gen()));
+ BSON("type" << resharding::kReshardFinalOpLogType << "reshardingUUID" << UUID::gen()));
return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId);
}
@@ -103,7 +103,7 @@ public:
ReshardingDonorOplogId oplogId(ts, ts);
const BSONObj oField(BSON("msg"
<< "Latest oplog ts from donor's cursor response"));
- const BSONObj o2Field(BSON("type" << kReshardProgressMark));
+ const BSONObj o2Field(BSON("type" << resharding::kReshardProgressMark));
return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId);
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index d4aae1cd424..bb47fe20b83 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -58,7 +58,7 @@ public:
const NamespaceString kOriginalNss = NamespaceString("db", "foo");
const NamespaceString kTemporaryReshardingNss =
- constructTemporaryReshardingNss("db", kExistingUUID);
+ resharding::constructTemporaryReshardingNss("db", kExistingUUID);
const std::string kOriginalShardKey = "oldKey";
const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1);
const std::string kReshardingKey = "newKey";
@@ -195,7 +195,7 @@ protected:
const boost::optional<Timestamp>& cloneTimestamp = boost::none) {
auto recipientFields =
TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000);
- emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp);
+ resharding::emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp);
fields.setRecipientFields(std::move(recipientFields));
}
@@ -620,10 +620,10 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta
OperationContext* opCtx = operationContext();
NamespaceString sourceNss1 = NamespaceString("db", "one");
NamespaceString tempReshardingNss1 =
- constructTemporaryReshardingNss(sourceNss1.db(), UUID::gen());
+ resharding::constructTemporaryReshardingNss(sourceNss1.db(), UUID::gen());
NamespaceString sourceNss2 = NamespaceString("db", "two");
NamespaceString tempReshardingNss2 =
- constructTemporaryReshardingNss(sourceNss2.db(), UUID::gen());
+ resharding::constructTemporaryReshardingNss(sourceNss2.db(), UUID::gen());
ShardId shardId1 = ShardId{"recipient1"};
ShardId shardId2 = ShardId{"recipient2"};
ReshardingDonorDocument doc1 =
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 868e240e5e0..b25b25185ce 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -715,7 +715,7 @@ void ReshardingDonorService::DonorStateMachine::
oplog.setObject(
BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
_metadata.getSourceNss().toString())));
- oplog.setObject2(BSON("type" << kReshardFinalOpLogType << "reshardingUUID"
+ oplog.setObject2(BSON("type" << resharding::kReshardFinalOpLogType << "reshardingUUID"
<< _metadata.getReshardingUUID()));
oplog.setOpTime(OplogSlot());
oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
@@ -856,7 +856,7 @@ void ReshardingDonorService::DonorStateMachine::_transitionToDonatingInitialData
void ReshardingDonorService::DonorStateMachine::_transitionToError(Status abortReason) {
auto newDonorCtx = _donorCtx;
newDonorCtx.setState(DonorStateEnum::kError);
- emplaceTruncatedAbortReasonIfExists(newDonorCtx, abortReason);
+ resharding::emplaceTruncatedAbortReasonIfExists(newDonorCtx, abortReason);
_transitionState(std::move(newDonorCtx));
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index f0ffd9ef028..4d83cfe5e44 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -150,12 +150,12 @@ public:
NamespaceString sourceNss("sourcedb.sourcecollection");
auto sourceUUID = UUID::gen();
- auto commonMetadata =
- CommonReshardingMetadata(UUID::gen(),
- sourceNss,
- sourceUUID,
- constructTemporaryReshardingNss(sourceNss.db(), sourceUUID),
- BSON("newKey" << 1));
+ auto commonMetadata = CommonReshardingMetadata(
+ UUID::gen(),
+ sourceNss,
+ sourceUUID,
+ resharding::constructTemporaryReshardingNss(sourceNss.db(), sourceUUID),
+ BSON("newKey" << 1));
commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now());
doc.setCommonReshardingMetadata(std::move(commonMetadata));
@@ -350,7 +350,7 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl
DBDirectClient client(opCtx.get());
FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace};
- findRequest.setFilter(BSON("o2.type" << kReshardFinalOpLogType));
+ findRequest.setFilter(BSON("o2.type" << resharding::kReshardFinalOpLogType));
auto cursor = client.find(std::move(findRequest));
ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection";
@@ -712,7 +712,7 @@ TEST_F(ReshardingDonorServiceTest, TruncatesXLErrorOnDonorDocument) {
// to the primitive truncation algorithm - Check that the total size is less than
// kReshardErrorMaxBytes + a couple additional bytes to provide a buffer for the field
// name sizes.
- int maxReshardErrorBytesCeiling = kReshardErrorMaxBytes + 200;
+ int maxReshardErrorBytesCeiling = resharding::kReshardErrorMaxBytes + 200;
ASSERT_LT(persistedAbortReasonBSON->objsize(), maxReshardErrorBytesCeiling);
ASSERT_EQ(persistedAbortReasonBSON->getIntField("code"),
ErrorCodes::ReshardCollectionTruncatedError);
diff --git a/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp b/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp
index 9c2b78385fa..74911c8518f 100644
--- a/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp
+++ b/src/mongo/db/s/resharding/resharding_manual_cleanup.cpp
@@ -48,8 +48,9 @@ namespace {
std::vector<ShardId> getAllParticipantsFromCoordDoc(const ReshardingCoordinatorDocument& doc) {
std::vector<ShardId> participants;
- auto donorShards = extractShardIdsFromParticipantEntriesAsSet(doc.getDonorShards());
- auto recipientShards = extractShardIdsFromParticipantEntriesAsSet(doc.getRecipientShards());
+ auto donorShards = resharding::extractShardIdsFromParticipantEntriesAsSet(doc.getDonorShards());
+ auto recipientShards =
+ resharding::extractShardIdsFromParticipantEntriesAsSet(doc.getRecipientShards());
std::set_union(donorShards.begin(),
donorShards.end(),
recipientShards.begin(),
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index 48a81ba5183..e57581cf8dd 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -98,12 +98,12 @@ public:
}
CommonReshardingMetadata createCommonReshardingMetadata(const UUID& operationId) {
- CommonReshardingMetadata metadata{
- operationId,
- kTestNamespace,
- getSourceCollectionId(),
- constructTemporaryReshardingNss(kTestNamespace.db(), getSourceCollectionId()),
- kShardKey};
+ CommonReshardingMetadata metadata{operationId,
+ kTestNamespace,
+ getSourceCollectionId(),
+ resharding::constructTemporaryReshardingNss(
+ kTestNamespace.db(), getSourceCollectionId()),
+ kShardKey};
metadata.setStartTime(getClockSource()->now() - kRunningTime);
return metadata;
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index cf449c4c00c..d9edf786371 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -271,7 +271,7 @@ NamespaceString ReshardingOplogApplier::ensureStashCollectionExists(
const UUID& existingUUID,
const ShardId& donorShardId,
const CollectionOptions& options) {
- auto nss = getLocalConflictStashNamespace(existingUUID, donorShardId);
+ auto nss = resharding::getLocalConflictStashNamespace(existingUUID, donorShardId);
resharding::data_copy::ensureCollectionExists(opCtx, nss, options);
return nss;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index 5ee0720d57b..c88884ac624 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -356,11 +356,13 @@ private:
const ShardId _otherDonorId{"otherDonorId"};
const NamespaceString _outputNss =
- constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
- const NamespaceString _myStashNss = getLocalConflictStashNamespace(_sourceUUID, _myDonorId);
+ resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
+ const NamespaceString _myStashNss =
+ resharding::getLocalConflictStashNamespace(_sourceUUID, _myDonorId);
const NamespaceString _otherStashNss =
- getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
- const NamespaceString _myOplogBufferNss = getLocalOplogBufferNamespace(_sourceUUID, _myDonorId);
+ resharding::getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
+ const NamespaceString _myOplogBufferNss =
+ resharding::getLocalOplogBufferNamespace(_sourceUUID, _myDonorId);
std::unique_ptr<ReshardingMetrics> _metrics;
std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index fa3c7804902..a9a60cd9aa0 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -335,10 +335,11 @@ private:
const ShardId _otherDonorId{"otherDonorId"};
const NamespaceString _outputNss =
- constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
- const NamespaceString _myStashNss = getLocalConflictStashNamespace(_sourceUUID, _myDonorId);
+ resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
+ const NamespaceString _myStashNss =
+ resharding::getLocalConflictStashNamespace(_sourceUUID, _myDonorId);
const NamespaceString _otherStashNss =
- getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
+ resharding::getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
std::unique_ptr<ReshardingOplogApplicationRules> _applier;
std::unique_ptr<ReshardingMetrics> _metrics;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 40f51e56e60..ac62a1cee4d 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -272,9 +272,9 @@ AggregateCommandRequest ReshardingOplogFetcher::_makeAggregateCommandRequest(
auto opCtx = opCtxRaii.get();
auto expCtx = _makeExpressionContext(opCtx);
- auto serializedPipeline =
- createOplogFetchingPipelineForResharding(expCtx, _startAt, _collUUID, _recipientShard)
- ->serializeToBson();
+ auto serializedPipeline = resharding::createOplogFetchingPipelineForResharding(
+ expCtx, _startAt, _collUUID, _recipientShard)
+ ->serializeToBson();
AggregateCommandRequest aggRequest(NamespaceString::kRsOplogNamespace,
std::move(serializedPipeline));
@@ -368,7 +368,7 @@ bool ReshardingOplogFetcher::consume(Client* client,
_onInsertFuture = std::move(f);
}
- if (isFinalOplog(nextOplog, _reshardingUUID)) {
+ if (resharding::isFinalOplog(nextOplog, _reshardingUUID)) {
moreToCome = false;
return false;
}
@@ -392,7 +392,7 @@ bool ReshardingOplogFetcher::consume(Client* client,
oplog.set_id(Value(startAt.toBSON()));
oplog.setObject(BSON("msg"
<< "Latest oplog ts from donor's cursor response"));
- oplog.setObject2(BSON("type" << kReshardProgressMark));
+ oplog.setObject2(BSON("type" << resharding::kReshardProgressMark));
oplog.setOpTime(OplogSlot());
oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index dc6f82ad701..68523519f41 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -298,7 +298,8 @@ public:
BSON(
"msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
dataColl.getCollection()->ns().toString())),
- BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << _reshardingUUID),
+ BSON("type" << resharding::kReshardFinalOpLogType << "reshardingUUID"
+ << _reshardingUUID),
boost::none,
boost::none,
boost::none,
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index d64e180304d..f52e3b8a3a7 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -728,8 +728,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
.then([this, &factory] {
auto opCtx = factory.makeOperationContext(&cc());
for (const auto& donor : _donorShards) {
- auto stashNss =
- getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
+ auto stashNss = resharding::getLocalConflictStashNamespace(
+ _metadata.getSourceUUID(), donor.getShardId());
AutoGetCollection stashColl(opCtx.get(), stashNss, MODE_IS);
uassert(5356800,
"Resharding completed with non-empty stash collections",
@@ -902,7 +902,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
Status abortReason, const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kError);
- emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason);
+ resharding::emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason);
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
}
@@ -1168,10 +1168,10 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
progressDocList;
for (const auto& donor : _donorShards) {
{
- AutoGetCollection oplogBufferColl(
- opCtx.get(),
- getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()),
- MODE_IS);
+ AutoGetCollection oplogBufferColl(opCtx.get(),
+ resharding::getLocalOplogBufferNamespace(
+ _metadata.getSourceUUID(), donor.getShardId()),
+ MODE_IS);
if (oplogBufferColl) {
oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get());
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 22ae1f61542..b72f0ad34e8 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -252,12 +252,12 @@ public:
NamespaceString sourceNss("sourcedb", "sourcecollection");
auto sourceUUID = UUID::gen();
- auto commonMetadata =
- CommonReshardingMetadata(UUID::gen(),
- sourceNss,
- sourceUUID,
- constructTemporaryReshardingNss(sourceNss.db(), sourceUUID),
- newShardKeyPattern());
+ auto commonMetadata = CommonReshardingMetadata(
+ UUID::gen(),
+ sourceNss,
+ sourceUUID,
+ resharding::constructTemporaryReshardingNss(sourceNss.db(), sourceUUID),
+ newShardKeyPattern());
commonMetadata.setStartTime(getServiceContext()->getFastClockSource()->now());
doc.setCommonReshardingMetadata(std::move(commonMetadata));
@@ -627,7 +627,8 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryOnReshardDoneCatchUp)
ErrorCodes::InterruptedDueToReplStateChange);
DBDirectClient client(opCtx.get());
- NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID());
+ NamespaceString sourceNss =
+ resharding::constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID());
FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace};
findRequest.setFilter(
@@ -673,7 +674,8 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryForImplicitShardColle
ErrorCodes::InterruptedDueToReplStateChange);
DBDirectClient client(opCtx.get());
- NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID());
+ NamespaceString sourceNss =
+ resharding::constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID());
FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace};
findRequest.setFilter(
@@ -741,7 +743,7 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) {
// to the primitive truncation algorithm - Check that the total size is less than
// kReshardErrorMaxBytes + a couple additional bytes to provide a buffer for the field
// name sizes.
- int maxReshardErrorBytesCeiling = kReshardErrorMaxBytes + 200;
+ int maxReshardErrorBytesCeiling = resharding::kReshardErrorMaxBytes + 200;
ASSERT_LT(persistedAbortReasonBSON->objsize(), maxReshardErrorBytesCeiling);
ASSERT_EQ(persistedAbortReasonBSON->getIntField("code"),
ErrorCodes::ReshardCollectionTruncatedError);
@@ -817,7 +819,8 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
for (const auto& donor : donorShards) {
// Setup oplogBuffer collection.
ReshardingDonorOplogId donorOplogId{{20, i}, {19, 0}};
- insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()),
+ insertFn(resharding::getLocalOplogBufferNamespace(doc.getSourceUUID(),
+ donor.getShardId()),
InsertStatement{BSON("_id" << donorOplogId.toBSON())});
++i;
@@ -925,7 +928,7 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUpWithMissingProgr
// Setup oplogBuffer collection.
ReshardingDonorOplogId donorOplogId{{20, i}, {19, 0}};
- insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()),
+ insertFn(resharding::getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()),
InsertStatement{BSON("_id" << donorOplogId.toBSON())});
// Setup reshardingApplierProgress collection.
diff --git a/src/mongo/db/s/resharding/resharding_util.cpp b/src/mongo/db/s/resharding/resharding_util.cpp
index 4febc199126..873fc7ce5d5 100644
--- a/src/mongo/db/s/resharding/resharding_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_util.cpp
@@ -63,6 +63,7 @@
namespace mongo {
+namespace resharding {
namespace {
/**
@@ -414,4 +415,5 @@ boost::optional<Milliseconds> estimateRemainingRecipientTime(bool applyingBegan,
return {};
}
+} // namespace resharding
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_util.h b/src/mongo/db/s/resharding/resharding_util.h
index 194381e7e78..0d8aacbe3f7 100644
--- a/src/mongo/db/s/resharding/resharding_util.h
+++ b/src/mongo/db/s/resharding/resharding_util.h
@@ -50,6 +50,7 @@
#include "mongo/util/str.h"
namespace mongo {
+namespace resharding {
constexpr auto kReshardFinalOpLogType = "reshardFinalOp"_sd;
constexpr auto kReshardProgressMark = "reshardProgressMark"_sd;
@@ -324,5 +325,6 @@ std::vector<std::shared_ptr<Instance>> getReshardingStateMachines(OperationConte
return result;
}
+} // namespace resharding
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_util_test.cpp b/src/mongo/db/s/resharding/resharding_util_test.cpp
index 5fd40fd86b7..12e5e15ddcd 100644
--- a/src/mongo/db/s/resharding/resharding_util_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_util_test.cpp
@@ -52,6 +52,7 @@
namespace mongo {
+namespace resharding {
namespace {
class ReshardingUtilTest : public ConfigServerTestFixture {
@@ -309,4 +310,7 @@ TEST_F(ReshardingTxnCloningPipelineTest, TxnPipelineAfterID) {
}
} // namespace
+
+} // namespace resharding
+
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
index e74155e374b..c5f6a831b0f 100644
--- a/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
+++ b/src/mongo/db/s/sharding_data_transform_instance_metrics.cpp
@@ -118,7 +118,8 @@ ShardingDataTransformInstanceMetrics::~ShardingDataTransformInstanceMetrics() {
Milliseconds ShardingDataTransformInstanceMetrics::getHighEstimateRemainingTimeMillis() const {
switch (_role) {
case Role::kRecipient: {
- auto estimate = estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate,
+ auto estimate =
+ resharding::estimateRemainingRecipientTime(_applyingStartTime.load() != kNoDate,
_bytesCopied.load(),
_approxBytesToCopy.load(),
getCopyingElapsedTimeSecs(),
diff --git a/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp b/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp
index 4e95395faaa..f0918cc5766 100644
--- a/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_abort_reshard_collection_command.cpp
@@ -99,7 +99,7 @@ public:
// If abort actually went through, the resharding documents should be cleaned up.
// If they still exists, it could be because that it was interrupted or it is no
// longer primary.
- doNoopWrite(opCtx, "_shardsvrAbortReshardCollection no-op", ns());
+ resharding::doNoopWrite(opCtx, "_shardsvrAbortReshardCollection no-op", ns());
PersistentTaskStore<CommonReshardingMetadata> donorReshardingOpStore(
NamespaceString::kDonorReshardingOperationsNamespace);
uassert(5563802,
diff --git a/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp b/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp
index f4240c1eb0a..3d9be030fcb 100644
--- a/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_commit_reshard_collection_command.cpp
@@ -107,7 +107,7 @@ public:
// If commit actually went through, the resharding documents will be cleaned up. If
// documents still exist, it could be because that commit was interrupted or that the
// underlying replica set node is no longer primary.
- doNoopWrite(opCtx, "_shardsvrCommitReshardCollection no-op", ns());
+ resharding::doNoopWrite(opCtx, "_shardsvrCommitReshardCollection no-op", ns());
PersistentTaskStore<CommonReshardingMetadata> donorReshardingOpStore(
NamespaceString::kDonorReshardingOperationsNamespace);
uassert(5795302,
diff --git a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
index 4c3e05a7879..56bf7b644f3 100644
--- a/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
+++ b/src/mongo/db/s/shardsvr_resharding_operation_time_command.cpp
@@ -108,10 +108,9 @@ public:
}
Response typedRun(OperationContext* opCtx) {
- auto instances =
- getReshardingStateMachines<ReshardingRecipientService,
- ReshardingRecipientService::RecipientStateMachine>(opCtx,
- ns());
+ auto instances = resharding::getReshardingStateMachines<
+ ReshardingRecipientService,
+ ReshardingRecipientService::RecipientStateMachine>(opCtx, ns());
if (instances.empty()) {
return Response{boost::none, boost::none};
}