diff options
19 files changed, 1594 insertions, 15 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 337cdcf5d54..681aa0d2e16 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -188,6 +188,9 @@ const NamespaceString NamespaceString::kDistLocksNamepsace(NamespaceString::kCon const NamespaceString NamespaceString::kSetChangeStreamStateCoordinatorNamespace( NamespaceString::kConfigDb, "change_stream_coordinator"); +const NamespaceString NamespaceString::kGlobalIndexClonerNamespace( + NamespaceString::kConfigDb, "localGlobalIndexOperations.cloner"); + NamespaceString NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(StringData ns) { if (!gMultitenancySupport) { return NamespaceString(ns, boost::none); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 4d7cbfa6dab..5045d925c92 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -258,6 +258,9 @@ public: // Namespace used to store the state document of 'SetChangeStreamStateCoordinator'. static const NamespaceString kSetChangeStreamStateCoordinatorNamespace; + // Namespace used for storing global index cloner state documents. + static const NamespaceString kGlobalIndexClonerNamespace; + /** * Constructs an empty NamespaceString. 
*/ diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 3cccac32d2d..c2b6ea19b70 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -58,8 +58,14 @@ env.Library( 'commit_chunk_migration.idl', 'config_server_op_observer.cpp', 'global_index/global_index_cloner_fetcher.cpp', + 'global_index/global_index_cloner_fetcher_factory.cpp', + 'global_index/global_index_cloner.idl', + 'global_index/global_index_cloning_external_state.cpp', + 'global_index/global_index_cloning_service.cpp', 'global_index/global_index_entry.idl', 'global_index/global_index_inserter.cpp', + 'global_index/global_index_server_parameters.idl', + 'global_index/global_index_util.cpp', 'global_index_cumulative_metrics_field_name_provider.cpp', 'global_index_cumulative_metrics.cpp', 'global_index_metrics.cpp', @@ -596,6 +602,7 @@ env.CppUnitTest( 'collection_sharding_runtime_test.cpp', 'database_sharding_state_test.cpp', 'global_index/global_index_cloner_fetcher_test.cpp', + 'global_index/global_index_cloning_service_test.cpp', 'global_index/global_index_inserter_test.cpp', 'global_index_metrics_test.cpp', 'global_index_cumulative_metrics_test.cpp', @@ -663,6 +670,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/catalog_test_fixture', '$BUILD_DIR/mongo/db/catalog/database_holder', + '$BUILD_DIR/mongo/db/commands/list_collections_filter', '$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util', '$BUILD_DIR/mongo/db/keys_collection_client_direct', '$BUILD_DIR/mongo/db/op_observer/op_observer', @@ -674,6 +682,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/query/query_request', '$BUILD_DIR/mongo/db/query/query_test_service_context', '$BUILD_DIR/mongo/db/query_expressions', + '$BUILD_DIR/mongo/db/read_write_concern_defaults_mock', '$BUILD_DIR/mongo/db/repl/image_collection_entry', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', '$BUILD_DIR/mongo/db/repl/oplog_application', diff --git 
a/src/mongo/db/s/global_index/global_index_cloner.idl b/src/mongo/db/s/global_index/global_index_cloner.idl new file mode 100644 index 00000000000..81ab433f171 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloner.idl @@ -0,0 +1,74 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file contains the schema for the document representing a global index cloner primary only +# service state document. + +global: + cpp_namespace: "mongo::global_index" + +imports: + - "mongo/db/basic_types.idl" + +enums: + GlobalIndexClonerState: + description: "The current state of a global index cloner." 
+ type: string + values: + kUnused: "unused" + kCloning: "cloning" + +structs: + GlobalIndexClonerDoc: + description: "Document containing the state and specs of global index cloning operation." + strict: false + fields: + _id: + type: uuid + cpp_name: indexCollectionUUID + description: "The uuid of the global index collection." + nss: + type: namespacestring + description: "The namespace of the source collection." + collectionUUID: + type: uuid + description: "The uuid of source collection." + indexName: + type: string + description: "The name of the global index being created." + indexSpec: + type: object_owned + description: "Temporary placeholder for index spec until global index api is + available." + minFetchTimestamp: + type: timestamp + description: "The minimum timestamp to use for fetching documents from the source + collection" + state: + type: GlobalIndexClonerState + description: "The current state of cloner." diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp b/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp index b67dfc4c717..e8421758f8f 100644 --- a/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp +++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp @@ -156,6 +156,10 @@ GlobalIndexClonerFetcher::GlobalIndexClonerFetcher(NamespaceString nss, boost::optional<GlobalIndexClonerFetcher::FetchedEntry> GlobalIndexClonerFetcher::getNext( OperationContext* opCtx) { + if (!_pipeline) { + _pipeline = _restartPipeline(opCtx); + } + _pipeline->reattachToOperationContext(opCtx); ON_BLOCK_EXIT([this] { _pipeline->detachFromOperationContext(); }); @@ -164,10 +168,6 @@ boost::optional<GlobalIndexClonerFetcher::FetchedEntry> GlobalIndexClonerFetcher _pipeline.reset(); }); - if (!_pipeline) { - _pipeline = _restartPipeline(opCtx); - } - auto next = _pipeline->getNext(); guard.dismiss(); diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher.h 
b/src/mongo/db/s/global_index/global_index_cloner_fetcher.h index 192cd4988a3..9c5cebc81be 100644 --- a/src/mongo/db/s/global_index/global_index_cloner_fetcher.h +++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher.h @@ -39,17 +39,31 @@ namespace mongo { namespace global_index { -/** - * Responsible for fetching documents to clone for a particular shard. - */ -class GlobalIndexClonerFetcher { +class GlobalIndexClonerFetcherInterface { public: struct FetchedEntry { public: + FetchedEntry(BSONObj _documentKey, BSONObj _indexKeyValues) + : documentKey(_documentKey.getOwned()), indexKeyValues(_indexKeyValues.getOwned()) {} + BSONObj documentKey; BSONObj indexKeyValues; }; + virtual ~GlobalIndexClonerFetcherInterface() {} + + /** + * Returns the next document to clone. + * Returns boost::none if there are no documents left. + */ + virtual boost::optional<FetchedEntry> getNext(OperationContext* opCtx) = 0; +}; + +/** + * Responsible for fetching documents to clone for a particular shard. + */ +class GlobalIndexClonerFetcher : public GlobalIndexClonerFetcherInterface { +public: GlobalIndexClonerFetcher(NamespaceString nss, UUID collUUId, UUID indexUUID, @@ -58,7 +72,7 @@ public: KeyPattern sourceShardKeyPattern, KeyPattern globalIndexPattern); - boost::optional<FetchedEntry> getNext(OperationContext* opCtx); + boost::optional<FetchedEntry> getNext(OperationContext* opCtx) override; /** * Builds the aggregation pipeline for fetching the documents diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.cpp b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.cpp new file mode 100644 index 00000000000..9d8a87847d6 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.cpp @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/s/global_index/global_index_cloner_fetcher_factory.h" + +namespace mongo { +namespace global_index { + +std::unique_ptr<GlobalIndexClonerFetcherInterface> GlobalIndexClonerFetcherFactory::make( + NamespaceString nss, + UUID collUUID, + UUID indexUUID, + ShardId myShardId, + Timestamp minFetchTimestamp, + KeyPattern sourceShardKeyPattern, + KeyPattern globalIndexPattern) { + return std::make_unique<GlobalIndexClonerFetcher>(std::move(nss), + std::move(collUUID), + std::move(indexUUID), + std::move(myShardId), + std::move(minFetchTimestamp), + std::move(sourceShardKeyPattern), + std::move(globalIndexPattern)); +} + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.h b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.h new file mode 100644 index 00000000000..46c4422dd2b --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/s/global_index/global_index_cloner_fetcher.h" + +namespace mongo { +namespace global_index { + +class GlobalIndexClonerFetcherFactoryInterface { +public: + virtual ~GlobalIndexClonerFetcherFactoryInterface() {} + + virtual std::unique_ptr<GlobalIndexClonerFetcherInterface> make( + NamespaceString nss, + UUID collUUID, + UUID indexUUID, + ShardId myShardId, + Timestamp minFetchTimestamp, + KeyPattern sourceShardKeyPattern, + KeyPattern globalIndexPattern) = 0; +}; + +class GlobalIndexClonerFetcherFactory : public GlobalIndexClonerFetcherFactoryInterface { +public: + std::unique_ptr<GlobalIndexClonerFetcherInterface> make(NamespaceString nss, + UUID collUUID, + UUID indexUUID, + ShardId myShardId, + Timestamp minFetchTimestamp, + KeyPattern sourceShardKeyPattern, + KeyPattern globalIndexPattern) override; +}; + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_cloning_external_state.cpp b/src/mongo/db/s/global_index/global_index_cloning_external_state.cpp new file mode 100644 index 00000000000..7473479e716 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloning_external_state.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/s/global_index/global_index_cloning_external_state.h" + +#include "mongo/db/s/sharding_state.h" +#include "mongo/s/grid.h" + +namespace mongo { +namespace global_index { + +ShardId GlobalIndexCloningStateImpl::myShardId(ServiceContext* service) const { + return ShardingState::get(service)->shardId(); +} + +ChunkManager GlobalIndexCloningStateImpl::getShardedCollectionRoutingInfo( + OperationContext* opCtx, const NamespaceString& nss) const { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + return catalogCache->getShardedCollectionRoutingInfo(opCtx, nss); +} + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_cloning_external_state.h b/src/mongo/db/s/global_index/global_index_cloning_external_state.h new file mode 100644 index 00000000000..ef039841a9c --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloning_external_state.h @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/s/global_index/global_index_cloning_service.h" + +#include "mongo/s/chunk_manager.h" + +namespace mongo { +namespace global_index { + +/** + * Interface for using functionality from other modules (mostly sharding) to allow mocking in tests. + */ +class GlobalIndexCloningService::CloningExternalState { +public: + virtual ~CloningExternalState() = default; + + virtual ShardId myShardId(ServiceContext* service) const = 0; + + virtual ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) const = 0; +}; + +class GlobalIndexCloningStateImpl : public GlobalIndexCloningService::CloningExternalState { +public: + ShardId myShardId(ServiceContext* service) const override; + + ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) const override; +}; + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.cpp b/src/mongo/db/s/global_index/global_index_cloning_service.cpp new file mode 100644 index 00000000000..0002d7ab032 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloning_service.cpp @@ -0,0 +1,393 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/s/global_index/global_index_cloning_service.h" + +#include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/index/index_descriptor.h" +#include "mongo/db/ops/delete.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/s/global_index/global_index_cloning_external_state.h" +#include "mongo/db/s/global_index/global_index_server_parameters_gen.h" +#include "mongo/db/s/global_index/global_index_util.h" +#include "mongo/db/s/resharding/resharding_data_copy_util.h" +#include "mongo/logv2/log.h" +#include "mongo/util/future_util.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kGlobalIndex + +namespace mongo { +namespace global_index { + +namespace { + +const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)}; + +} // namespace + +GlobalIndexCloningService::GlobalIndexCloningService(ServiceContext* serviceContext) + : PrimaryOnlyService(serviceContext), _serviceContext(serviceContext) {} + +ThreadPool::Limits GlobalIndexCloningService::getThreadPoolLimits() const { + ThreadPool::Limits threadPoolLimit; + threadPoolLimit.maxThreads = gGlobalIndexClonerServiceMaxThreadCount; + return threadPoolLimit; +} + +std::shared_ptr<repl::PrimaryOnlyService::Instance> GlobalIndexCloningService::constructInstance( + BSONObj initialState) { + return std::make_shared<GlobalIndexCloningService::CloningStateMachine>( + _serviceContext, + this, + std::make_unique<GlobalIndexCloningStateImpl>(), + std::make_unique<GlobalIndexClonerFetcherFactory>(), + GlobalIndexClonerDoc::parse(IDLParserContext{"GlobalIndexCloner"}, initialState)); +} + +void GlobalIndexCloningService::checkIfConflictsWithOtherInstances( + OperationContext* opCtx, + BSONObj initialStateDoc, + const std::vector<const PrimaryOnlyService::Instance*>& existingInstances) { + // There are no 
restrictions on running concurrent global index instances. +} + +GlobalIndexCloningService::CloningStateMachine::CloningStateMachine( + ServiceContext* serviceContext, + const GlobalIndexCloningService* cloningService, + std::unique_ptr<GlobalIndexCloningService::CloningExternalState> externalState, + std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> fetcherFactory, + GlobalIndexClonerDoc clonerDoc) + : _serviceContext(serviceContext), + _cloningService(cloningService), + _execForCancelableOpCtx(std::make_shared<ThreadPool>([] { + ThreadPool::Options options; + options.poolName = "GlobalIndexCloningServiceCancelableOpCtxPool"; + options.minThreads = 0; + options.maxThreads = 1; + return options; + }())), + _clonerState(std::move(clonerDoc)), + _fetcherFactory(std::move(fetcherFactory)), + _externalState(std::move(externalState)) {} + +SemiFuture<void> GlobalIndexCloningService::CloningStateMachine::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& stepdownToken) noexcept { + auto abortToken = _initAbortSource(stepdownToken); + _execForCancelableOpCtx->startup(); + _retryingCancelableOpCtxFactory.emplace(abortToken, _execForCancelableOpCtx); + + _init(executor); + + return ExecutorFuture(**executor) + .then([this, executor, abortToken] { return _persistStateDocument(executor, abortToken); }) + .then([this, executor, abortToken] { return _runUntilDoneCloning(executor, abortToken); }) + // TODO: SERVER-68706 wait from coordinator to commit or abort. 
+ .onCompletion([this, stepdownToken](const Status& status) { + _retryingCancelableOpCtxFactory.emplace(stepdownToken, _execForCancelableOpCtx); + return status; + }) + .then([this, executor, stepdownToken] { return _cleanup(executor, stepdownToken); }) + .thenRunOn(_cloningService->getInstanceCleanupExecutor()) + .onError([](const Status& status) { + LOGV2( + 6755903, "Global index cloner encountered an error", "error"_attr = redact(status)); + return status; + }) + .onCompletion([this, self = shared_from_this()](const Status& status) { + if (!_completionPromise.getFuture().isReady()) { + if (status.isOK()) { + _completionPromise.emplaceValue(); + } else { + _completionPromise.setError(status); + } + } + }) + .semi(); +} + +void GlobalIndexCloningService::CloningStateMachine::interrupt(Status status) {} + +void GlobalIndexCloningService::CloningStateMachine::abort() { + stdx::lock_guard<Latch> lk(_mutex); + if (_abortSource) { + _abortSource->cancel(); + } +} + +ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_cleanup( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& stepdownToken) { + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, executor, stepdownToken](auto& cancelableFactory) { + return ExecutorFuture(**executor) + .then([this, executor, stepdownToken, &cancelableFactory] { + auto opCtx = cancelableFactory.makeOperationContext(&cc()); + _removeStateDocument(opCtx.get()); + }); + }) + .onTransientError([](const auto& status) {}) + .onUnrecoverableError([](const auto& status) {}) + .until<Status>([](const Status& status) { return status.isOK(); }) + .on(**executor, stepdownToken); +} + +void GlobalIndexCloningService::CloningStateMachine::_init( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + _inserter = std::make_unique<GlobalIndexInserter>(_clonerState.getNss(), + _clonerState.getIndexName(), + _clonerState.getIndexCollectionUUID(), + **executor); + + auto 
client = _serviceContext->makeClient("globalIndexClonerServiceInit"); + AlternativeClientRegion clientRegion(client); + + auto opCtx = _serviceContext->makeOperationContext(Client::getCurrent()); + + auto routingInfo = + _externalState->getShardedCollectionRoutingInfo(opCtx.get(), _clonerState.getNss()); + + uassert(6755901, + str::stream() << "Cannot create global index on unsharded ns " + << _clonerState.getNss().ns(), + routingInfo.isSharded()); + + auto myShardId = _externalState->myShardId(_serviceContext); + + auto indexKeyPattern = + _clonerState.getIndexSpec().getObjectField(IndexDescriptor::kKeyPatternFieldName); + _fetcher = _fetcherFactory->make(_clonerState.getNss(), + _clonerState.getCollectionUUID(), + _clonerState.getIndexCollectionUUID(), + myShardId, + _clonerState.getMinFetchTimestamp(), + routingInfo.getShardKeyPattern().getKeyPattern(), + indexKeyPattern.getOwned()); +} + +ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_runUntilDoneCloning( + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) { + + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, executor, cancelToken](auto& cancelableFactory) { + return ExecutorFuture(**executor) + .then([this, executor, &cancelableFactory] { + return _initializeCollections(cancelableFactory); + }) + .then([this, executor, cancelToken, cancelableFactory] { + return _clone(executor, cancelToken, cancelableFactory); + }); + }) + .onTransientError([](const Status& status) { + + }) + .onUnrecoverableError([](const Status& status) { + + }) + .until<Status>([](const Status& status) { return status.isOK(); }) + .on(**executor, cancelToken); +} + +boost::optional<BSONObj> GlobalIndexCloningService::CloningStateMachine::reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode, + MongoProcessInterface::CurrentOpSessionsMode) noexcept { + // TODO: SERVER-68707 + return boost::none; +} + +void 
GlobalIndexCloningService::CloningStateMachine::abort(bool isUserCancelled) {} + +void GlobalIndexCloningService::CloningStateMachine::checkIfOptionsConflict( + const BSONObj& stateDoc) const { + auto newCloning = + GlobalIndexClonerDoc::parse(IDLParserContext("globalIndexCloningCheckConflict"), stateDoc); + uassert(6755900, + str::stream() << "new global index " << stateDoc << " is incompatible with ongoing " + << _clonerState.toBSON(), + newCloning.getNss() == _clonerState.getNss() && + newCloning.getCollectionUUID() == _clonerState.getCollectionUUID()); +} + +CancellationToken GlobalIndexCloningService::CloningStateMachine::_initAbortSource( + const CancellationToken& stepdownToken) { + { + stdx::lock_guard<Latch> lk(_mutex); + _abortSource = CancellationSource(stepdownToken); + } + + // TODO: SERVER-67563 Handle possible race between _initAbortSource and abort + + return _abortSource->token(); +} + +ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistStateDocument( + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) { + if (_clonerState.getState() > GlobalIndexClonerStateEnum::kUnused) { + return ExecutorFuture<void>(**executor, Status::OK()); + } + + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, executor](auto& cancelableFactory) { + auto opCtx = cancelableFactory.makeOperationContext(Client::getCurrent()); + + GlobalIndexClonerDoc newDoc(_clonerState); + newDoc.setState(GlobalIndexClonerStateEnum::kCloning); + PersistentTaskStore<GlobalIndexClonerDoc> store(_cloningService->getStateDocumentsNS()); + store.add(opCtx.get(), newDoc, kNoWaitWriteConcern); + + std::swap(_clonerState, newDoc); + + LOGV2(6755904, "Persisted global index state document"); + }) + .onTransientError([](const Status& status) {}) + .onUnrecoverableError([](const Status& status) {}) + .until<Status>([](const Status& status) { return status.isOK(); }) + .on(**executor, cancelToken); +} + +void 
GlobalIndexCloningService::CloningStateMachine::_removeStateDocument(OperationContext* opCtx) { + const auto& nss = _cloningService->getStateDocumentsNS(); + writeConflictRetry( + opCtx, "GlobalIndexCloningStateMachine::removeStateDocument", nss.toString(), [&] { + AutoGetCollection coll(opCtx, nss, MODE_IX); + + if (!coll) { + return; + } + + WriteUnitOfWork wuow(opCtx); + + // Set the promise when the delete commits, this is to ensure that any interruption that + // happens later won't result in setting an error on the completion promise. + opCtx->recoveryUnit()->onCommit([this](boost::optional<Timestamp> unusedCommitTime) { + _completionPromise.emplaceValue(); + }); + + deleteObjects(opCtx, + *coll, + nss, + BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName + << _clonerState.getIndexCollectionUUID()), + true /* justOne */); + + wuow.commit(); + }); +} + +void GlobalIndexCloningService::CloningStateMachine::_initializeCollections( + const CancelableOperationContextFactory& cancelableOpCtxFactory) { + auto cancelableOpCtx = cancelableOpCtxFactory.makeOperationContext(Client::getCurrent()); + auto opCtx = cancelableOpCtx.get(); + + resharding::data_copy::ensureCollectionExists( + opCtx, skipIdNss(_clonerState.getNss(), _clonerState.getIndexName()), {}); +} + +ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_clone( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& cancelToken, + const CancelableOperationContextFactory& cancelableOpCtxFactory) { + return AsyncTry([this, executor, cancelToken, cancelableOpCtxFactory] { + auto cancelableOpCtx = + cancelableOpCtxFactory.makeOperationContext(Client::getCurrent()); + _fetchNextBatch(cancelableOpCtx.get()); + + return _processBatch(executor, cancelToken, cancelableOpCtxFactory); + }) + .until([this](const Status& status) { return !status.isOK() || !_hasMoreToFetch; }) + .on(**executor, cancelToken); +} + +void 
GlobalIndexCloningService::CloningStateMachine::_fetchNextBatch(OperationContext* opCtx) { + if (!_fetchedDocs.empty()) { + // There are still documents that have not been processed from the previous attempt. + return; + } + + int totalSize = 0; + + do { + if (auto next = _fetcher->getNext(opCtx)) { + totalSize += next->indexKeyValues.objsize(); + _fetchedDocs.push(*next); + } else { + _hasMoreToFetch = false; + } + } while (totalSize < gGlobalIndexClonerServiceFetchBatchMaxSizeBytes && _hasMoreToFetch); +} + +ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_processBatch( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& cancelToken, + const CancelableOperationContextFactory& cancelableOpCtxFactory) { + return AsyncTry([this, &cancelableOpCtxFactory] { + if (_fetchedDocs.empty()) { + return; + } + + const auto& next = _fetchedDocs.front(); + + auto cancelableOpCtx = + cancelableOpCtxFactory.makeOperationContext(Client::getCurrent()); + _inserter->processDoc(cancelableOpCtx.get(), next.indexKeyValues, next.documentKey); + + _fetchedDocs.pop(); + }) + .until([this](const Status& status) { return !status.isOK() || _fetchedDocs.empty(); }) + .on(**executor, cancelToken); +} + +void GlobalIndexCloningService::CloningStateMachine::_ensureCollection(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + + // Create the destination collection if necessary. 
+ writeConflictRetry(opCtx, "CloningStateMachine::_ensureCollection", nss.toString(), [&] { + const CollectionPtr coll = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss); + if (coll) { + return; + } + + WriteUnitOfWork wuow(opCtx); + AutoGetDb autoDb(opCtx, nss.dbName(), LockMode::MODE_IX); + Lock::CollectionLock collLock(opCtx, nss, MODE_IX); + auto db = autoDb.ensureDbExists(opCtx); + + CollectionOptions options; + db->createCollection(opCtx, nss, options); + wuow.commit(); + }); +} + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.h b/src/mongo/db/s/global_index/global_index_cloning_service.h new file mode 100644 index 00000000000..6342938e529 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloning_service.h @@ -0,0 +1,220 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <queue> + +#include "mongo/db/cancelable_operation_context.h" +#include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/s/global_index/global_index_cloner_fetcher_factory.h" +#include "mongo/db/s/global_index/global_index_cloner_gen.h" +#include "mongo/db/s/global_index/global_index_inserter.h" +#include "mongo/db/s/resharding/resharding_future_util.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { +namespace global_index { + +class GlobalIndexCloningService : public repl::PrimaryOnlyService { +public: + static constexpr StringData kServiceName = "GlobalIndexCloningService"_sd; + + explicit GlobalIndexCloningService(ServiceContext* serviceContext); + + class CloningExternalState; + class CloningStateMachine; + + StringData getServiceName() const override { + return kServiceName; + } + + NamespaceString getStateDocumentsNS() const override { + return NamespaceString::kGlobalIndexClonerNamespace; + } + + ThreadPool::Limits getThreadPoolLimits() const override; + + void checkIfConflictsWithOtherInstances( + OperationContext* opCtx, + BSONObj initialState, + const std::vector<const repl::PrimaryOnlyService::Instance*>& existingInstances) override; + + std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance( + BSONObj initialState) override; + +private: + ServiceContext* const _serviceContext; +}; + +/** + * Represents the current state of a global index operation on this shard. This class drives state + * transitions and updates to underlying on-disk metadata. 
+ */ +class GlobalIndexCloningService::CloningStateMachine final + : public repl::PrimaryOnlyService::TypedInstance<CloningStateMachine> { +public: + CloningStateMachine( + ServiceContext* service, + const GlobalIndexCloningService* cloningService, + std::unique_ptr<GlobalIndexCloningService::CloningExternalState> externalState, + std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> fetcherFactory, + GlobalIndexClonerDoc clonerDoc); + + SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) noexcept override; + + void interrupt(Status status) override; + + void abort(); + + /** + * Returns a Future that will be resolved when all work associated with this Instance is done + * making forward progress. + */ + SharedSemiFuture<void> getCompletionFuture() const { + return _completionPromise.getFuture(); + } + + boost::optional<BSONObj> reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode, + MongoProcessInterface::CurrentOpSessionsMode) noexcept override; + + /** + * Initiates the cancellation of the cloning operation. + */ + void abort(bool isUserCancelled); + + void checkIfOptionsConflict(const BSONObj& stateDoc) const final; + +private: + /** + * Initializes the _abortSource and generates a token from it to return back the caller. + * Should only be called once per lifetime. + */ + CancellationToken _initAbortSource(const CancellationToken& stepdownToken); + + /** + * Initializes the necessary components. + */ + void _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Make sure that the necessary collections are created. + */ + void _initializeCollections(const CancelableOperationContextFactory& cancelableOpCtxFactory); + + /** + * Inserts the state document to storage. 
+ */ + ExecutorFuture<void> _persistStateDocument( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& cancelToken); + + /** + * Deletes the state document from storage. + */ + void _removeStateDocument(OperationContext* opCtx); + + /** + * Performs the entire cloning process. + */ + ExecutorFuture<void> _runUntilDoneCloning( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& stepdownToken); + + /** + * Iterates over the documents from the source collection and convert them into global index + * entries. + */ + ExecutorFuture<void> _clone(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& cancelToken, + const CancelableOperationContextFactory& cancelableOpCtxFactory); + + /** + * Removes the side collections created by this cloner. + */ + ExecutorFuture<void> _cleanup(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& stepdownToken); + + /** + * Fetches documents from source collection and store them in a queue. + */ + void _fetchNextBatch(OperationContext* opCtx); + + /** + * Convert fetched documents to global index entries. + */ + ExecutorFuture<void> _processBatch( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& cancelToken, + const CancelableOperationContextFactory& cancelableOpCtxFactory); + + /** + * Create collection with the given namespace only if it doesn't exist. + */ + void _ensureCollection(OperationContext* opCtx, const NamespaceString& nss); + + ServiceContext* const _serviceContext; + + // The primary-only service instance corresponding to the cloner instance. Not owned. + const GlobalIndexCloningService* const _cloningService; + + // A separate executor different from the one supplied by the primary only service is needed + // because the one from POS can be shut down during step down. 
This will ensure that the + // operation context created from the cancelableOpCtxFactory can be interrupted when the cancel + // token is aborted during step down. + const std::shared_ptr<ThreadPool> _execForCancelableOpCtx; + + boost::optional<resharding::RetryingCancelableOperationContextFactory> + _retryingCancelableOpCtxFactory; + + Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningStateMachine::_mutex"); + + GlobalIndexClonerDoc _clonerState; + + // Canceled when there is an unrecoverable error or stepdown. + boost::optional<CancellationSource> _abortSource; + + std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> _fetcherFactory; + std::unique_ptr<GlobalIndexClonerFetcherInterface> _fetcher; + std::unique_ptr<GlobalIndexInserter> _inserter; + + // Keeps track of whether there is still a possibility that documents need to be + // fetched from the source collection. + bool _hasMoreToFetch{true}; + + std::queue<GlobalIndexClonerFetcher::FetchedEntry> _fetchedDocs; + + SharedPromise<void> _completionPromise; + const std::unique_ptr<CloningExternalState> _externalState; +}; + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp new file mode 100644 index 00000000000..187271df7cf --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp @@ -0,0 +1,484 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/commands/list_collections_filter.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/op_observer/op_observer_noop.h" +#include "mongo/db/op_observer/op_observer_registry.h" +#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h" +#include "mongo/db/repl/database_cloner_gen.h" +#include "mongo/db/repl/primary_only_service_test_fixture.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/db/s/global_index/global_index_cloning_external_state.h" +#include "mongo/db/s/global_index/global_index_cloning_service.h" +#include "mongo/db/s/global_index/global_index_util.h" +#include "mongo/db/session/logical_session_cache_noop.h" +#include "mongo/db/session/session_catalog_mongod.h" +#include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h" +#include "mongo/idl/server_parameter_test_util.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT 
::mongo::logv2::LogComponent::kTest + +namespace mongo { +namespace global_index { +namespace { + +const ShardId kRecipientShardId{"myShardId"}; +const NamespaceString kSourceNss{"sourcedb", "sourcecollection"}; +constexpr auto kSourceShardKey = "key"_sd; + +class GlobalIndexExternalStateForTest : public GlobalIndexCloningService::CloningExternalState { +public: + ShardId myShardId(ServiceContext* serviceContext) const override { + return kRecipientShardId; + } + + ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) const override { + invariant(nss == kSourceNss); + + const OID epoch = OID::gen(); + std::vector<ChunkType> chunks = { + ChunkType{_sourceUUID, + ChunkRange{BSON(kSourceShardKey << MINKEY), BSON(kSourceShardKey << MAXKEY)}, + ChunkVersion({epoch, Timestamp(1, 1)}, {100, 0}), + _someDonorId}}; + + auto rt = RoutingTableHistory::makeNew(kSourceNss, + _sourceUUID, + BSON(kSourceShardKey << 1), + nullptr /* defaultCollator */, + false /* unique */, + std::move(epoch), + Timestamp(1, 1), + boost::none /* timeseriesFields */, + boost::none /* reshardingFields */, + boost::none /* chunkSizeBytes */, + true /* allowMigrations */, + chunks); + + return ChunkManager(_someDonorId, + DatabaseVersion(UUID::gen(), Timestamp(1, 1)), + _makeStandaloneRoutingTableHistory(std::move(rt)), + boost::none /* clusterTime */); + } + +private: + RoutingTableHistoryValueHandle _makeStandaloneRoutingTableHistory( + RoutingTableHistory rt) const { + const auto version = rt.getVersion(); + return RoutingTableHistoryValueHandle( + std::make_shared<RoutingTableHistory>(std::move(rt)), + ComparableChunkVersion::makeComparableChunkVersion(version)); + } + + const UUID _sourceUUID{UUID::gen()}; + const ShardId _someDonorId{"otherShardId"}; +}; + +class MockGlobalIndexClonerFetcher : public GlobalIndexClonerFetcherInterface { +public: + void setResultList(std::list<FetchedEntry> newResults) { + _docs = std::move(newResults); + } + + 
boost::optional<FetchedEntry> getNext(OperationContext* opCtx) override { + if (_docs.empty()) { + return boost::none; + } + + auto ret = _docs.front(); + _docs.pop_front(); + return ret; + } + +private: + std::list<FetchedEntry> _docs; +}; + +class GlobalIndexCloningFetcherFactoryForTest : public GlobalIndexClonerFetcherFactoryInterface { +public: + explicit GlobalIndexCloningFetcherFactoryForTest(MockGlobalIndexClonerFetcher* mockFetcher) + : _mockFetcher(mockFetcher) {} + + std::unique_ptr<GlobalIndexClonerFetcherInterface> make( + NamespaceString nss, + UUID collUUId, + UUID indexUUID, + ShardId myShardId, + Timestamp minFetchTimestamp, + KeyPattern sourceShardKeyPattern, + KeyPattern globalIndexPattern) override { + return std::make_unique<MockGlobalIndexClonerFetcher>(*_mockFetcher); + } + +private: + MockGlobalIndexClonerFetcher* _mockFetcher; +}; + +class GlobalIndexCloningServiceForTest : public GlobalIndexCloningService { +public: + explicit GlobalIndexCloningServiceForTest(ServiceContext* serviceContext, + MockGlobalIndexClonerFetcher* mockFetcher) + : GlobalIndexCloningService(serviceContext), + _serviceContext(serviceContext), + _mockFetcher(mockFetcher) {} + + std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance( + BSONObj initialState) override { + return std::make_shared<GlobalIndexCloningService::CloningStateMachine>( + _serviceContext, + this, + std::make_unique<GlobalIndexExternalStateForTest>(), + std::make_unique<GlobalIndexCloningFetcherFactoryForTest>(_mockFetcher), + GlobalIndexClonerDoc::parse(IDLParserContext{"GlobalIndexCloningServiceForTest"}, + initialState)); + } + +private: + ServiceContext* _serviceContext; + MockGlobalIndexClonerFetcher* _mockFetcher; +}; + +class Blocker { +public: + ~Blocker() { + stdx::unique_lock lk(_mutex); + _shouldBlock = false; + _cvBlocked.notify_all(); + } + + void blockIfActivated(OperationContext* opCtx) { + stdx::unique_lock lk(_mutex); + _blockedOnce = true; + 
opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return !_shouldBlock; }); + } + + void waitUntilBlockedOccurred(OperationContext* opCtx) { + stdx::unique_lock lk(_mutex); + opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return _blockedOnce; }); + } + + void block() { + stdx::unique_lock lk(_mutex); + _shouldBlock = true; + } + + void unblock() { + stdx::unique_lock lk(_mutex); + _shouldBlock = false; + } + +private: + Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningServiceTestBlocker::_mutex"); + stdx::condition_variable _cvBlocked; + bool _shouldBlock{false}; + bool _blockedOnce{false}; +}; + +class OpObserverForTest : public OpObserverNoop { +public: + OpObserverForTest(Blocker* insertBlocker, Blocker* deleteBlocker) + : _insertBlocker(insertBlocker), _deleteBlocker(deleteBlocker) {} + + void onInserts(OperationContext* opCtx, + const CollectionPtr& coll, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) override { + if (NamespaceString::kGlobalIndexClonerNamespace == coll->ns()) { + _insertBlocker->blockIfActivated(opCtx); + } + } + + void onDelete(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + StmtId stmtId, + const OplogDeleteEntryArgs& args) override { + if (NamespaceString::kGlobalIndexClonerNamespace == nss) { + _deleteBlocker->blockIfActivated(opCtx); + } + } + +private: + Blocker* _insertBlocker; + Blocker* _deleteBlocker; +}; + +GlobalIndexCloningService::InstanceID extractInstanceId(const GlobalIndexClonerDoc& doc) { + return BSON("_id" << doc.getIndexCollectionUUID()); +} + +using GlobalIndexStateMachine = GlobalIndexCloningServiceForTest::CloningStateMachine; + +class GlobalIndexClonerServiceTest : public repl::PrimaryOnlyServiceMongoDTest { +public: + std::unique_ptr<repl::PrimaryOnlyService> makeService(ServiceContext* serviceContext) override { + return 
std::make_unique<GlobalIndexCloningServiceForTest>(serviceContext, &_mockFetcher); + } + + void setUp() override { + repl::PrimaryOnlyServiceMongoDTest::setUp(); + + auto serviceContext = getServiceContext(); + auto storageMock = std::make_unique<repl::StorageInterfaceMock>(); + repl::StorageInterface::set(serviceContext, std::move(storageMock)); + + // The ReadWriteConcernDefaults decoration on the service context won't always be created, + // so we should manually instantiate it to ensure it exists in our tests. + ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); + + _opObserverRegistry->addObserver( + std::make_unique<OpObserverForTest>(&_stateDocInsertBlocker, &_stateDocDeleteBlocker)); + + // Create config.transactions collection + auto opCtx = serviceContext->makeOperationContext(Client::getCurrent()); + DBDirectClient client(opCtx.get()); + client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); + + MongoDSessionCatalog::set( + getServiceContext(), + std::make_unique<MongoDSessionCatalog>( + std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>())); + + // Session cache is needed otherwise client session info will ignored. 
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); + + std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> fetcherResults; + fetcherResults.push_front( + {BSON("_id" << 10 << kSourceShardKey << 20), BSON(_indexKey << 30)}); + replaceFetcherResultList(std::move(fetcherResults)); + + const auto& indexNs = globalIndexNss(kSourceNss, _indexName); + client.createCollection(indexNs.ns()); + auto all = client.getCollectionInfos(indexNs.db().toString(), + BSON("name" << indexNs.coll().toString())); + + ASSERT_EQ(1, all.size()); + _indexCollectionUUID.emplace(uassertStatusOK(UUID::parse(all.front()["info"]["uuid"]))); + } + + void checkIndexCollection(OperationContext* opCtx) { + DBDirectClient client(opCtx); + + MockGlobalIndexClonerFetcher fetcherCopy(_fetcherCopyForVerification); + while (auto next = fetcherCopy.getNext(opCtx)) { + FindCommandRequest query(globalIndexNss(kSourceNss, _indexName)); + query.setFilter(BSON("_id" << next->indexKeyValues)); + + auto doc = client.findOne(query); + ASSERT_TRUE(!doc.isEmpty()) + << "doc with index key: " << next->indexKeyValues + << " missing in global index output collection: " << dumpOutputColl(opCtx); + } + } + + GlobalIndexClonerDoc makeStateDocument() { + return GlobalIndexClonerDoc(*_indexCollectionUUID, + kSourceNss, + _collectionUUID, + _indexName, + _indexSpec, + {}, + GlobalIndexClonerStateEnum::kUnused); + } + + bool doesCollectionExist(OperationContext* opCtx, const NamespaceString& nss) { + DBDirectClient client(opCtx); + auto collectionInfos = client.getCollectionInfos( + nss.db().toString(), ListCollectionsFilter::makeTypeCollectionFilter()); + + for (auto&& info : collectionInfos) { + auto coll = + repl::ListCollectionResult::parse(IDLParserContext("doesCollectionExist"), info); + + if (coll.getName() == nss.coll()) { + return true; + } + } + + return false; + } + + Blocker* getStateDocInsertBlocker() { + return &_stateDocInsertBlocker; + } + + Blocker* 
getStateDocDeleteBlocker() { + return &_stateDocDeleteBlocker; + } + + void replaceFetcherResultList( + std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> newResults) { + _mockFetcher.setResultList(std::move(newResults)); + _fetcherCopyForVerification = _mockFetcher; + } + + StringData indexKey() const { + return _indexKey; + } + +private: + std::string dumpOutputColl(OperationContext* opCtx) { + DBDirectClient client(opCtx); + FindCommandRequest query(globalIndexNss(kSourceNss, _indexName)); + + std::ostringstream outputStr; + auto res = client.find(query); + + if (!res || !res->more()) { + return "<empty>"; + } + + outputStr << "docs: " << std::endl; + while (res->more()) { + auto doc = res->next(); + outputStr << doc.toString() << std::endl; + } + + return outputStr.str(); + } + + boost::optional<UUID> _indexCollectionUUID; + const UUID _collectionUUID{UUID::gen()}; + const std::string _indexName{"global_x_1"}; + const StringData _indexKey{"x"}; + const BSONObj _indexSpec{BSON("key" << BSON(_indexKey << 1) << "unique" << true)}; + + ReadWriteConcernDefaultsLookupMock _lookupMock; + Blocker _stateDocInsertBlocker; + Blocker _stateDocDeleteBlocker; + + MockGlobalIndexClonerFetcher _mockFetcher; + MockGlobalIndexClonerFetcher _fetcherCopyForVerification; +}; + +TEST_F(GlobalIndexClonerServiceTest, CloneInsertsToGlobalIndexCollection) { + auto doc = makeStateDocument(); + auto opCtx = makeOperationContext(); + auto rawOpCtx = opCtx.get(); + + auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); + auto future = cloner->getCompletionFuture(); + future.get(); + + ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName()))); + checkIndexCollection(rawOpCtx); +} + +TEST_F(GlobalIndexClonerServiceTest, ShouldBeSafeToRetryOnStepDown) { + auto doc = makeStateDocument(); + auto opCtx = makeOperationContext(); + auto rawOpCtx = opCtx.get(); + + auto stateDocInsertBlocker = getStateDocInsertBlocker(); + 
stateDocInsertBlocker->block(); + auto stateDocDeleteBlocker = getStateDocDeleteBlocker(); + stateDocDeleteBlocker->block(); + + { + auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); + stateDocInsertBlocker->waitUntilBlockedOccurred(rawOpCtx); + stepDown(); + + ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException); + } + + stepUp(rawOpCtx); + + { + auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); + stateDocInsertBlocker->unblock(); + stateDocDeleteBlocker->waitUntilBlockedOccurred(rawOpCtx); + stepDown(); + + ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException); + } + + stepUp(rawOpCtx); + + // It is possible for the primary only service to run to completion and no longer exists. + { + auto cloner = GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc)); + stateDocDeleteBlocker->unblock(); + (*cloner)->getCompletionFuture().get(); + } + + checkIndexCollection(rawOpCtx); +} + +TEST_F(GlobalIndexClonerServiceTest, ShouldBeAbleToConsumeMultipleBatchesWorthofDocs) { + std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> fetcherResults; + + RAIIServerParameterControllerForTest batchSizeForTest( + "globalIndexClonerServiceFetchBatchMaxSizeBytes", 50); + std::string padding(50, 'x'); + + // Populate enough to have more than one batch worth of documents. 
+ for (int x = 0; x < 4; x++) { + fetcherResults.push_front({BSON("_id" << x << kSourceShardKey << x), + BSON(indexKey() << (std::to_string(x) + padding))}); + } + replaceFetcherResultList(std::move(fetcherResults)); + + auto doc = makeStateDocument(); + auto opCtx = makeOperationContext(); + auto rawOpCtx = opCtx.get(); + + auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); + auto future = cloner->getCompletionFuture(); + future.get(); + + ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName()))); + checkIndexCollection(rawOpCtx); +} + +TEST_F(GlobalIndexClonerServiceTest, ShouldWorkWithEmptyCollection) { + replaceFetcherResultList({}); + + auto doc = makeStateDocument(); + auto opCtx = makeOperationContext(); + auto rawOpCtx = opCtx.get(); + + auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); + auto future = cloner->getCompletionFuture(); + future.get(); + + ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName()))); + checkIndexCollection(rawOpCtx); +} + +} // namespace +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp index eff9cd7073e..ac1a82a8cf4 100644 --- a/src/mongo/db/s/global_index/global_index_inserter.cpp +++ b/src/mongo/db/s/global_index/global_index_inserter.cpp @@ -32,6 +32,7 @@ #include <fmt/format.h> #include "mongo/db/s/global_index/global_index_entry_gen.h" +#include "mongo/db/s/global_index/global_index_util.h" #include "mongo/db/transaction/transaction_api.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" @@ -56,12 +57,11 @@ GlobalIndexInserter::GlobalIndexInserter(NamespaceString nss, _executor(std::move(executor)) {} NamespaceString GlobalIndexInserter::_skipIdNss() { - return NamespaceString(NamespaceString::kConfigDb, - "{}.globalIndex.{}.skipList"_format(_nss.coll(), 
_indexName)); + return skipIdNss(_nss, _indexName); } NamespaceString GlobalIndexInserter::_globalIndexNss() { - return NamespaceString(_nss.db(), "{}.globalIndex.{}"_format(_nss.coll(), _indexName)); + return globalIndexNss(_nss, _indexName); } void GlobalIndexInserter::processDoc(OperationContext* opCtx, diff --git a/src/mongo/db/s/global_index/global_index_inserter_test.cpp b/src/mongo/db/s/global_index/global_index_inserter_test.cpp index 2b38bd9d674..51cff39ec44 100644 --- a/src/mongo/db/s/global_index/global_index_inserter_test.cpp +++ b/src/mongo/db/s/global_index/global_index_inserter_test.cpp @@ -33,6 +33,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/global_index/global_index_entry_gen.h" +#include "mongo/db/s/global_index/global_index_util.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/session/logical_session_cache_noop.h" @@ -102,8 +103,7 @@ public: } NamespaceString skipIdNss() const { - return NamespaceString(NamespaceString::kConfigDb, - "{}.globalIndex.{}.skipList"_format(_nss.coll(), _indexName)); + return global_index::skipIdNss(_nss, _indexName); } NamespaceString globalIndexNss() const { diff --git a/src/mongo/db/s/global_index/global_index_server_parameters.idl b/src/mongo/db/s/global_index/global_index_server_parameters.idl new file mode 100644 index 00000000000..56a0ca88998 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_server_parameters.idl @@ -0,0 +1,55 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Server parameters for global index. + +global: + cpp_namespace: "mongo::global_index" + +imports: + - "mongo/db/basic_types.idl" + +server_parameters: + globalIndexClonerServiceMaxThreadCount: + description: The max number of threads in the global index cloner's thread pool. + set_at: startup + cpp_vartype: int + cpp_varname: gGlobalIndexClonerServiceMaxThreadCount + default: 8 + validator: + gte: 1 + lte: 256 + + globalIndexClonerServiceFetchBatchMaxSizeBytes: + description: The target max total size in bytes of the index key values in a single fetch batch of the global index cloner. 
+ set_at: startup + cpp_vartype: int + cpp_varname: gGlobalIndexClonerServiceFetchBatchMaxSizeBytes + default: 102400 + validator: + gte: 1 diff --git a/src/mongo/db/s/global_index/global_index_util.cpp b/src/mongo/db/s/global_index/global_index_util.cpp new file mode 100644 index 00000000000..306b02ba071 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_util.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/s/global_index/global_index_util.h" + +#include <fmt/format.h> + +using namespace fmt::literals; + +namespace mongo { +namespace global_index { + +/** Builds the namespace "<coll>.globalIndex.<indexName>.skipList" in the same database as 'nss'. */ NamespaceString skipIdNss(const NamespaceString& nss, StringData indexName) { + return NamespaceString(nss.db(), + "{}.globalIndex.{}.skipList"_format(nss.coll(), indexName.toString())); +} + +/** Builds the namespace "<coll>.globalIndex.<indexName>" in the same database as 'nss'. */ NamespaceString globalIndexNss(const NamespaceString& nss, StringData indexName) { + return NamespaceString(nss.db(), "{}.globalIndex.{}"_format(nss.coll(), indexName)); +} + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_util.h b/src/mongo/db/s/global_index/global_index_util.h new file mode 100644 index 00000000000..f3fbc62b000 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_util.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/namespace_string.h" + +namespace mongo { +namespace global_index { + +/** + * Returns the namespace used by global indexes to store ids of documents that have already been + * inserted into the global index collection. + */ +NamespaceString skipIdNss(const NamespaceString& nss, StringData indexName); + +/** + * Returns the namespace of the global index collection. This should be removed once the global index + * catalog API is available. + */ +NamespaceString globalIndexNss(const NamespaceString& nss, StringData indexName); + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_future_util.h b/src/mongo/db/s/resharding/resharding_future_util.h index 0bb858a963c..005d8fceba5 100644 --- a/src/mongo/db/s/resharding/resharding_future_util.h +++ b/src/mongo/db/s/resharding/resharding_future_util.h @@ -183,7 +183,6 @@ public: return resharding::WithAutomaticRetry([this, body]() { return body(_factory); }); } - private: const CancelableOperationContextFactory _factory; }; |