summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-08-04 19:24:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-26 15:39:57 +0000
commita3a33f7d40147f50442292df215063fb36f6f70b (patch)
tree961855ca3cc28d510221972f881eab4e722dde40 /src/mongo/db/s
parent96b779115f139a47fe9d6349f7ce0d54140ad25f (diff)
downloadmongo-a3a33f7d40147f50442292df215063fb36f6f70b.tar.gz
SERVER-49569 Create ReshardingCoordinatorService
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/resharding/coordinator_document.idl4
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator.cpp92
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator.h137
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_observer.cpp96
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_observer.h163
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp286
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h207
8 files changed, 758 insertions, 230 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 9e4e1426a83..fe4bc629199 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -68,7 +68,8 @@ env.Library(
'range_deletion_util.cpp',
'read_only_catalog_cache_loader.cpp',
'resharding_util.cpp',
- 'resharding/resharding_coordinator.cpp',
+ 'resharding/resharding_coordinator_observer.cpp',
+ 'resharding/resharding_coordinator_service.cpp',
'scoped_operation_completion_sharding_actions.cpp',
'session_catalog_migration_destination.cpp',
'session_catalog_migration_source.cpp',
diff --git a/src/mongo/db/s/resharding/coordinator_document.idl b/src/mongo/db/s/resharding/coordinator_document.idl
index 761e55fb6b2..127d69c779b 100644
--- a/src/mongo/db/s/resharding/coordinator_document.idl
+++ b/src/mongo/db/s/resharding/coordinator_document.idl
@@ -73,6 +73,10 @@ structs:
generate_comparison_operators: false
strict: true
fields:
+ tempReshardingNss:
+ type: namespacestring
+ description: "The namespace of the temporary resharding collection that exists on
+ recipient shards."
state: CoordinatorState
donorShards:
type: array<DonorShardEntry>
diff --git a/src/mongo/db/s/resharding/resharding_coordinator.cpp b/src/mongo/db/s/resharding/resharding_coordinator.cpp
deleted file mode 100644
index 61b586a5294..00000000000
--- a/src/mongo/db/s/resharding/resharding_coordinator.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/s/resharding/resharding_coordinator.h"
-
-#include "mongo/db/service_context.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/shard_id.h"
-
-namespace mongo {
-
-namespace {
-const auto getReshardingCoordinator = ServiceContext::declareDecoration<ReshardingCoordinator>();
-} // namespace
-
-ReshardingCoordinator::ReshardingCoordinator() = default;
-
-ReshardingCoordinator::~ReshardingCoordinator() {
- // TODO SERVER-49569 uncomment line below
- // invariant(_reshardingOperationsInProgress.empty());
-}
-
-ReshardingCoordinator& ReshardingCoordinator::get(ServiceContext* serviceContext) {
- return getReshardingCoordinator(serviceContext);
-}
-
-ReshardingCoordinator& ReshardingCoordinator::get(OperationContext* opCtx) {
- return get(opCtx->getServiceContext());
-}
-
-// TODO SERVER-49570
-void ReshardingCoordinator::onNewReshardCollection(const NamespaceString& nss,
- const std::vector<ShardId>& donors,
- const std::vector<ShardId>& recipients,
- const std::vector<ChunkType>& initialChunks) {}
-
-// TODO SERVER-49572
-void ReshardingCoordinator::onRecipientReportsCreatedCollection(const NamespaceString& nss,
- const ShardId& recipient) {}
-
-// TODO SERVER-49573
-void ReshardingCoordinator::onDonorReportsMinFetchTimestamp(const NamespaceString& nss,
- const ShardId& donor,
- Timestamp timestamp) {}
-
-// TODO SERVER-49574
-void ReshardingCoordinator::onRecipientFinishesCloning(const NamespaceString& nss,
- const ShardId& recipient) {}
-
-// TODO SERVER-49575
-void ReshardingCoordinator::onRecipientReportsStrictlyConsistent(const NamespaceString& nss,
- const ShardId& recipient) {}
-
-// TODO SERVER-49576
-void ReshardingCoordinator::onRecipientRenamesCollection(const NamespaceString& nss,
- const ShardId& recipient) {}
-
-// TODO SERVER-49577
-void ReshardingCoordinator::onDonorDropsOriginalCollection(const NamespaceString& nss,
- const ShardId& donor) {}
-
-// TODO SERVER-49578
-void ReshardingCoordinator::onRecipientReportsUnrecoverableError(const NamespaceString& nss,
- const ShardId& recipient,
- Status error) {}
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator.h b/src/mongo/db/s/resharding/resharding_coordinator.h
deleted file mode 100644
index 9de335abf40..00000000000
--- a/src/mongo/db/s/resharding/resharding_coordinator.h
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <vector>
-
-#include "mongo/base/status.h"
-#include "mongo/bson/timestamp.h"
-#include "mongo/platform/mutex.h"
-
-namespace mongo {
-
-class ChunkType;
-class NamespaceString;
-class OperationContext;
-class ServiceContext;
-class ShardId;
-
-/**
- * Manages active resharding operations and holds a catalog of ReshardingCoordinatorStateMachine
- * objects indexed by collection namespace.
- *
- * There is one instance of this object per service context.
- */
-
-class ReshardingCoordinator {
- ReshardingCoordinator(const ReshardingCoordinator&) = delete;
- ReshardingCoordinator& operator=(const ReshardingCoordinator&) = delete;
-
-public:
- ReshardingCoordinator();
- ~ReshardingCoordinator();
-
- static ReshardingCoordinator& get(ServiceContext* serviceContext);
- static ReshardingCoordinator& get(OperationContext* opCtx);
-
- /**
- * Creates a new ReshardingCoordinatorStateMachine for the resharding operation on the
- * collection 'nss' and adds it to _reshardingOperationsInProgress. Updates on-disk metadata to
- * indicate that the collection is being resharded.
- */
- void onNewReshardCollection(const NamespaceString& nss,
- const std::vector<ShardId>& donors,
- const std::vector<ShardId>& recipients,
- const std::vector<ChunkType>& initialChunks);
-
- /**
- * Called when each recipient reports it has finished creating the collection. Triggers a state
- * transition and updates on-disk metadata if this recipient is the last to report it has
- * finished creating the collection.
- */
- void onRecipientReportsCreatedCollection(const NamespaceString& nss, const ShardId& recipient);
-
- /**
- * Called when each donor reports its minFetchTimestamp. If this donor is the last to report its
- * minFetchTimestamp, selects the highest timestamp among all donors to be the fetchTimestamp,
- * triggers a state change, and updates on-disk metadata.
- */
- void onDonorReportsMinFetchTimestamp(const NamespaceString& nss,
- const ShardId& donor,
- Timestamp timestamp);
-
- /**
- * Called when each recipient finishes cloning and enters steady-state. Triggers a state
- * transition and updates on-disk metadata if this recipient is the last to report it has
- * finished cloning the collection.
- */
- void onRecipientFinishesCloning(const NamespaceString& nss, const ShardId& recipient);
-
- /**
- * Called when a recipient has met the following criteria:
- * 1. Has been notified by all donors all writes must be run in transactions and
- * 2. Has applied all oplog entries and
- * 3. Has entered strict-consistency state
- *
- * If the recipient is the last to report it is in strict-consistency, commits the resharding
- * operation, triggers a state transition, and updates on-disk metadata.
- */
- void onRecipientReportsStrictlyConsistent(const NamespaceString& nss, const ShardId& recipient);
-
- /**
- * Called when each recipient finishes renaming the temporary collection. Triggers a state
- * transition and updates on-disk metadata if this recipient is the last to report it has
- * finished renaming.
- */
- void onRecipientRenamesCollection(const NamespaceString& nss, const ShardId& recipient);
-
- /**
- * Called when each donor finishes dropping the original collection. Triggers a state transition
- * and updates on-disk metadata if this donor is the last to report it has finished dropping.
- */
- void onDonorDropsOriginalCollection(const NamespaceString& nss, const ShardId& donor);
-
- /**
- * Called if a recipient reports an unrecoverable error.
- */
- void onRecipientReportsUnrecoverableError(const NamespaceString& nss,
- const ShardId& recipient,
- Status error);
-
-private:
- // Protects the state below
- Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinator::_mutex");
-
- // Contains ReshardingCoordinatorStateMachine objects by collection namespace.
- // TODO SERVER-49569 uncomment line below
- // StringMap<ReshardingCoordinatorStateMachine> _reshardingOperationsInProgress;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
new file mode 100644
index 00000000000..dfa92b132e5
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/resharding/resharding_coordinator_observer.h"
+
+#include "mongo/db/service_context.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+
+ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default;
+
+ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() = default;
+
+// TODO SERVER-49572
+void ReshardingCoordinatorObserver::onRecipientReportsCreatedCollection(const ShardId& recipient) {}
+
+// TODO SERVER-49573
+void ReshardingCoordinatorObserver::onDonorReportsMinFetchTimestamp(const ShardId& donor,
+ Timestamp timestamp) {}
+
+// TODO SERVER-49574
+void ReshardingCoordinatorObserver::onRecipientFinishesCloning(const ShardId& recipient) {}
+
+// TODO SERVER-49575
+void ReshardingCoordinatorObserver::onRecipientReportsStrictlyConsistent(const ShardId& recipient) {
+}
+
+// TODO SERVER-49576
+void ReshardingCoordinatorObserver::onRecipientRenamesCollection(const ShardId& recipient) {}
+
+// TODO SERVER-49577
+void ReshardingCoordinatorObserver::onDonorDropsOriginalCollection(const ShardId& donor) {}
+
+// TODO SERVER-49578
+void ReshardingCoordinatorObserver::onRecipientReportsUnrecoverableError(const ShardId& recipient,
+ Status error) {}
+
+// TODO SERVER-49572
+SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsCreatedCollection() {
+ return _allRecipientsCreatedCollection.getFuture();
+}
+
+// TODO SERVER-49573
+SharedSemiFuture<Timestamp> ReshardingCoordinatorObserver::awaitAllDonorsReadyToDonate() {
+ return _allDonorsReportedMinFetchTimestamp.getFuture();
+}
+
+// TODO SERVER-49574
+SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsFinishedCloning() {
+ return _allRecipientsFinishedCloning.getFuture();
+}
+
+// TODO SERVER-49575
+SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsInStrictConsistency() {
+ return _allRecipientsReportedStrictConsistencyTimestamp.getFuture();
+}
+
+// TODO SERVER-49577
+SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllDonorsDroppedOriginalCollection() {
+ return _allDonorsDroppedOriginalCollection.getFuture();
+}
+
+// TODO SERVER-49576
+SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() {
+ return _allRecipientsRenamedCollection.getFuture();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
new file mode 100644
index 00000000000..df637cb96c4
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/status.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/s/resharding/coordinator_document_gen.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/future.h"
+#include "mongo/util/string_map.h"
+
+namespace mongo {
+
+class ChunkType;
+class NamespaceString;
+class OperationContext;
+class ServiceContext;
+class ShardId;
+
+/**
+ * Observes writes that indicate state changes for a resharding operation. Holds promises that
+ * the ReshardingCoordinator waits on in order to transition to a new state.
+ *
+ * An instance of this object is specific to one resharding operation.
+ */
+
+class ReshardingCoordinatorObserver {
+public:
+ ReshardingCoordinatorObserver();
+ ~ReshardingCoordinatorObserver();
+
+ /**
+ * Called when each recipient updates its 'state' field to 'initializing' in the
+ * 'recipientShards' field in a config.reshardingOperations entry.
+ */
+ void onRecipientReportsCreatedCollection(const ShardId& recipient);
+
+ /**
+ * Called when each donor updates its 'state' field to 'donating' and writes its
+ * 'minFetchTimestamp' in the 'donorShards' field in a config.reshardingOperations entry.
+ */
+ void onDonorReportsMinFetchTimestamp(const ShardId& donor, Timestamp timestamp);
+
+ /**
+ * Called when each recipient updates its 'state' field to 'steady-state' in the
+ * 'recipientShards' field in a config.reshardingOperations entry.
+ */
+ void onRecipientFinishesCloning(const ShardId& recipient);
+
+ /**
+ * Called when each recipient updates its 'state' field to 'strictly-consistent' and writes its
+ * 'strictConsistencyTimestamp' in the 'recipientShards' field in a config.reshardingOperations
+ * entry.
+ */
+ void onRecipientReportsStrictlyConsistent(const ShardId& recipient);
+
+ /**
+ * Called when each recipient updates its 'state' field to 'done' in the 'recipientShards'
+ * field in a config.reshardingOperations entry.
+ */
+ void onRecipientRenamesCollection(const ShardId& recipient);
+
+ /**
+ * Called when each donor updates its 'state' field to 'done' in the 'donorShards' field in
+ * a config.reshardingOperations entry.
+ */
+ void onDonorDropsOriginalCollection(const ShardId& donor);
+
+ /**
+ * Called if a recipient reports an unrecoverable error.
+ */
+ void onRecipientReportsUnrecoverableError(const ShardId& recipient, Status error);
+
+ /**
+ * Fulfills the '_allRecipientsCreatedCollection' promise when the last recipient writes that it
+ * is in 'initializing' state.
+ */
+ SharedSemiFuture<void> awaitAllRecipientsCreatedCollection();
+
+ /**
+ * When the last donor reports its 'minFetchTimestamp', selects the highest 'minFetchTimestamp'
+ * of all donors to be the 'fetchTimestamp'. Fulfills the '_allDonorsReportedMinFetchTimestamp'
+ * promise with this 'fetchTimestamp'.
+ */
+ SharedSemiFuture<Timestamp> awaitAllDonorsReadyToDonate();
+
+ /**
+ * Fulfills the '_allRecipientsFinishedCloning' promise when the last recipient writes that it
+ * is in 'steady-state'.
+ */
+ SharedSemiFuture<void> awaitAllRecipientsFinishedCloning();
+
+ /**
+ * Fulfills the '_allRecipientsReportedStrictConsistencyTimestamp' promise when the last
+ * recipient writes that it is in 'strict-consistency' state as well as its
+ * 'strictConsistencyTimestamp'.
+ */
+ SharedSemiFuture<void> awaitAllRecipientsInStrictConsistency();
+
+ /**
+ * Fulfills the '_allDonorsDroppedOriginalCollection' promise when the last donor writes that it
+ * is in 'done' state.
+ */
+ SharedSemiFuture<void> awaitAllDonorsDroppedOriginalCollection();
+
+ /**
+ * Fulfills the '_allRecipientsRenamedCollection' promise when the last recipient writes
+ * that it is in 'done' state.
+ */
+ SharedSemiFuture<void> awaitAllRecipientsRenamedCollection();
+
+
+private:
+ // Protects the state below
+ Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorObserver::_mutex");
+
+ /**
+ * Promises indicating that either all donors or all recipients have entered a specific state.
+ * The ReshardingCoordinator waits on these in order to transition states.
+ */
+ SharedPromise<void> _allRecipientsCreatedCollection;
+
+ SharedPromise<Timestamp> _allDonorsReportedMinFetchTimestamp;
+
+ SharedPromise<void> _allRecipientsFinishedCloning;
+
+ SharedPromise<void> _allRecipientsReportedStrictConsistencyTimestamp;
+
+ SharedPromise<void> _allDonorsDroppedOriginalCollection;
+
+ SharedPromise<void> _allRecipientsRenamedCollection;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
new file mode 100644
index 00000000000..f21e90ccb9f
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -0,0 +1,286 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+#include "mongo/db/s/resharding/resharding_coordinator_service.h"
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/util/string_map.h"
+#include "mongo/util/uuid.h"
+
+namespace mongo {
+
+ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const BSONObj& state)
+ : PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(),
+ _id(state["_id"].wrap().getOwned()),
+ _stateDoc(ReshardingCoordinatorDocument::parse(
+ IDLParserErrorContext("ReshardingCoordinatorStateDoc"), state)) {
+ _reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>();
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
+ ExecutorFuture<void>(**executor)
+ .then([this, executor] { return _init(executor); })
+ .then([this] { _tellAllRecipientsToRefresh(); })
+ .then([this, executor] { return _awaitAllRecipientsCreatedCollection(executor); })
+ .then([this] { _tellAllDonorsToRefresh(); })
+ .then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); })
+ .then([this] { _tellAllRecipientsToRefresh(); })
+ .then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); })
+ .then([this] { _tellAllDonorsToRefresh(); })
+ .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); })
+ .then([this] { return _commit(); })
+ .then([this] {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kRenaming) {
+ return;
+ }
+
+ this->_runUpdates(CoordinatorStateEnum::kRenaming);
+ return;
+ })
+ .then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); })
+ .then([this] { _tellAllDonorsToRefresh(); })
+ .then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); })
+ .then([this] { _tellAllRecipientsToRefresh(); })
+ .then([this] { _tellAllDonorsToRefresh(); })
+ .onError([this](Status status) {
+ _runUpdates(CoordinatorStateEnum::kError);
+
+ LOGV2(4956902,
+ "Resharding failed",
+ "namespace"_attr = _stateDoc.getNss().ns(),
+ "newShardKeyPattern"_attr = _stateDoc.getReshardingKey(),
+ "error"_attr = status);
+
+ // TODO wait for donors and recipients to abort the operation and clean up state
+ _tellAllRecipientsToRefresh();
+ _tellAllDonorsToRefresh();
+
+ return status;
+ })
+ .getAsync([this](Status status) {
+ if (!status.isOK()) {
+ invariant(_stateDoc.getState() == CoordinatorStateEnum::kError);
+ _completionPromise.setError(status);
+ return;
+ }
+
+ invariant(_stateDoc.getState() == CoordinatorStateEnum::kDone);
+ _completionPromise.emplaceValue();
+ });
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::setInitialChunksAndZones(
+ std::vector<ChunkType> initialChunks, std::vector<TagsType> newZones) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing ||
+ _initialChunksAndZonesPromise.getFuture().isReady()) {
+ return;
+ }
+
+ _initialChunksAndZonesPromise.emplaceValue(
+ ChunksAndZones{std::move(initialChunks), std::move(newZones)});
+}
+
+ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::_init(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _initialChunksAndZonesPromise.getFuture()
+ .thenRunOn(**executor)
+ .then([this](const ChunksAndZones& initialChunksAndZones) {
+ // TODO SERVER-50304 Run this insert in a transaction with writes to config.collections,
+ // config.chunks, and config.tags
+ auto opCtx = cc().makeOperationContext();
+ DBDirectClient client(opCtx.get());
+
+ const auto commandResponse = client.runCommand([&] {
+ write_ops::Insert insertOp(NamespaceString::kConfigReshardingOperationsNamespace);
+ insertOp.setDocuments({_stateDoc.toBSON()});
+ return insertOp.serialize({});
+ }());
+ uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply()));
+
+ invariant(_stateDoc.getState() == CoordinatorStateEnum::kInitializing);
+ _stateDoc.setState(CoordinatorStateEnum::kInitialized);
+ });
+}
+
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsCreatedCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kInitialized) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _reshardingCoordinatorObserver->awaitAllRecipientsCreatedCollection()
+ .thenRunOn(**executor)
+ .then([this]() { this->_runUpdates(CoordinatorStateEnum::kPreparingToDonate); });
+}
+
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonate(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kPreparingToDonate) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
+ .thenRunOn(**executor)
+ .then([this](Timestamp fetchTimestamp) {
+ this->_runUpdates(CoordinatorStateEnum::kCloning, fetchTimestamp);
+ });
+}
+
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinishedCloning(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kCloning) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning()
+ .thenRunOn(**executor)
+ .then([this]() { this->_runUpdates(CoordinatorStateEnum::kMirroring); });
+}
+
+SharedSemiFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrictConsistency(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kMirroring) {
+ return Status::OK();
+ }
+
+ return _reshardingCoordinatorObserver->awaitAllRecipientsInStrictConsistency();
+}
+
+Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit() {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kMirroring) {
+ return Status::OK();
+ }
+
+ // TODO SERVER-50304 Run this update in a transaction with writes to config.collections,
+ // config.chunks, and config.tags
+ this->_runUpdates(CoordinatorStateEnum::kCommitted);
+
+ return Status::OK();
+};
+
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsRenamedCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kRenaming) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection()
+ .thenRunOn(**executor)
+ .then([this]() { this->_runUpdates(CoordinatorStateEnum::kDropping); });
+}
+
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsDroppedOriginalCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_stateDoc.getState() > CoordinatorStateEnum::kDropping) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection()
+ .thenRunOn(**executor)
+ .then([this]() { this->_runUpdates(CoordinatorStateEnum::kDone); });
+}
+
+// TODO SERVER-50304 Run this write in a transaction with updates to config.collections (and
+// the initial chunks to config.chunks and config.tags if nextState is kInitialized)
+void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates(
+ CoordinatorStateEnum nextState, boost::optional<Timestamp> fetchTimestamp) {
+ // Build new state doc for update
+ ReshardingCoordinatorDocument updatedStateDoc = _stateDoc;
+ updatedStateDoc.setState(nextState);
+ if (fetchTimestamp) {
+ auto fetchTimestampStruct = updatedStateDoc.getFetchTimestampStruct();
+ if (fetchTimestampStruct.getFetchTimestamp())
+ invariant(fetchTimestampStruct.getFetchTimestamp().get() == fetchTimestamp.get());
+
+ fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp));
+ }
+
+ // Run update
+ auto opCtx = cc().makeOperationContext();
+ DBDirectClient client(opCtx.get());
+
+ const auto commandResponse = client.runCommand([&] {
+ write_ops::Update updateOp(NamespaceString::kConfigReshardingOperationsNamespace);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(_id);
+ entry.setU(updatedStateDoc.toBSON());
+ return entry;
+ }()});
+ return updateOp.serialize(
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ // Throw if the update did not match a document. This means the state doc was removed out from
+ // under the operation.
+ uassert(495690,
+ str::stream() << "Found that the resharding coordinator state document is missing when "
+ "attempting to update state for namespace "
+ << _stateDoc.getNss().ns(),
+ commandReply.getIntField("n") == 1);
+
+ // Update in-memory state doc
+ _stateDoc = updatedStateDoc;
+}
+
+// TODO
+void ReshardingCoordinatorService::ReshardingCoordinator::
+ _markCoordinatorStateDocAsGarbageCollectable() {}
+
+// TODO
+void ReshardingCoordinatorService::ReshardingCoordinator::_removeReshardingFields() {}
+
+// TODO
+void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh() {}
+
+// TODO
+void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh() {}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
new file mode 100644
index 00000000000..ad27fa27851
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -0,0 +1,207 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/s/resharding/coordinator_document_gen.h"
+#include "mongo/db/s/resharding/resharding_coordinator_observer.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_tags.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+class ServiceContext;
+class OperationContext;
+
+constexpr StringData kReshardingCoordinatorServiceName = "ReshardingCoordinatorService"_sd;
+
+class ReshardingCoordinatorService final : public repl::PrimaryOnlyService {
+public:
+ explicit ReshardingCoordinatorService(ServiceContext* serviceContext)
+ : PrimaryOnlyService(serviceContext) {}
+ ~ReshardingCoordinatorService() = default;
+
+ StringData getServiceName() const override {
+ return kReshardingCoordinatorServiceName;
+ }
+ NamespaceString getStateDocumentsNS() const override {
+ return NamespaceString::kConfigReshardingOperationsNamespace;
+ }
+
+ ThreadPool::Limits getThreadPoolLimits() const override {
+ // TODO Limit the size of ReshardingCoordinatorService thread pool.
+ return ThreadPool::Limits();
+ }
+ std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialState) const override {
+ return std::make_shared<ReshardingCoordinatorService::ReshardingCoordinator>(
+ std::move(initialState));
+ }
+
+ class ReshardingCoordinator final
+ : public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> {
+ public:
+ explicit ReshardingCoordinator(const BSONObj& state);
+
+ SharedSemiFuture<void> getCompletionPromise() {
+ return _completionPromise.getFuture();
+ }
+
+ void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override;
+
+ void setInitialChunksAndZones(std::vector<ChunkType> initialChunks,
+ std::vector<TagsType> newZones);
+
+ private:
+ struct ChunksAndZones {
+ std::vector<ChunkType> initialChunks;
+ std::vector<TagsType> newZones;
+ };
+
+ /**
+ * Does the following writes:
+ * 1. Inserts coordinator state document into config.reshardingOperations
+ * 2. Adds reshardingFields to the config.collections entry for the original collection
+ * 3. Inserts an entry into config.collections for the temporary collection
+ * 4. Inserts entries into config.chunks for ranges based on the new shard key
+ * 5. Upserts entries into config.tags for any zones associated with the new shard key
+ *
+ * Transitions to 'kInitialized'.
+ */
+ ExecutorFuture<void> _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have created the
+ * temporary collection. Transitions to 'kPreparingToDonate'.
+ */
+ ExecutorFuture<void> _awaitAllRecipientsCreatedCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all donors have picked a
+ * minFetchTimestamp and are ready to donate. Transitions to 'kCloning'.
+ */
+ ExecutorFuture<void> _awaitAllDonorsReadyToDonate(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have finished
+ * cloning. Transitions to 'kMirroring'.
+ */
+ ExecutorFuture<void> _awaitAllRecipientsFinishedCloning(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have entered
+ * strict-consistency.
+ */
+ SharedSemiFuture<void> _awaitAllRecipientsInStrictConsistency(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Does the following writes:
+ * 1. Updates the config.collections entry for the new sharded collection
+ * 2. Updates config.chunks entries for the new sharded collection
+ * 3. Updates config.tags for the new sharded collection
+ *
+ * Transitions to 'kCommitted'.
+ */
+ Future<void> _commit();
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have renamed the
+ * temporary collection to the original collection namespace. Transitions to 'kDropping'.
+ */
+ ExecutorFuture<void> _awaitAllRecipientsRenamedCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all donors have dropped the
+ * original collection. Transitions to 'kDone'.
+ */
+ ExecutorFuture<void> _awaitAllDonorsDroppedOriginalCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Updates the entry for this resharding operation in config.reshardingOperations and the
+ * catalog entries for the original and temporary namespaces in config.collections.
+ */
+ void _runUpdates(CoordinatorStateEnum nextState,
+ boost::optional<Timestamp> fetchTimestamp = boost::none);
+
+ /**
+ * Marks the state doc as garbage collectable so that it can be cleaned up by the TTL
+ * monitor.
+ */
+ void _markCoordinatorStateDocAsGarbageCollectable();
+
+ /**
+ * Removes the 'reshardingFields' from the config.collections entry.
+ */
+ void _removeReshardingFields();
+
+ /**
+ * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient
+ * shards.
+ */
+ void _tellAllRecipientsToRefresh();
+
+ /**
+ * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards.
+ */
+ void _tellAllDonorsToRefresh();
+
+ // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The
+ // value of this is the UUID that will be used as the collection UUID for the new sharded
+ // collection. The object looks like: {_id: 'reshardingUUID'}
+ const InstanceID _id;
+
+ // Promise containing the initial chunks and new zones based on the new shard key. These are
+ // not a part of the state document, so must be set by configsvrReshardCollection after
+ // construction.
+ SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise;
+
+ // Observes writes that indicate state changes for this resharding operation and notifies
+ // 'this' when all donors/recipients have entered some state so that 'this' can transition
+ // states.
+ std::shared_ptr<ReshardingCoordinatorObserver> _reshardingCoordinatorObserver;
+
+ // The updated coordinator state document.
+ ReshardingCoordinatorDocument _stateDoc;
+
+ // Fulfilled only after transitioned to kDone or kError
+ SharedPromise<void> _completionPromise;
+ };
+};
+
+} // namespace mongo