diff options
author | Blake Oler <blake.oler@mongodb.com> | 2020-10-15 02:28:28 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-05 21:21:11 +0000 |
commit | 20b397389eedea51faf8699790eb62963db40d7d (patch) | |
tree | 897ac87824dae7de1222664a349220cd938aca4b /src/mongo/db | |
parent | 0e400a736bacfa291dbda2971f7381ad392000c9 (diff) | |
download | mongo-20b397389eedea51faf8699790eb62963db40d7d.tar.gz |
SERVER-51398 Write state transition and other data from resharding donor/recipient shards to the coordinator
Diffstat (limited to 'src/mongo/db')
8 files changed, 83 insertions, 28 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 1f4e3fec8df..ccebb894ecf 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -209,11 +209,15 @@ public: instance->setInitialChunksAndZones(std::move(initialChunks), std::move(newZones)); - // This promise will currently be falsely fulfilled by a call to interrupt() inside - // the ReshardingCoordinatorService. This is to enable jsTests to pass while code - // is still being committed. - // TODO SERVER-51398 Change this comment and assess the current call to .wait(). instance->getObserver()->awaitAllDonorsReadyToDonate().wait(); + // This promise is currently automatically filled by recipient shards after creating + // their ReshardingRecipientStateMachines. + // TODO SERVER-51217 Update this comment to reflect that the temporary collection + // has been created. + instance->getObserver()->awaitAllRecipientsFinishedApplying().wait(); + + instance->interrupt( + {ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } private: diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 9af822b3d87..7d81fe332c7 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -407,6 +407,7 @@ stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNofityEnum> notifyForSta {CoordinatorStateEnum::kInitializing, ParticipantsToNofityEnum::kNone}, {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNofityEnum::kDonors}, {CoordinatorStateEnum::kCloning, ParticipantsToNofityEnum::kRecipients}, + {CoordinatorStateEnum::kApplying, ParticipantsToNofityEnum::kDonors}, {CoordinatorStateEnum::kMirroring, ParticipantsToNofityEnum::kDonors}, {CoordinatorStateEnum::kCommitted, ParticipantsToNofityEnum::kNone}, {CoordinatorStateEnum::kRenaming, ParticipantsToNofityEnum::kRecipients}, @@ -767,9 +768,6 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat return ExecutorFuture<void>(**executor, Status::OK()); } - // TODO SERVER-51398 Remove this call. - interrupt({ErrorCodes::InternalError, "Early exit to support jsTesting"}); - return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 8b848a95271..6acfb708340 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -144,8 +144,8 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( createDonorShardMirroringEntriesFromDonorShardIds( reshardingFields.getRecipientFields()->getDonorShardIds()); - auto recipientDoc = - ReshardingRecipientDocument(RecipientStateEnum::kCloning, std::move(donorShards)); + auto recipientDoc = ReshardingRecipientDocument(RecipientStateEnum::kCreatingCollection, + std::move(donorShards)); auto commonMetadata = CommonReshardingMetadata(reshardingFields.getUuid(), diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index f479d02942e..9df204a68e6 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -181,7 +181,7 @@ protected: metadata.getShardKeyPattern().toBSON(), recipientDoc); - ASSERT(recipientDoc.getState() == RecipientStateEnum::kCloning); + ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection); ASSERT(recipientDoc.getFetchTimestamp() == reshardingFields.getRecipientFields()->getFetchTimestamp()); @@ -294,6 +294,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) { ReshardingDonorDocument>(opCtx, kReshardingUUID); ASSERT(donorStateMachine != boost::none); + + donorStateMachine.get()->interrupt({ErrorCodes::InternalError, "Shut down for test"}); } TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) { @@ -315,6 +317,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) { kReshardingUUID); ASSERT(recipientStateMachine != boost::none); + + recipientStateMachine.get()->interrupt({ErrorCodes::InternalError, "Shut down for test"}); } TEST_F(ReshardingDonorRecipientCommonTest, diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 17c2bb431b8..8135fbfbaa3 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -37,7 +37,10 @@ #include "mongo/db/op_observer.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/grid.h" namespace mongo { namespace { @@ -103,7 +106,6 @@ ReshardingDonorService::DonorStateMachine::~DonorStateMachine() { void ReshardingDonorService::DonorStateMachine::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) - .then([this] { _transitionToPreparingToDonate(); }) .then( [this] { _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData(); }) .then([this, executor] { @@ -166,14 +168,6 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( boost::optional<TypeCollectionReshardingFields> reshardingFields) {} -void ReshardingDonorService::DonorStateMachine::_transitionToPreparingToDonate() { - if (_donorDoc.getState() > DonorStateEnum::kUnused) { - return; - } - - _transitionState(DonorStateEnum::kPreparingToDonate); -} - void ReshardingDonorService::DonorStateMachine:: _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() { if (_donorDoc.getState() > DonorStateEnum::kPreparingToDonate) { @@ -181,8 +175,13 @@ void ReshardingDonorService::DonorStateMachine:: return; } + _insertDonorDocument(_donorDoc); + auto minFetchTimestamp = generateMinFetchTimestamp(_donorDoc); - _transitionState(DonorStateEnum::kDonatingInitialData, minFetchTimestamp); + _transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingInitialData, minFetchTimestamp); + + // TODO SERVER-XXXXX Remove this line. + interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: @@ -193,7 +192,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: } return _allRecipientsDoneCloning.getFuture().thenRunOn(**executor).then([this]() { - _transitionState(DonorStateEnum::kDonatingOplogEntries); + _transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingOplogEntries); }); } @@ -235,18 +234,13 @@ void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenDelet return; } - _transitionState(DonorStateEnum::kDone); + _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone); } void ReshardingDonorService::DonorStateMachine::_transitionState( DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) { ReshardingDonorDocument replacementDoc(_donorDoc); replacementDoc.setState(endState); - if (endState == DonorStateEnum::kPreparingToDonate) { - _insertDonorDocument(std::move(replacementDoc)); - return; - } - if (minFetchTimestamp) { auto& minFetchTimestampStruct = replacementDoc.getMinFetchTimestampStruct(); if (minFetchTimestampStruct.getMinFetchTimestamp()) @@ -259,6 +253,31 @@ void ReshardingDonorService::DonorStateMachine::_transitionState( _updateDonorDocument(std::move(replacementDoc)); } +void ReshardingDonorService::DonorStateMachine::_transitionStateAndUpdateCoordinator( + DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) { + _transitionState(endState, minFetchTimestamp); + + auto opCtx = cc().makeOperationContext(); + auto shardId = ShardingState::get(opCtx.get())->shardId(); + + BSONObjBuilder updateBuilder; + updateBuilder.append("donorShards.$.state", DonorState_serializer(endState)); + + if (minFetchTimestamp) { + updateBuilder.append("donorShards.$.minFetchTimestamp", minFetchTimestamp.get()); + } + + uassertStatusOK( + Grid::get(opCtx.get()) + ->catalogClient() + ->updateConfigDocument(opCtx.get(), + NamespaceString::kConfigReshardingOperationsNamespace, + BSON("_id" << _donorDoc.get_id() << "donorShards.id" << shardId), + BSON("$set" << updateBuilder.done()), + false /* upsert */, + ShardingCatalogClient::kMajorityWriteConcern)); +} + void ReshardingDonorService::DonorStateMachine::_transitionStateToError(const Status& status) { ReshardingDonorDocument replacementDoc(_donorDoc); replacementDoc.setState(DonorStateEnum::kError); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index dfd37678c94..ff34a651894 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -120,6 +120,9 @@ private: void _transitionState(DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp = boost::none); + void _transitionStateAndUpdateCoordinator( + DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp = boost::none); + // Transitions the state on-disk and in-memory to kError. void _transitionStateToError(const Status& status); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 07980cd9740..25c87941614 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/s/migration_destination_manager.h" #include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/cluster_commands_helpers.h" @@ -234,7 +235,8 @@ void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSt return; } - _transitionState(RecipientStateEnum::kSteadyState); + _transitionStateAndUpdateCoordinator(RecipientStateEnum::kSteadyState); + interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: @@ -284,6 +286,29 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( _updateRecipientDocument(std::move(replacementDoc)); } +void ReshardingRecipientService::RecipientStateMachine::_transitionStateAndUpdateCoordinator( + RecipientStateEnum endState) { + _transitionState(endState, boost::none); + + auto opCtx = cc().makeOperationContext(); + + auto shardId = ShardingState::get(opCtx.get())->shardId(); + + BSONObjBuilder updateBuilder; + updateBuilder.append("recipientShards.$.state", RecipientState_serializer(endState)); + + uassertStatusOK( + Grid::get(opCtx.get()) + ->catalogClient() + ->updateConfigDocument( + opCtx.get(), + NamespaceString::kConfigReshardingOperationsNamespace, + BSON("_id" << _recipientDoc.get_id() << "recipientShards.id" << shardId), + BSON("$set" << updateBuilder.done()), + false /* upsert */, + ShardingCatalogClient::kMajorityWriteConcern)); +} + void ReshardingRecipientService::RecipientStateMachine::_transitionStateToError( const Status& status) { ReshardingRecipientDocument replacementDoc(_recipientDoc); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index da95ff3513f..9c53b20ca8a 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -135,6 +135,8 @@ private: void _transitionState(RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp = boost::none); + void _transitionStateAndUpdateCoordinator(RecipientStateEnum endState); + // Transitions the state on-disk and in-memory to kError. void _transitionStateToError(const Status& status); |