diff options
author | Blake Oler <blake.oler@mongodb.com> | 2020-10-15 02:28:28 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-05 21:21:11 +0000 |
commit | 20b397389eedea51faf8699790eb62963db40d7d (patch) | |
tree | 897ac87824dae7de1222664a349220cd938aca4b | |
parent | 0e400a736bacfa291dbda2971f7381ad392000c9 (diff) | |
download | mongo-20b397389eedea51faf8699790eb62963db40d7d.tar.gz |
SERVER-51398 Write state transition and other data from resharding donor/recipient shards to the coordinator
9 files changed, 106 insertions, 57 deletions
diff --git a/jstests/sharding/reshard_collection_basic.js b/jstests/sharding/reshard_collection_basic.js index 0bb5537f14f..c536503c82a 100644 --- a/jstests/sharding/reshard_collection_basic.js +++ b/jstests/sharding/reshard_collection_basic.js @@ -12,15 +12,26 @@ const collName = '.foo'; const ns = kDbName + collName; const mongos = st.s0; +let removeAllReshardingCollections = () => { + mongos.getDB(kDbName).foo.drop(); + mongos.getDB('config').reshardingOperations.remove({nss: ns}); + mongos.getDB('config').collections.remove({reshardingFields: {$exists: true}}); + st.rs0.getPrimary().getDB('config').localReshardingOperations.donor.remove({nss: ns}); + st.rs0.getPrimary().getDB('config').localReshardingOperations.recipient.remove({nss: ns}); + st.rs1.getPrimary().getDB('config').localReshardingOperations.donor.remove({nss: ns}); + st.rs1.getPrimary().getDB('config').localReshardingOperations.recipient.remove({nss: ns}); +}; + // Fail if sharding is disabled. assert.commandFailedWithCode(mongos.adminCommand({reshardCollection: ns, key: {_id: 1}}), ErrorCodes.NamespaceNotFound); +assert.commandWorked(mongos.adminCommand({enableSharding: kDbName})); + // Fail if collection is unsharded. assert.commandFailedWithCode(mongos.adminCommand({reshardCollection: ns, key: {_id: 1}}), - ErrorCodes.NamespaceNotFound); + ErrorCodes.NamespaceNotSharded); -assert.commandWorked(mongos.adminCommand({enableSharding: kDbName})); assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}})); // Fail if missing required key. @@ -35,7 +46,7 @@ assert.commandFailedWithCode( assert.commandWorked( mongos.adminCommand({reshardCollection: ns, key: {_id: 1}, collation: {locale: 'simple'}})); -mongos.getDB(kDbName).foo.drop(); +removeAllReshardingCollections(); assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}})); @@ -44,7 +55,8 @@ assert.commandFailedWithCode( mongos.adminCommand({reshardCollection: ns, key: {_id: 1}, unique: true}), ErrorCodes.BadValue); // Succeed if unique is specified and is false. assert.commandWorked(mongos.adminCommand({reshardCollection: ns, key: {_id: 1}, unique: false})); -mongos.getDB(kDbName).foo.drop(); + +removeAllReshardingCollections(); // Succeed if _presetReshardedChunks is provided and test commands are enabled (default). assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}})); @@ -54,7 +66,8 @@ assert.commandWorked(mongos.adminCommand({ _presetReshardedChunks: [{recipientShardId: st.shard1.shardName, min: {_id: MinKey}, max: {_id: MaxKey}}] })); -mongos.getDB(kDbName).foo.drop(); + +removeAllReshardingCollections(); assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}})); @@ -80,7 +93,8 @@ assert.commandWorked(mongos.adminCommand({ collation: {locale: 'simple'}, numInitialChunks: 2, })); -mongos.getDB(kDbName).foo.drop(); + +removeAllReshardingCollections(); // Succeed if all optional fields and _presetReshardedChunks are provided with correct values and // test commands are enabled (default). @@ -95,7 +109,8 @@ assert.commandWorked(mongos.adminCommand({ {recipientShardId: st.shard0.shardName, min: {_id: MinKey}, max: {_id: 0}} ] })); -mongos.getDB(kDbName).foo.drop(); + +removeAllReshardingCollections(); const existingZoneName = 'x1'; @@ -141,28 +156,7 @@ assert.commandWorked(mongos.adminCommand({ ] })); -assert.commandWorked(mongos.getDB(kDbName).dropDatabase()); - -// TODO remove test case below once the resharding command actually writes to config.collections -// itself Test that shards correctly cache 'reshardingFields' in config.cache.collections -assert.commandWorked(mongos.adminCommand({enableSharding: kDbName})); -st.ensurePrimaryShard(kDbName, st.shard0.shardName); -assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}})); - -let donorReshardingFields = { - "uuid": UUID(), - "state": "initializing", - "donorFields": {"reshardingKey": {x: 1}} -}; -assert.commandWorked(st.configRS.getPrimary().getDB("config").collections.update( - {_id: ns}, {"$set": {"reshardingFields": donorReshardingFields}})); - -// Run moveChunk to force shard0 to pick up the new info in config.collections -assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard1.shardName})); - -let cachedEntry = st.rs0.getPrimary().getDB('config').cache.collections.findOne({_id: ns}); -assert.docEq(cachedEntry.reshardingFields, donorReshardingFields); +removeAllReshardingCollections(); -assert.commandWorked(mongos.getDB(kDbName).dropDatabase()); st.stop(); })(); diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 1f4e3fec8df..ccebb894ecf 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -209,11 +209,15 @@ public: instance->setInitialChunksAndZones(std::move(initialChunks), std::move(newZones)); - // This promise will currently be falsely fulfilled by a call to interrupt() inside - // the ReshardingCoordinatorService. This is to enable jsTests to pass while code - // is still being committed. - // TODO SERVER-51398 Change this comment and assess the current call to .wait(). instance->getObserver()->awaitAllDonorsReadyToDonate().wait(); + // This promise is currently automatically filled by recipient shards after creating + // their ReshardingRecipientStateMachines. + // TODO SERVER-51217 Update this comment to reflect that the temporary collection + // has been created. + instance->getObserver()->awaitAllRecipientsFinishedApplying().wait(); + + instance->interrupt( + {ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } private: diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 9af822b3d87..7d81fe332c7 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -407,6 +407,7 @@ stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNofityEnum> notifyForSta {CoordinatorStateEnum::kInitializing, ParticipantsToNofityEnum::kNone}, {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNofityEnum::kDonors}, {CoordinatorStateEnum::kCloning, ParticipantsToNofityEnum::kRecipients}, + {CoordinatorStateEnum::kApplying, ParticipantsToNofityEnum::kDonors}, {CoordinatorStateEnum::kMirroring, ParticipantsToNofityEnum::kDonors}, {CoordinatorStateEnum::kCommitted, ParticipantsToNofityEnum::kNone}, {CoordinatorStateEnum::kRenaming, ParticipantsToNofityEnum::kRecipients}, @@ -767,9 +768,6 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat return ExecutorFuture<void>(**executor, Status::OK()); } - // TODO SERVER-51398 Remove this call. - interrupt({ErrorCodes::InternalError, "Early exit to support jsTesting"}); - return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 8b848a95271..6acfb708340 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -144,8 +144,8 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( createDonorShardMirroringEntriesFromDonorShardIds( reshardingFields.getRecipientFields()->getDonorShardIds()); - auto recipientDoc = - ReshardingRecipientDocument(RecipientStateEnum::kCloning, std::move(donorShards)); + auto recipientDoc = ReshardingRecipientDocument(RecipientStateEnum::kCreatingCollection, + std::move(donorShards)); auto commonMetadata = CommonReshardingMetadata(reshardingFields.getUuid(), diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index f479d02942e..9df204a68e6 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -181,7 +181,7 @@ protected: metadata.getShardKeyPattern().toBSON(), recipientDoc); - ASSERT(recipientDoc.getState() == RecipientStateEnum::kCloning); + ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection); ASSERT(recipientDoc.getFetchTimestamp() == reshardingFields.getRecipientFields()->getFetchTimestamp()); @@ -294,6 +294,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) { ReshardingDonorDocument>(opCtx, kReshardingUUID); ASSERT(donorStateMachine != boost::none); + + donorStateMachine.get()->interrupt({ErrorCodes::InternalError, "Shut down for test"}); } TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) { @@ -315,6 +317,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) { kReshardingUUID); ASSERT(recipientStateMachine != boost::none); + + recipientStateMachine.get()->interrupt({ErrorCodes::InternalError, "Shut down for test"}); } TEST_F(ReshardingDonorRecipientCommonTest, diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 17c2bb431b8..8135fbfbaa3 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -37,7 +37,10 @@ #include "mongo/db/op_observer.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/grid.h" namespace mongo { namespace { @@ -103,7 +106,6 @@ ReshardingDonorService::DonorStateMachine::~DonorStateMachine() { void ReshardingDonorService::DonorStateMachine::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) - .then([this] { _transitionToPreparingToDonate(); }) .then( [this] { _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData(); }) .then([this, executor] { @@ -166,14 +168,6 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( boost::optional<TypeCollectionReshardingFields> reshardingFields) {} -void ReshardingDonorService::DonorStateMachine::_transitionToPreparingToDonate() { - if (_donorDoc.getState() > DonorStateEnum::kUnused) { - return; - } - - _transitionState(DonorStateEnum::kPreparingToDonate); -} - void ReshardingDonorService::DonorStateMachine:: _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() { if (_donorDoc.getState() > DonorStateEnum::kPreparingToDonate) { @@ -181,8 +175,13 @@ void ReshardingDonorService::DonorStateMachine:: return; } + _insertDonorDocument(_donorDoc); + auto minFetchTimestamp = generateMinFetchTimestamp(_donorDoc); - _transitionState(DonorStateEnum::kDonatingInitialData, minFetchTimestamp); + _transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingInitialData, minFetchTimestamp); + + // TODO SERVER-XXXXX Remove this line. + interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: @@ -193,7 +192,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: } return _allRecipientsDoneCloning.getFuture().thenRunOn(**executor).then([this]() { - _transitionState(DonorStateEnum::kDonatingOplogEntries); + _transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingOplogEntries); }); } @@ -235,18 +234,13 @@ void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenDelet return; } - _transitionState(DonorStateEnum::kDone); + _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone); } void ReshardingDonorService::DonorStateMachine::_transitionState( DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) { ReshardingDonorDocument replacementDoc(_donorDoc); replacementDoc.setState(endState); - if (endState == DonorStateEnum::kPreparingToDonate) { - _insertDonorDocument(std::move(replacementDoc)); - return; - } - if (minFetchTimestamp) { auto& minFetchTimestampStruct = replacementDoc.getMinFetchTimestampStruct(); if (minFetchTimestampStruct.getMinFetchTimestamp()) @@ -259,6 +253,31 @@ void ReshardingDonorService::DonorStateMachine::_transitionState( _updateDonorDocument(std::move(replacementDoc)); } +void ReshardingDonorService::DonorStateMachine::_transitionStateAndUpdateCoordinator( + DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) { + _transitionState(endState, minFetchTimestamp); + + auto opCtx = cc().makeOperationContext(); + auto shardId = ShardingState::get(opCtx.get())->shardId(); + + BSONObjBuilder updateBuilder; + updateBuilder.append("donorShards.$.state", DonorState_serializer(endState)); + + if (minFetchTimestamp) { + updateBuilder.append("donorShards.$.minFetchTimestamp", minFetchTimestamp.get()); + } + + uassertStatusOK( + Grid::get(opCtx.get()) + ->catalogClient() + ->updateConfigDocument(opCtx.get(), + NamespaceString::kConfigReshardingOperationsNamespace, + BSON("_id" << _donorDoc.get_id() << "donorShards.id" << shardId), + BSON("$set" << updateBuilder.done()), + false /* upsert */, + ShardingCatalogClient::kMajorityWriteConcern)); +} + void ReshardingDonorService::DonorStateMachine::_transitionStateToError(const Status& status) { ReshardingDonorDocument replacementDoc(_donorDoc); replacementDoc.setState(DonorStateEnum::kError); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index dfd37678c94..ff34a651894 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -120,6 +120,9 @@ private: void _transitionState(DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp = boost::none); + void _transitionStateAndUpdateCoordinator( + DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp = boost::none); + // Transitions the state on-disk and in-memory to kError. void _transitionStateToError(const Status& status); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 07980cd9740..25c87941614 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/s/migration_destination_manager.h" #include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/cluster_commands_helpers.h" @@ -234,7 +235,8 @@ void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSt return; } - _transitionState(RecipientStateEnum::kSteadyState); + _transitionStateAndUpdateCoordinator(RecipientStateEnum::kSteadyState); + interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: @@ -284,6 +286,29 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( _updateRecipientDocument(std::move(replacementDoc)); } +void ReshardingRecipientService::RecipientStateMachine::_transitionStateAndUpdateCoordinator( + RecipientStateEnum endState) { + _transitionState(endState, boost::none); + + auto opCtx = cc().makeOperationContext(); + + auto shardId = ShardingState::get(opCtx.get())->shardId(); + + BSONObjBuilder updateBuilder; + updateBuilder.append("recipientShards.$.state", RecipientState_serializer(endState)); + + uassertStatusOK( + Grid::get(opCtx.get()) + ->catalogClient() + ->updateConfigDocument( + opCtx.get(), + NamespaceString::kConfigReshardingOperationsNamespace, + BSON("_id" << _recipientDoc.get_id() << "recipientShards.id" << shardId), + BSON("$set" << updateBuilder.done()), + false /* upsert */, + ShardingCatalogClient::kMajorityWriteConcern)); +} + void ReshardingRecipientService::RecipientStateMachine::_transitionStateToError( const Status& status) { ReshardingRecipientDocument replacementDoc(_recipientDoc); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index da95ff3513f..9c53b20ca8a 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -135,6 +135,8 @@ private: void _transitionState(RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp = boost::none); + void _transitionStateAndUpdateCoordinator(RecipientStateEnum endState); + // Transitions the state on-disk and in-memory to kError. void _transitionStateToError(const Status& status); |