diff options
author | Blake Oler <blake.oler@mongodb.com> | 2020-08-17 01:34:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-02 06:09:17 +0000 |
commit | 6e40b75f137d4d4abebd5d0f3395bd96549ddc6d (patch) | |
tree | cdb1fe565547e536a841cc7d16814469f4eb93cf | |
parent | 68743ffeff5e4f9906ecd90395c80e6c1df31377 (diff) | |
download | mongo-6e40b75f137d4d4abebd5d0f3395bd96549ddc6d.tar.gz |
SERVER-49564 Create ReshardingDonorService
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.cpp | 143 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.h | 112 | ||||
-rw-r--r-- | src/mongo/s/resharding/common_types.idl | 4 | ||||
-rw-r--r-- | src/mongo/util/uuid.h | 1 |
7 files changed, 267 insertions, 3 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 29bbda32cdd..9660368c23e 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -87,6 +87,10 @@ const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString:: "rangeDeletions"); const NamespaceString NamespaceString::kConfigReshardingOperationsNamespace( NamespaceString::kConfigDb, "reshardingOperations"); + +const NamespaceString NamespaceString::kDonorReshardingOperationsNamespace( + NamespaceString::kConfigDb, "localReshardingOperations.donor"); + const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb, "settings"); const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb, diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 33c6ea13668..46811e5e6b6 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -118,9 +118,12 @@ public: // Namespace for pending range deletions. static const NamespaceString kRangeDeletionNamespace; - // Namespace for resharding operation state. + // Namespace for the config server's resharding operation state. static const NamespaceString kConfigReshardingOperationsNamespace; + // Namespace for the donor shard's local resharding operation state. + static const NamespaceString kDonorReshardingOperationsNamespace; + // Namespace for balancer settings and default read and write concerns. static const NamespaceString kConfigSettingsNamespace; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index c00293a975d..83629f46e81 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -69,6 +69,7 @@ env.Library( 'read_only_catalog_cache_loader.cpp', 'resharding/resharding_coordinator_observer.cpp', 'resharding/resharding_coordinator_service.cpp', + 'resharding/resharding_donor_service.cpp', 'scoped_operation_completion_sharding_actions.cpp', 'session_catalog_migration_destination.cpp', 'session_catalog_migration_source.cpp', diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp new file mode 100644 index 00000000000..84f176a0b01 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -0,0 +1,143 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration + +#include "mongo/db/s/resharding/resharding_donor_service.h" + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance( + BSONObj initialState) const { + return std::make_shared<DonorStateMachine>(std::move(initialState)); +} + +DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc) + : repl::PrimaryOnlyService::TypedInstance<Instance>(), + _donorDoc(ReshardingDonorDocument::parse(IDLParserErrorContext("ReshardingDonorDocument"), + donorDoc)), + _id(_donorDoc.getCommonReshardingMetadata().get_id()) {} + +SemiFuture<void> DonorStateMachine::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { + return ExecutorFuture<void>(**executor) + .then([this] { _onInitializingCalculateMinFetchTimestampThenBeginDonating(); }) + .then([this, executor] { + return _awaitAllRecipientsDoneApplyingThenStartMirroring(executor); + }) + .then([this, executor] { + return _awaitCoordinatorHasCommittedThenTransitionToDropping(executor); + }) + .then([this] { return _dropOriginalCollectionThenDeleteLocalState(); }) + .onError([this](Status status) { + LOGV2(4956400, + "Resharding operation donor state machine failed", + "namespace"_attr = _donorDoc.getNss().ns(), + "reshardingId"_attr = _id, + "error"_attr = status); + // TODO SERVER-50584 Report errors to the coordinator so that the resharding operation + // can be aborted. + this->_transitionStateToError(status); + return status; + }) + .semi(); +} + +void DonorStateMachine::onReshardingFieldsChanges( + boost::optional<TypeCollectionReshardingFields> reshardingFields) {} + +void DonorStateMachine::_onInitializingCalculateMinFetchTimestampThenBeginDonating() { + if (_donorDoc.getState() > DonorStateEnum::kInitializing) { + return; + } + + // TODO SERVER-50021 Calculate minFetchTimestamp and send to coordinator. + + _transitionState(DonorStateEnum::kDonating); +} + +ExecutorFuture<void> DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStartMirroring( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_donorDoc.getState() > DonorStateEnum::kDonating) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _allRecipientsDoneApplying.getFuture().thenRunOn(**executor).then([this]() { + _transitionState(DonorStateEnum::kMirroring); + }); +} + +ExecutorFuture<void> DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_donorDoc.getState() > DonorStateEnum::kMirroring) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _coordinatorHasCommitted.getFuture().thenRunOn(**executor).then([this]() { + _transitionState(DonorStateEnum::kDropping); + }); +} + +void DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() { + if (_donorDoc.getState() > DonorStateEnum::kDropping) { + return; + } + + _transitionState(DonorStateEnum::kDone); +} + +void DonorStateMachine::_transitionState(DonorStateEnum endState) { + ReshardingDonorDocument replacementDoc(_donorDoc); + replacementDoc.setState(endState); + _updateDonorDocument(std::move(replacementDoc)); +} + +void DonorStateMachine::_transitionStateToError(const Status& status) { + ReshardingDonorDocument replacementDoc(_donorDoc); + replacementDoc.setState(DonorStateEnum::kError); + _updateDonorDocument(std::move(replacementDoc)); +} + +void DonorStateMachine::_updateDonorDocument(ReshardingDonorDocument&& replacementDoc) { + auto opCtx = cc().makeOperationContext(); + PersistentTaskStore<ReshardingDonorDocument> store( + NamespaceString::kDonorReshardingOperationsNamespace); + store.update(opCtx.get(), + BSON(ReshardingDonorDocument::k_idFieldName << _id), + replacementDoc.toBSON(), + WriteConcerns::kMajorityWriteConcern); + + _donorDoc = replacementDoc; +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h new file mode 100644 index 00000000000..8fac56be8bb --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/s/resharding/donor_document_gen.h" +#include "mongo/s/resharding/type_collection_fields_gen.h" + +namespace mongo { +constexpr StringData kReshardingDonorServiceName = "ReshardingDonorService"_sd; + +class ReshardingDonorService final : public repl::PrimaryOnlyService { +public: + explicit ReshardingDonorService(ServiceContext* serviceContext) + : PrimaryOnlyService(serviceContext) {} + ~ReshardingDonorService() = default; + + StringData getServiceName() const override { + return kReshardingDonorServiceName; + } + + NamespaceString getStateDocumentsNS() const override { + return NamespaceString::kDonorReshardingOperationsNamespace; + } + + ThreadPool::Limits getThreadPoolLimits() const override { + // TODO Limit the size of ReshardingDonorService thread pool. + return ThreadPool::Limits(); + } + + std::shared_ptr<PrimaryOnlyService::Instance> constructInstance( + BSONObj initialState) const override; +}; + +/** + * Represents the current state of a resharding donor operation on this shard. This class drives + * state transitions and updates to underlying on-disk metadata. + */ +class DonorStateMachine final + : public repl::PrimaryOnlyService::TypedInstance<ReshardingDonorService::Instance> { +public: + explicit DonorStateMachine(const BSONObj& donorDoc); + + virtual SemiFuture<void> run( + std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override; + + void onReshardingFieldsChanges( + boost::optional<TypeCollectionReshardingFields> reshardingFields); + +private: + // The following functions correspond to the actions to take at a particular donor state. + void _onInitializingCalculateMinFetchTimestampThenBeginDonating(); + + ExecutorFuture<void> _awaitAllRecipientsDoneApplyingThenStartMirroring( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + ExecutorFuture<void> _awaitCoordinatorHasCommittedThenTransitionToDropping( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + void _dropOriginalCollectionThenDeleteLocalState(); + + // Transitions the state on-disk and in-memory to 'endState'. + void _transitionState(DonorStateEnum endState); + + // Transitions the state on-disk and in-memory to kError. + void _transitionStateToError(const Status& status); + + // Updates the donor document on-disk and in-memory with the 'replacementDoc.' + void _updateDonorDocument(ReshardingDonorDocument&& replacementDoc); + + // The in-memory representation of the underlying document in + // config.localReshardingOperations.donor. + ReshardingDonorDocument _donorDoc; + + // Each promise below corresponds to a state on the donor state machine. They are listed in + // ascending order, such that the first promise below will be the first promise fulfilled. + SharedPromise<void> _allRecipientsDoneApplying; + + SharedPromise<void> _coordinatorHasCommitted; + + // The id both for the resharding operation and for the primary-only-service instance. + const UUID _id; +}; + +} // namespace mongo diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl index 37a0375501e..f1feb042b36 100644 --- a/src/mongo/s/resharding/common_types.idl +++ b/src/mongo/s/resharding/common_types.idl @@ -58,10 +58,10 @@ enums: values: kUnused: "unused" kInitializing: "initializing" - kDonatingInitialData: "donating-initial-data" - kDonatingOplogEntries: "donating-oplog-entries" + kDonating: "donating" kMirroring: "mirroring" kDropping: "dropping" + kDone: "done" kError: "error" RecipientState: diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h index 9faf72b74e6..c78bdfd2813 100644 --- a/src/mongo/util/uuid.h +++ b/src/mongo/util/uuid.h @@ -69,6 +69,7 @@ class UUID { friend class DonorStartMigration; friend class DonorWaitForMigrationToCommit; friend class DonorForgetMigration; + friend class DonorStateMachine; friend class DatabaseVersion; friend class DbCheckOplogCollection; friend class EncryptionPlaceholder; |