diff options
author | jannaerin <golden.janna@gmail.com> | 2020-08-04 19:24:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-26 15:39:57 +0000 |
commit | a3a33f7d40147f50442292df215063fb36f6f70b (patch) | |
tree | 961855ca3cc28d510221972f881eab4e722dde40 /src/mongo/db/s | |
parent | 96b779115f139a47fe9d6349f7ce0d54140ad25f (diff) | |
download | mongo-a3a33f7d40147f50442292df215063fb36f6f70b.tar.gz |
SERVER-49569 Create ReshardingCoordinatorService
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/coordinator_document.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator.cpp | 92 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator.h | 137 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_observer.cpp | 96 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_observer.h | 163 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 286 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.h | 207 |
8 files changed, 758 insertions, 230 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 9e4e1426a83..fe4bc629199 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -68,7 +68,8 @@ env.Library( 'range_deletion_util.cpp', 'read_only_catalog_cache_loader.cpp', 'resharding_util.cpp', - 'resharding/resharding_coordinator.cpp', + 'resharding/resharding_coordinator_observer.cpp', + 'resharding/resharding_coordinator_service.cpp', 'scoped_operation_completion_sharding_actions.cpp', 'session_catalog_migration_destination.cpp', 'session_catalog_migration_source.cpp', diff --git a/src/mongo/db/s/resharding/coordinator_document.idl b/src/mongo/db/s/resharding/coordinator_document.idl index 761e55fb6b2..127d69c779b 100644 --- a/src/mongo/db/s/resharding/coordinator_document.idl +++ b/src/mongo/db/s/resharding/coordinator_document.idl @@ -73,6 +73,10 @@ structs: generate_comparison_operators: false strict: true fields: + tempReshardingNss: + type: namespacestring + description: "The namespace of the temporary resharding collection that exists on + recipient shards." state: CoordinatorState donorShards: type: array<DonorShardEntry> diff --git a/src/mongo/db/s/resharding/resharding_coordinator.cpp b/src/mongo/db/s/resharding/resharding_coordinator.cpp deleted file mode 100644 index 61b586a5294..00000000000 --- a/src/mongo/db/s/resharding/resharding_coordinator.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/s/resharding/resharding_coordinator.h" - -#include "mongo/db/service_context.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/shard_id.h" - -namespace mongo { - -namespace { -const auto getReshardingCoordinator = ServiceContext::declareDecoration<ReshardingCoordinator>(); -} // namespace - -ReshardingCoordinator::ReshardingCoordinator() = default; - -ReshardingCoordinator::~ReshardingCoordinator() { - // TODO SERVER-49569 uncomment line below - // invariant(_reshardingOperationsInProgress.empty()); -} - -ReshardingCoordinator& ReshardingCoordinator::get(ServiceContext* serviceContext) { - return getReshardingCoordinator(serviceContext); -} - -ReshardingCoordinator& ReshardingCoordinator::get(OperationContext* opCtx) { - return get(opCtx->getServiceContext()); -} - -// TODO SERVER-49570 -void ReshardingCoordinator::onNewReshardCollection(const NamespaceString& nss, - const std::vector<ShardId>& donors, - const std::vector<ShardId>& recipients, - const std::vector<ChunkType>& initialChunks) {} - -// TODO SERVER-49572 -void ReshardingCoordinator::onRecipientReportsCreatedCollection(const NamespaceString& nss, - const ShardId& recipient) {} - -// TODO SERVER-49573 -void ReshardingCoordinator::onDonorReportsMinFetchTimestamp(const NamespaceString& nss, - const ShardId& donor, - Timestamp timestamp) {} - -// TODO SERVER-49574 -void ReshardingCoordinator::onRecipientFinishesCloning(const NamespaceString& nss, - const ShardId& recipient) {} - -// TODO SERVER-49575 -void ReshardingCoordinator::onRecipientReportsStrictlyConsistent(const NamespaceString& nss, - const ShardId& recipient) {} - -// TODO SERVER-49576 -void ReshardingCoordinator::onRecipientRenamesCollection(const NamespaceString& nss, - const ShardId& recipient) {} - -// TODO SERVER-49577 -void ReshardingCoordinator::onDonorDropsOriginalCollection(const NamespaceString& nss, - const ShardId& donor) {} - -// TODO SERVER-49578 -void ReshardingCoordinator::onRecipientReportsUnrecoverableError(const NamespaceString& nss, - const ShardId& recipient, - Status error) {} -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator.h b/src/mongo/db/s/resharding/resharding_coordinator.h deleted file mode 100644 index 9de335abf40..00000000000 --- a/src/mongo/db/s/resharding/resharding_coordinator.h +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <vector> - -#include "mongo/base/status.h" -#include "mongo/bson/timestamp.h" -#include "mongo/platform/mutex.h" - -namespace mongo { - -class ChunkType; -class NamespaceString; -class OperationContext; -class ServiceContext; -class ShardId; - -/** - * Manages active resharding operations and holds a catalog of ReshardingCoordinatorStateMachine - * objects indexed by collection namespace. - * - * There is one instance of this object per service context. - */ - -class ReshardingCoordinator { - ReshardingCoordinator(const ReshardingCoordinator&) = delete; - ReshardingCoordinator& operator=(const ReshardingCoordinator&) = delete; - -public: - ReshardingCoordinator(); - ~ReshardingCoordinator(); - - static ReshardingCoordinator& get(ServiceContext* serviceContext); - static ReshardingCoordinator& get(OperationContext* opCtx); - - /** - * Creates a new ReshardingCoordinatorStateMachine for the resharding operation on the - * collection 'nss' and adds it to _reshardingOperationsInProgress. Updates on-disk metadata to - * indicate that the collection is being resharded. - */ - void onNewReshardCollection(const NamespaceString& nss, - const std::vector<ShardId>& donors, - const std::vector<ShardId>& recipients, - const std::vector<ChunkType>& initialChunks); - - /** - * Called when each recipient reports it has finished creating the collection. Triggers a state - * transition and updates on-disk metadata if this recipient is the last to report it has - * finished creating the collection. - */ - void onRecipientReportsCreatedCollection(const NamespaceString& nss, const ShardId& recipient); - - /** - * Called when each donor reports its minFetchTimestamp. If this donor is the last to report its - * minFetchTimestamp, selects the highest timestamp among all donors to be the fetchTimestamp, - * triggers a state change, and updates on-disk metadata. - */ - void onDonorReportsMinFetchTimestamp(const NamespaceString& nss, - const ShardId& donor, - Timestamp timestamp); - - /** - * Called when each recipient finishes cloning and enters steady-state. Triggers a state - * transition and updates on-disk metadata if this recipient is the last to report it has - * finished cloning the collection. - */ - void onRecipientFinishesCloning(const NamespaceString& nss, const ShardId& recipient); - - /** - * Called when a recipient has met the following criteria: - * 1. Has been notified by all donors all writes must be run in transactions and - * 2. Has applied all oplog entries and - * 3. Has entered strict-consistency state - * - * If the recipient is the last to report it is in strict-consistency, commits the resharding - * operation, triggers a state transition, and updates on-disk metadata. - */ - void onRecipientReportsStrictlyConsistent(const NamespaceString& nss, const ShardId& recipient); - - /** - * Called when each recipient finishes renaming the temporary collection. Triggers a state - * transition and updates on-disk metadata if this recipient is the last to report it has - * finished renaming. - */ - void onRecipientRenamesCollection(const NamespaceString& nss, const ShardId& recipient); - - /** - * Called when each donor finishes dropping the original collection. Triggers a state transition - * and updates on-disk metadata if this donor is the last to report it has finished dropping. - */ - void onDonorDropsOriginalCollection(const NamespaceString& nss, const ShardId& donor); - - /** - * Called if a recipient reports an unrecoverable error. - */ - void onRecipientReportsUnrecoverableError(const NamespaceString& nss, - const ShardId& recipient, - Status error); - -private: - // Protects the state below - Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinator::_mutex"); - - // Contains ReshardingCoordinatorStateMachine objects by collection namespace. - // TODO SERVER-49569 uncomment line below - // StringMap<ReshardingCoordinatorStateMachine> _reshardingOperationsInProgress; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp new file mode 100644 index 00000000000..dfa92b132e5 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/resharding/resharding_coordinator_observer.h" + +#include "mongo/db/service_context.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/shard_id.h" + +namespace mongo { + +ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default; + +ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() = default; + +// TODO SERVER-49572 +void ReshardingCoordinatorObserver::onRecipientReportsCreatedCollection(const ShardId& recipient) {} + +// TODO SERVER-49573 +void ReshardingCoordinatorObserver::onDonorReportsMinFetchTimestamp(const ShardId& donor, + Timestamp timestamp) {} + +// TODO SERVER-49574 +void ReshardingCoordinatorObserver::onRecipientFinishesCloning(const ShardId& recipient) {} + +// TODO SERVER-49575 +void ReshardingCoordinatorObserver::onRecipientReportsStrictlyConsistent(const ShardId& recipient) { +} + +// TODO SERVER-49576 +void ReshardingCoordinatorObserver::onRecipientRenamesCollection(const ShardId& recipient) {} + +// TODO SERVER-49577 +void ReshardingCoordinatorObserver::onDonorDropsOriginalCollection(const ShardId& donor) {} + +// TODO SERVER-49578 +void ReshardingCoordinatorObserver::onRecipientReportsUnrecoverableError(const ShardId& recipient, + Status error) {} + +// TODO SERVER-49572 +SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsCreatedCollection() { + return _allRecipientsCreatedCollection.getFuture(); +} + +// TODO SERVER-49573 +SharedSemiFuture<Timestamp> ReshardingCoordinatorObserver::awaitAllDonorsReadyToDonate() { + return _allDonorsReportedMinFetchTimestamp.getFuture(); +} + +// TODO SERVER-49574 +SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsFinishedCloning() { + return _allRecipientsFinishedCloning.getFuture(); +} + +// TODO SERVER-49575 +SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsInStrictConsistency() { + return _allRecipientsReportedStrictConsistencyTimestamp.getFuture(); +} + +// TODO SERVER-49577 +SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllDonorsDroppedOriginalCollection() { + return _allDonorsDroppedOriginalCollection.getFuture(); +} + +// TODO SERVER-49576 +SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() { + return _allRecipientsRenamedCollection.getFuture(); +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h new file mode 100644 index 00000000000..df637cb96c4 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/status.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/s/resharding/coordinator_document_gen.h" +#include "mongo/platform/mutex.h" +#include "mongo/util/future.h" +#include "mongo/util/string_map.h" + +namespace mongo { + +class ChunkType; +class NamespaceString; +class OperationContext; +class ServiceContext; +class ShardId; + +/** + * Observes writes that indicate state changes for a resharding operation. Holds promises that + * the ReshardingCoordinator waits on in order to transition to a new state. + * + * An instance of this object is specific to one resharding operation. + */ + +class ReshardingCoordinatorObserver { +public: + ReshardingCoordinatorObserver(); + ~ReshardingCoordinatorObserver(); + + /** + * Called when each recipient updates its 'state' field to 'initializing' in the + * 'recipientShards' field in a config.reshardingOperations entry. + */ + void onRecipientReportsCreatedCollection(const ShardId& recipient); + + /** + * Called when each donor updates its 'state' field to 'donating' and writes its + * 'minFetchTimestamp' in the 'donorShards' field in a config.reshardingOperations entry. + */ + void onDonorReportsMinFetchTimestamp(const ShardId& donor, Timestamp timestamp); + + /** + * Called when each recipient updates its 'state' field to 'steady-state' in the + * 'recipientShards' field in a config.reshardingOperations entry. + */ + void onRecipientFinishesCloning(const ShardId& recipient); + + /** + * Called when each recipient updates its 'state' field to 'strictly-consistent' and writes its + * 'strictConsistencyTimestamp' in the 'recipientShards' field in a config.reshardingOperations + * entry. + */ + void onRecipientReportsStrictlyConsistent(const ShardId& recipient); + + /** + * Called when each recipient updates its 'state' field to 'done' in the 'recipientShards' + * field in a config.reshardingOperations entry. + */ + void onRecipientRenamesCollection(const ShardId& recipient); + + /** + * Called when each donor updates its 'state' field to 'done' in the 'donorShards' field in + * a config.reshardingOperations entry. + */ + void onDonorDropsOriginalCollection(const ShardId& donor); + + /** + * Called if a recipient reports an unrecoverable error. + */ + void onRecipientReportsUnrecoverableError(const ShardId& recipient, Status error); + + /** + * Fulfills the '_allRecipientsCreatedCollection' promise when the last recipient writes that it + * is in 'initializing' state. + */ + SharedSemiFuture<void> awaitAllRecipientsCreatedCollection(); + + /** + * When the last donor reports its 'minFetchTimestamp', selects the highest 'minFetchTimestamp' + * of all donors to be the 'fetchTimestamp'. Fulfills the '_allDonorsReportedMinFetchTimestamp' + * promise with this 'fetchTimestamp'. + */ + SharedSemiFuture<Timestamp> awaitAllDonorsReadyToDonate(); + + /** + * Fulfills the '_allRecipientsFinishedCloning' promise when the last recipient writes that it + * is in 'steady-state'. + */ + SharedSemiFuture<void> awaitAllRecipientsFinishedCloning(); + + /** + * Fulfills the '_allRecipientsReportedStrictConsistencyTimestamp' promise when the last + * recipient writes that it is in 'strict-consistency' state as well as its + * 'strictConsistencyTimestamp'. + */ + SharedSemiFuture<void> awaitAllRecipientsInStrictConsistency(); + + /** + * Fulfills the '_allDonorsDroppedOriginalCollection' promise when the last donor writes that it + * is in 'done' state. + */ + SharedSemiFuture<void> awaitAllDonorsDroppedOriginalCollection(); + + /** + * Fulfills the '_allRecipientsRenamedCollection' promise when the last recipient writes + * that it is in 'done' state. + */ + SharedSemiFuture<void> awaitAllRecipientsRenamedCollection(); + + +private: + // Protects the state below + Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorObserver::_mutex"); + + /** + * Promises indicating that either all donors or all recipients have entered a specific state. + * The ReshardingCoordinator waits on these in order to transition states. + */ + SharedPromise<void> _allRecipientsCreatedCollection; + + SharedPromise<Timestamp> _allDonorsReportedMinFetchTimestamp; + + SharedPromise<void> _allRecipientsFinishedCloning; + + SharedPromise<void> _allRecipientsReportedStrictConsistencyTimestamp; + + SharedPromise<void> _allDonorsDroppedOriginalCollection; + + SharedPromise<void> _allRecipientsRenamedCollection; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp new file mode 100644 index 00000000000..f21e90ccb9f --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -0,0 +1,286 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +#include "mongo/db/s/resharding/resharding_coordinator_service.h" + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/shard_id.h" +#include "mongo/util/string_map.h" +#include "mongo/util/uuid.h" + +namespace mongo { + +ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const BSONObj& state) + : PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(), + _id(state["_id"].wrap().getOwned()), + _stateDoc(ReshardingCoordinatorDocument::parse( + IDLParserErrorContext("ReshardingCoordinatorStateDoc"), state)) { + _reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>(); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { + ExecutorFuture<void>(**executor) + .then([this, executor] { return _init(executor); }) + .then([this] { _tellAllRecipientsToRefresh(); }) + .then([this, executor] { return _awaitAllRecipientsCreatedCollection(executor); }) + .then([this] { _tellAllDonorsToRefresh(); }) + .then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); }) + .then([this] { _tellAllRecipientsToRefresh(); }) + .then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); }) + .then([this] { _tellAllDonorsToRefresh(); }) + .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); }) + .then([this] { return _commit(); }) + .then([this] { + if (_stateDoc.getState() > CoordinatorStateEnum::kRenaming) { + return; + } + + this->_runUpdates(CoordinatorStateEnum::kRenaming); + return; + }) + .then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); }) + .then([this] { _tellAllDonorsToRefresh(); }) + .then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); }) + .then([this] { _tellAllRecipientsToRefresh(); }) + .then([this] { _tellAllDonorsToRefresh(); }) + .onError([this](Status status) { + _runUpdates(CoordinatorStateEnum::kError); + + LOGV2(4956902, + "Resharding failed", + "namespace"_attr = _stateDoc.getNss().ns(), + "newShardKeyPattern"_attr = _stateDoc.getReshardingKey(), + "error"_attr = status); + + // TODO wait for donors and recipients to abort the operation and clean up state + _tellAllRecipientsToRefresh(); + _tellAllDonorsToRefresh(); + + return status; + }) + .getAsync([this](Status status) { + if (!status.isOK()) { + invariant(_stateDoc.getState() == CoordinatorStateEnum::kError); + _completionPromise.setError(status); + return; + } + + invariant(_stateDoc.getState() == CoordinatorStateEnum::kDone); + _completionPromise.emplaceValue(); + }); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::setInitialChunksAndZones( + std::vector<ChunkType> initialChunks, std::vector<TagsType> newZones) { + if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing || + _initialChunksAndZonesPromise.getFuture().isReady()) { + return; + } + + _initialChunksAndZonesPromise.emplaceValue( + ChunksAndZones{std::move(initialChunks), std::move(newZones)}); +} + +ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::_init( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _initialChunksAndZonesPromise.getFuture() + .thenRunOn(**executor) + .then([this](const ChunksAndZones& initialChunksAndZones) { + // TODO SERVER-50304 Run this insert in a transaction with writes to config.collections, + // config.chunks, and config.tags + auto opCtx = cc().makeOperationContext(); + DBDirectClient client(opCtx.get()); + + const auto commandResponse = client.runCommand([&] { + write_ops::Insert insertOp(NamespaceString::kConfigReshardingOperationsNamespace); + insertOp.setDocuments({_stateDoc.toBSON()}); + return insertOp.serialize({}); + }()); + uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply())); + + invariant(_stateDoc.getState() == CoordinatorStateEnum::kInitializing); + _stateDoc.setState(CoordinatorStateEnum::kInitialized); + }); +} + +ExecutorFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsCreatedCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_stateDoc.getState() > CoordinatorStateEnum::kInitialized) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _reshardingCoordinatorObserver->awaitAllRecipientsCreatedCollection() + .thenRunOn(**executor) + .then([this]() { this->_runUpdates(CoordinatorStateEnum::kPreparingToDonate); }); +} + +ExecutorFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonate( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_stateDoc.getState() > CoordinatorStateEnum::kPreparingToDonate) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() + .thenRunOn(**executor) + .then([this](Timestamp fetchTimestamp) { + this->_runUpdates(CoordinatorStateEnum::kCloning, fetchTimestamp); + }); +} + +ExecutorFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinishedCloning( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_stateDoc.getState() > CoordinatorStateEnum::kCloning) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning() + .thenRunOn(**executor) + .then([this]() { this->_runUpdates(CoordinatorStateEnum::kMirroring); }); +} + +SharedSemiFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrictConsistency( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_stateDoc.getState() > CoordinatorStateEnum::kMirroring) { + return Status::OK(); + } + + return _reshardingCoordinatorObserver->awaitAllRecipientsInStrictConsistency(); +} + +Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit() { + if (_stateDoc.getState() > CoordinatorStateEnum::kMirroring) { + return Status::OK(); + } + + // TODO SERVER-50304 Run this update in a transaction with writes to config.collections, + // config.chunks, and config.tags + this->_runUpdates(CoordinatorStateEnum::kCommitted); + + return Status::OK(); +}; + +ExecutorFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsRenamedCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_stateDoc.getState() > CoordinatorStateEnum::kRenaming) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection() + .thenRunOn(**executor) + .then([this]() { this->_runUpdates(CoordinatorStateEnum::kDropping); }); +} + +ExecutorFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsDroppedOriginalCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + if (_stateDoc.getState() > CoordinatorStateEnum::kDropping) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection() + .thenRunOn(**executor) + .then([this]() { this->_runUpdates(CoordinatorStateEnum::kDone); }); +} + +// TODO SERVER-50304 Run this write in a transaction with updates to config.collections (and +// the initial chunks to config.chunks and config.tags if nextState is kInitialized) +void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates( + CoordinatorStateEnum nextState, boost::optional<Timestamp> fetchTimestamp) { + // Build new state doc for update + ReshardingCoordinatorDocument updatedStateDoc = _stateDoc; + updatedStateDoc.setState(nextState); + if (fetchTimestamp) { + auto fetchTimestampStruct = updatedStateDoc.getFetchTimestampStruct(); + if (fetchTimestampStruct.getFetchTimestamp()) + invariant(fetchTimestampStruct.getFetchTimestamp().get() == fetchTimestamp.get()); + + fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp)); + } + + // Run update + auto opCtx = cc().makeOperationContext(); + DBDirectClient client(opCtx.get()); + + const auto commandResponse = client.runCommand([&] { + write_ops::Update updateOp(NamespaceString::kConfigReshardingOperationsNamespace); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(_id); + entry.setU(updatedStateDoc.toBSON()); + return entry; + }()}); + return updateOp.serialize( + BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + // Throw if the update did not match a document. This means the state doc was removed out from + // under the operation. + uassert(495690, + str::stream() << "Found that the resharding coordinator state document is missing when " + "attempting to update state for namespace " + << _stateDoc.getNss().ns(), + commandReply.getIntField("n") == 1); + + // Update in-memory state doc + _stateDoc = updatedStateDoc; +} + +// TODO +void ReshardingCoordinatorService::ReshardingCoordinator:: + _markCoordinatorStateDocAsGarbageCollectable() {} + +// TODO +void ReshardingCoordinatorService::ReshardingCoordinator::_removeReshardingFields() {} + +// TODO +void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh() {} + +// TODO +void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh() {} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h new file mode 100644 index 00000000000..ad27fa27851 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/s/resharding/coordinator_document_gen.h" +#include "mongo/db/s/resharding/resharding_coordinator_observer.h" +#include "mongo/platform/mutex.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_tags.h" +#include "mongo/s/shard_id.h" +#include "mongo/util/future.h" + +namespace mongo { + +class ServiceContext; +class OperationContext; + +constexpr StringData kReshardingCoordinatorServiceName = "ReshardingCoordinatorService"_sd; + +class ReshardingCoordinatorService final : public repl::PrimaryOnlyService { +public: + explicit ReshardingCoordinatorService(ServiceContext* serviceContext) + : PrimaryOnlyService(serviceContext) {} + ~ReshardingCoordinatorService() = default; + + StringData getServiceName() const override { + return kReshardingCoordinatorServiceName; + } + NamespaceString getStateDocumentsNS() const override { + return NamespaceString::kConfigReshardingOperationsNamespace; + } + + ThreadPool::Limits getThreadPoolLimits() const override { + // TODO Limit the size of ReshardingCoordinatorService thread pool. + return ThreadPool::Limits(); + } + std::shared_ptr<PrimaryOnlyService::Instance> constructInstance( + BSONObj initialState) const override { + return std::make_shared<ReshardingCoordinatorService::ReshardingCoordinator>( + std::move(initialState)); + } + + class ReshardingCoordinator final + : public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> { + public: + explicit ReshardingCoordinator(const BSONObj& state); + + SharedSemiFuture<void> getCompletionPromise() { + return _completionPromise.getFuture(); + } + + void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override; + + void setInitialChunksAndZones(std::vector<ChunkType> initialChunks, + std::vector<TagsType> newZones); + + private: + struct ChunksAndZones { + std::vector<ChunkType> initialChunks; + std::vector<TagsType> newZones; + }; + + /** + * Does the following writes: + * 1. Inserts coordinator state document into config.reshardingOperations + * 2. Adds reshardingFields to the config.collections entry for the original collection + * 3. Inserts an entry into config.collections for the temporary collection + * 4. Inserts entries into config.chunks for ranges based on the new shard key + * 5. Upserts entries into config.tags for any zones associated with the new shard key + * + * Transitions to 'kInitialized'. + */ + ExecutorFuture<void> _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have created the + * temporary collection. Transitions to 'kPreparingToDonate'. + */ + ExecutorFuture<void> _awaitAllRecipientsCreatedCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all donors have picked a + * minFetchTimestamp and are ready to donate. Transitions to 'kCloning'. + */ + ExecutorFuture<void> _awaitAllDonorsReadyToDonate( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have finished + * cloning. Transitions to 'kMirroring'. + */ + ExecutorFuture<void> _awaitAllRecipientsFinishedCloning( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have entered + * strict-consistency. + */ + SharedSemiFuture<void> _awaitAllRecipientsInStrictConsistency( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Does the following writes: + * 1. Updates the config.collections entry for the new sharded collection + * 2. Updates config.chunks entries for the new sharded collection + * 3. Updates config.tags for the new sharded collection + * + * Transitions to 'kCommitted'. + */ + Future<void> _commit(); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have renamed the + * temporary collection to the original collection namespace. Transitions to 'kDropping'. + */ + ExecutorFuture<void> _awaitAllRecipientsRenamedCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all donors have dropped the + * original collection. Transitions to 'kDone'. + */ + ExecutorFuture<void> _awaitAllDonorsDroppedOriginalCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Updates the entry for this resharding operation in config.reshardingOperations and the + * catalog entries for the original and temporary namespaces in config.collections. + */ + void _runUpdates(CoordinatorStateEnum nextState, + boost::optional<Timestamp> fetchTimestamp = boost::none); + + /** + * Marks the state doc as garbage collectable so that it can be cleaned up by the TTL + * monitor. + */ + void _markCoordinatorStateDocAsGarbageCollectable(); + + /** + * Removes the 'reshardingFields' from the config.collections entry. + */ + void _removeReshardingFields(); + + /** + * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient + * shards. + */ + void _tellAllRecipientsToRefresh(); + + /** + * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards. + */ + void _tellAllDonorsToRefresh(); + + // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The + // value of this is the UUID that will be used as the collection UUID for the new sharded + // collection. The object looks like: {_id: 'reshardingUUID'} + const InstanceID _id; + + // Promise containing the initial chunks and new zones based on the new shard key. These are + // not a part of the state document, so must be set by configsvrReshardCollection after + // construction. + SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise; + + // Observes writes that indicate state changes for this resharding operation and notifies + // 'this' when all donors/recipients have entered some state so that 'this' can transition + // states. + std::shared_ptr<ReshardingCoordinatorObserver> _reshardingCoordinatorObserver; + + // The updated coordinator state document. + ReshardingCoordinatorDocument _stateDoc; + + // Fulfilled only after transitioned to kDone or kError + SharedPromise<void> _completionPromise; + }; +}; + +} // namespace mongo |