summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2020-08-17 01:34:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-02 06:09:17 +0000
commit6e40b75f137d4d4abebd5d0f3395bd96549ddc6d (patch)
treecdb1fe565547e536a841cc7d16814469f4eb93cf
parent68743ffeff5e4f9906ecd90395c80e6c1df31377 (diff)
downloadmongo-6e40b75f137d4d4abebd5d0f3395bd96549ddc6d.tar.gz
SERVER-49564 Create ReshardingDonorService
-rw-r--r--src/mongo/db/namespace_string.cpp4
-rw-r--r--src/mongo/db/namespace_string.h5
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp143
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h112
-rw-r--r--src/mongo/s/resharding/common_types.idl4
-rw-r--r--src/mongo/util/uuid.h1
7 files changed, 267 insertions, 3 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 29bbda32cdd..9660368c23e 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -87,6 +87,10 @@ const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString::
"rangeDeletions");
const NamespaceString NamespaceString::kConfigReshardingOperationsNamespace(
NamespaceString::kConfigDb, "reshardingOperations");
+
+const NamespaceString NamespaceString::kDonorReshardingOperationsNamespace(
+ NamespaceString::kConfigDb, "localReshardingOperations.donor");
+
const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb,
"settings");
const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb,
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 33c6ea13668..46811e5e6b6 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -118,9 +118,12 @@ public:
// Namespace for pending range deletions.
static const NamespaceString kRangeDeletionNamespace;
- // Namespace for resharding operation state.
+ // Namespace for the config server's resharding operation state.
static const NamespaceString kConfigReshardingOperationsNamespace;
+ // Namespace for the donor shard's local resharding operation state.
+ static const NamespaceString kDonorReshardingOperationsNamespace;
+
// Namespace for balancer settings and default read and write concerns.
static const NamespaceString kConfigSettingsNamespace;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index c00293a975d..83629f46e81 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -69,6 +69,7 @@ env.Library(
'read_only_catalog_cache_loader.cpp',
'resharding/resharding_coordinator_observer.cpp',
'resharding/resharding_coordinator_service.cpp',
+ 'resharding/resharding_donor_service.cpp',
'scoped_operation_completion_sharding_actions.cpp',
'session_catalog_migration_destination.cpp',
'session_catalog_migration_source.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
new file mode 100644
index 00000000000..84f176a0b01
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
+
+#include "mongo/db/s/resharding/resharding_donor_service.h"
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance(
+ BSONObj initialState) const {
+ return std::make_shared<DonorStateMachine>(std::move(initialState));
+}
+
+DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc)
+ : repl::PrimaryOnlyService::TypedInstance<Instance>(),
+ _donorDoc(ReshardingDonorDocument::parse(IDLParserErrorContext("ReshardingDonorDocument"),
+ donorDoc)),
+ _id(_donorDoc.getCommonReshardingMetadata().get_id()) {}
+
+SemiFuture<void> DonorStateMachine::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then([this] { _onInitializingCalculateMinFetchTimestampThenBeginDonating(); })
+ .then([this, executor] {
+ return _awaitAllRecipientsDoneApplyingThenStartMirroring(executor);
+ })
+ .then([this, executor] {
+ return _awaitCoordinatorHasCommittedThenTransitionToDropping(executor);
+ })
+ .then([this] { return _dropOriginalCollectionThenDeleteLocalState(); })
+ .onError([this](Status status) {
+ LOGV2(4956400,
+ "Resharding operation donor state machine failed",
+ "namespace"_attr = _donorDoc.getNss().ns(),
+ "reshardingId"_attr = _id,
+ "error"_attr = status);
+ // TODO SERVER-50584 Report errors to the coordinator so that the resharding operation
+ // can be aborted.
+ this->_transitionStateToError(status);
+ return status;
+ })
+ .semi();
+}
+
+void DonorStateMachine::onReshardingFieldsChanges(
+ boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
+
+void DonorStateMachine::_onInitializingCalculateMinFetchTimestampThenBeginDonating() {
+ if (_donorDoc.getState() > DonorStateEnum::kInitializing) {
+ return;
+ }
+
+ // TODO SERVER-50021 Calculate minFetchTimestamp and send to coordinator.
+
+ _transitionState(DonorStateEnum::kDonating);
+}
+
+ExecutorFuture<void> DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStartMirroring(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_donorDoc.getState() > DonorStateEnum::kDonating) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _allRecipientsDoneApplying.getFuture().thenRunOn(**executor).then([this]() {
+ _transitionState(DonorStateEnum::kMirroring);
+ });
+}
+
+ExecutorFuture<void> DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_donorDoc.getState() > DonorStateEnum::kMirroring) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _coordinatorHasCommitted.getFuture().thenRunOn(**executor).then([this]() {
+ _transitionState(DonorStateEnum::kDropping);
+ });
+}
+
+void DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() {
+ if (_donorDoc.getState() > DonorStateEnum::kDropping) {
+ return;
+ }
+
+ _transitionState(DonorStateEnum::kDone);
+}
+
+void DonorStateMachine::_transitionState(DonorStateEnum endState) {
+ ReshardingDonorDocument replacementDoc(_donorDoc);
+ replacementDoc.setState(endState);
+ _updateDonorDocument(std::move(replacementDoc));
+}
+
+void DonorStateMachine::_transitionStateToError(const Status& status) {
+ ReshardingDonorDocument replacementDoc(_donorDoc);
+ replacementDoc.setState(DonorStateEnum::kError);
+ _updateDonorDocument(std::move(replacementDoc));
+}
+
+void DonorStateMachine::_updateDonorDocument(ReshardingDonorDocument&& replacementDoc) {
+ auto opCtx = cc().makeOperationContext();
+ PersistentTaskStore<ReshardingDonorDocument> store(
+ NamespaceString::kDonorReshardingOperationsNamespace);
+ store.update(opCtx.get(),
+ BSON(ReshardingDonorDocument::k_idFieldName << _id),
+ replacementDoc.toBSON(),
+ WriteConcerns::kMajorityWriteConcern);
+
+ _donorDoc = replacementDoc;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
new file mode 100644
index 00000000000..8fac56be8bb
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/s/resharding/donor_document_gen.h"
+#include "mongo/s/resharding/type_collection_fields_gen.h"
+
+namespace mongo {
+constexpr StringData kReshardingDonorServiceName = "ReshardingDonorService"_sd;
+
+class ReshardingDonorService final : public repl::PrimaryOnlyService {
+public:
+ explicit ReshardingDonorService(ServiceContext* serviceContext)
+ : PrimaryOnlyService(serviceContext) {}
+ ~ReshardingDonorService() = default;
+
+ StringData getServiceName() const override {
+ return kReshardingDonorServiceName;
+ }
+
+ NamespaceString getStateDocumentsNS() const override {
+ return NamespaceString::kDonorReshardingOperationsNamespace;
+ }
+
+ ThreadPool::Limits getThreadPoolLimits() const override {
+ // TODO Limit the size of ReshardingDonorService thread pool.
+ return ThreadPool::Limits();
+ }
+
+ std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialState) const override;
+};
+
+/**
+ * Represents the current state of a resharding donor operation on this shard. This class drives
+ * state transitions and updates to underlying on-disk metadata.
+ */
+class DonorStateMachine final
+ : public repl::PrimaryOnlyService::TypedInstance<ReshardingDonorService::Instance> {
+public:
+ explicit DonorStateMachine(const BSONObj& donorDoc);
+
+ virtual SemiFuture<void> run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override;
+
+ void onReshardingFieldsChanges(
+ boost::optional<TypeCollectionReshardingFields> reshardingFields);
+
+private:
+ // The following functions correspond to the actions to take at a particular donor state.
+ void _onInitializingCalculateMinFetchTimestampThenBeginDonating();
+
+ ExecutorFuture<void> _awaitAllRecipientsDoneApplyingThenStartMirroring(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ ExecutorFuture<void> _awaitCoordinatorHasCommittedThenTransitionToDropping(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ void _dropOriginalCollectionThenDeleteLocalState();
+
+ // Transitions the state on-disk and in-memory to 'endState'.
+ void _transitionState(DonorStateEnum endState);
+
+ // Transitions the state on-disk and in-memory to kError.
+ void _transitionStateToError(const Status& status);
+
+ // Updates the donor document on-disk and in-memory with the 'replacementDoc.'
+ void _updateDonorDocument(ReshardingDonorDocument&& replacementDoc);
+
+ // The in-memory representation of the underlying document in
+ // config.localReshardingOperations.donor.
+ ReshardingDonorDocument _donorDoc;
+
+ // Each promise below corresponds to a state on the donor state machine. They are listed in
+ // ascending order, such that the first promise below will be the first promise fulfilled.
+ SharedPromise<void> _allRecipientsDoneApplying;
+
+ SharedPromise<void> _coordinatorHasCommitted;
+
+ // The id both for the resharding operation and for the primary-only-service instance.
+ const UUID _id;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl
index 37a0375501e..f1feb042b36 100644
--- a/src/mongo/s/resharding/common_types.idl
+++ b/src/mongo/s/resharding/common_types.idl
@@ -58,10 +58,10 @@ enums:
values:
kUnused: "unused"
kInitializing: "initializing"
- kDonatingInitialData: "donating-initial-data"
- kDonatingOplogEntries: "donating-oplog-entries"
+ kDonating: "donating"
kMirroring: "mirroring"
kDropping: "dropping"
+ kDone: "done"
kError: "error"
RecipientState:
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index 9faf72b74e6..c78bdfd2813 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -69,6 +69,7 @@ class UUID {
friend class DonorStartMigration;
friend class DonorWaitForMigrationToCommit;
friend class DonorForgetMigration;
+ friend class DonorStateMachine;
friend class DatabaseVersion;
friend class DbCheckOplogCollection;
friend class EncryptionPlaceholder;