summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2022-08-11 14:23:21 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-02 14:45:11 +0000
commit22adb905bd9ab2c957998577ba2838021423e4ed (patch)
treeb15116aec5fa5b09a9339d2a560f6804d7e52eb4
parent8581b4968646ea09b703d985c22b08cc01102597 (diff)
downloadmongo-22adb905bd9ab2c957998577ba2838021423e4ed.tar.gz
SERVER-67559 Implement global index cloner primary only service
-rw-r--r--src/mongo/db/namespace_string.cpp3
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/s/SConscript9
-rw-r--r--src/mongo/db/s/global_index/global_index_cloner.idl74
-rw-r--r--src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp8
-rw-r--r--src/mongo/db/s/global_index/global_index_cloner_fetcher.h24
-rw-r--r--src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.cpp53
-rw-r--r--src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.h63
-rw-r--r--src/mongo/db/s/global_index/global_index_cloning_external_state.cpp49
-rw-r--r--src/mongo/db/s/global_index/global_index_cloning_external_state.h61
-rw-r--r--src/mongo/db/s/global_index/global_index_cloning_service.cpp393
-rw-r--r--src/mongo/db/s/global_index/global_index_cloning_service.h220
-rw-r--r--src/mongo/db/s/global_index/global_index_cloning_service_test.cpp484
-rw-r--r--src/mongo/db/s/global_index/global_index_inserter.cpp6
-rw-r--r--src/mongo/db/s/global_index/global_index_inserter_test.cpp4
-rw-r--r--src/mongo/db/s/global_index/global_index_server_parameters.idl55
-rw-r--r--src/mongo/db/s/global_index/global_index_util.cpp49
-rw-r--r--src/mongo/db/s/global_index/global_index_util.h50
-rw-r--r--src/mongo/db/s/resharding/resharding_future_util.h1
19 files changed, 1594 insertions, 15 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 337cdcf5d54..681aa0d2e16 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -188,6 +188,9 @@ const NamespaceString NamespaceString::kDistLocksNamepsace(NamespaceString::kCon
const NamespaceString NamespaceString::kSetChangeStreamStateCoordinatorNamespace(
NamespaceString::kConfigDb, "change_stream_coordinator");
+const NamespaceString NamespaceString::kGlobalIndexClonerNamespace(
+ NamespaceString::kConfigDb, "localGlobalIndexOperations.cloner");
+
NamespaceString NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(StringData ns) {
if (!gMultitenancySupport) {
return NamespaceString(ns, boost::none);
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 4d7cbfa6dab..5045d925c92 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -258,6 +258,9 @@ public:
// Namespace used to store the state document of 'SetChangeStreamStateCoordinator'.
static const NamespaceString kSetChangeStreamStateCoordinatorNamespace;
+ // Namespace used for storing global index cloner state documents.
+ static const NamespaceString kGlobalIndexClonerNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 3cccac32d2d..c2b6ea19b70 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -58,8 +58,14 @@ env.Library(
'commit_chunk_migration.idl',
'config_server_op_observer.cpp',
'global_index/global_index_cloner_fetcher.cpp',
+ 'global_index/global_index_cloner_fetcher_factory.cpp',
+ 'global_index/global_index_cloner.idl',
+ 'global_index/global_index_cloning_external_state.cpp',
+ 'global_index/global_index_cloning_service.cpp',
'global_index/global_index_entry.idl',
'global_index/global_index_inserter.cpp',
+ 'global_index/global_index_server_parameters.idl',
+ 'global_index/global_index_util.cpp',
'global_index_cumulative_metrics_field_name_provider.cpp',
'global_index_cumulative_metrics.cpp',
'global_index_metrics.cpp',
@@ -596,6 +602,7 @@ env.CppUnitTest(
'collection_sharding_runtime_test.cpp',
'database_sharding_state_test.cpp',
'global_index/global_index_cloner_fetcher_test.cpp',
+ 'global_index/global_index_cloning_service_test.cpp',
'global_index/global_index_inserter_test.cpp',
'global_index_metrics_test.cpp',
'global_index_cumulative_metrics_test.cpp',
@@ -663,6 +670,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/catalog_test_fixture',
'$BUILD_DIR/mongo/db/catalog/database_holder',
+ '$BUILD_DIR/mongo/db/commands/list_collections_filter',
'$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util',
'$BUILD_DIR/mongo/db/keys_collection_client_direct',
'$BUILD_DIR/mongo/db/op_observer/op_observer',
@@ -674,6 +682,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/query/query_request',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/query_expressions',
+ '$BUILD_DIR/mongo/db/read_write_concern_defaults_mock',
'$BUILD_DIR/mongo/db/repl/image_collection_entry',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/db/repl/oplog_application',
diff --git a/src/mongo/db/s/global_index/global_index_cloner.idl b/src/mongo/db/s/global_index/global_index_cloner.idl
new file mode 100644
index 00000000000..81ab433f171
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloner.idl
@@ -0,0 +1,74 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file contains the schema for the document representing a global index cloner primary only
+# service state document.
+
+global:
+ cpp_namespace: "mongo::global_index"
+
+imports:
+ - "mongo/db/basic_types.idl"
+
+enums:
+ GlobalIndexClonerState:
+ description: "The current state of a global index cloner."
+ type: string
+ values:
+ kUnused: "unused"
+ kCloning: "cloning"
+
+structs:
+ GlobalIndexClonerDoc:
+ description: "Document containing the state and specs of global index cloning operation."
+ strict: false
+ fields:
+ _id:
+ type: uuid
+ cpp_name: indexCollectionUUID
+ description: "The uuid of the global index collection."
+ nss:
+ type: namespacestring
+ description: "The namespace of the source collection."
+ collectionUUID:
+ type: uuid
+ description: "The uuid of source collection."
+ indexName:
+ type: string
+ description: "The name of the global index being created."
+ indexSpec:
+ type: object_owned
+ description: "Temporary placeholder for index spec until global index api is
+ available."
+ minFetchTimestamp:
+ type: timestamp
+ description: "The minimum timestamp to use for fetching documents from the source
+ collection"
+ state:
+ type: GlobalIndexClonerState
+ description: "The current state of cloner."
diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp b/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp
index b67dfc4c717..e8421758f8f 100644
--- a/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp
+++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher.cpp
@@ -156,6 +156,10 @@ GlobalIndexClonerFetcher::GlobalIndexClonerFetcher(NamespaceString nss,
boost::optional<GlobalIndexClonerFetcher::FetchedEntry> GlobalIndexClonerFetcher::getNext(
OperationContext* opCtx) {
+ if (!_pipeline) {
+ _pipeline = _restartPipeline(opCtx);
+ }
+
_pipeline->reattachToOperationContext(opCtx);
ON_BLOCK_EXIT([this] { _pipeline->detachFromOperationContext(); });
@@ -164,10 +168,6 @@ boost::optional<GlobalIndexClonerFetcher::FetchedEntry> GlobalIndexClonerFetcher
_pipeline.reset();
});
- if (!_pipeline) {
- _pipeline = _restartPipeline(opCtx);
- }
-
auto next = _pipeline->getNext();
guard.dismiss();
diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher.h b/src/mongo/db/s/global_index/global_index_cloner_fetcher.h
index 192cd4988a3..9c5cebc81be 100644
--- a/src/mongo/db/s/global_index/global_index_cloner_fetcher.h
+++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher.h
@@ -39,17 +39,31 @@
namespace mongo {
namespace global_index {
-/**
- * Responsible for fetching documents to clone for a particular shard.
- */
-class GlobalIndexClonerFetcher {
+class GlobalIndexClonerFetcherInterface {
public:
struct FetchedEntry {
public:
+ FetchedEntry(BSONObj _documentKey, BSONObj _indexKeyValues)
+ : documentKey(_documentKey.getOwned()), indexKeyValues(_indexKeyValues.getOwned()) {}
+
BSONObj documentKey;
BSONObj indexKeyValues;
};
+ virtual ~GlobalIndexClonerFetcherInterface() {}
+
+ /**
+ * Returns the next document to clone.
+ * Returns boost::none if there are no documents left.
+ */
+ virtual boost::optional<FetchedEntry> getNext(OperationContext* opCtx) = 0;
+};
+
+/**
+ * Responsible for fetching documents to clone for a particular shard.
+ */
+class GlobalIndexClonerFetcher : public GlobalIndexClonerFetcherInterface {
+public:
GlobalIndexClonerFetcher(NamespaceString nss,
UUID collUUId,
UUID indexUUID,
@@ -58,7 +72,7 @@ public:
KeyPattern sourceShardKeyPattern,
KeyPattern globalIndexPattern);
- boost::optional<FetchedEntry> getNext(OperationContext* opCtx);
+ boost::optional<FetchedEntry> getNext(OperationContext* opCtx) override;
/**
* Builds the aggregation pipeline for fetching the documents
diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.cpp b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.cpp
new file mode 100644
index 00000000000..9d8a87847d6
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.cpp
@@ -0,0 +1,53 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/global_index/global_index_cloner_fetcher_factory.h"
+
+namespace mongo {
+namespace global_index {
+
+std::unique_ptr<GlobalIndexClonerFetcherInterface> GlobalIndexClonerFetcherFactory::make(
+ NamespaceString nss,
+ UUID collUUID,
+ UUID indexUUID,
+ ShardId myShardId,
+ Timestamp minFetchTimestamp,
+ KeyPattern sourceShardKeyPattern,
+ KeyPattern globalIndexPattern) {
+ return std::make_unique<GlobalIndexClonerFetcher>(std::move(nss),
+ std::move(collUUID),
+ std::move(indexUUID),
+ std::move(myShardId),
+ std::move(minFetchTimestamp),
+ std::move(sourceShardKeyPattern),
+ std::move(globalIndexPattern));
+}
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.h b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.h
new file mode 100644
index 00000000000..46c4422dd2b
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloner_fetcher_factory.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/s/global_index/global_index_cloner_fetcher.h"
+
+namespace mongo {
+namespace global_index {
+
+class GlobalIndexClonerFetcherFactoryInterface {
+public:
+ virtual ~GlobalIndexClonerFetcherFactoryInterface() {}
+
+ virtual std::unique_ptr<GlobalIndexClonerFetcherInterface> make(
+ NamespaceString nss,
+ UUID collUUID,
+ UUID indexUUID,
+ ShardId myShardId,
+ Timestamp minFetchTimestamp,
+ KeyPattern sourceShardKeyPattern,
+ KeyPattern globalIndexPattern) = 0;
+};
+
+class GlobalIndexClonerFetcherFactory : public GlobalIndexClonerFetcherFactoryInterface {
+public:
+ std::unique_ptr<GlobalIndexClonerFetcherInterface> make(NamespaceString nss,
+ UUID collUUID,
+ UUID indexUUID,
+ ShardId myShardId,
+ Timestamp minFetchTimestamp,
+ KeyPattern sourceShardKeyPattern,
+ KeyPattern globalIndexPattern) override;
+};
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_cloning_external_state.cpp b/src/mongo/db/s/global_index/global_index_cloning_external_state.cpp
new file mode 100644
index 00000000000..7473479e716
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloning_external_state.cpp
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/global_index/global_index_cloning_external_state.h"
+
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace global_index {
+
+ShardId GlobalIndexCloningStateImpl::myShardId(ServiceContext* service) const {
+ return ShardingState::get(service)->shardId();
+}
+
+ChunkManager GlobalIndexCloningStateImpl::getShardedCollectionRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& nss) const {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ return catalogCache->getShardedCollectionRoutingInfo(opCtx, nss);
+}
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_cloning_external_state.h b/src/mongo/db/s/global_index/global_index_cloning_external_state.h
new file mode 100644
index 00000000000..ef039841a9c
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloning_external_state.h
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/s/global_index/global_index_cloning_service.h"
+
+#include "mongo/s/chunk_manager.h"
+
+namespace mongo {
+namespace global_index {
+
+/**
+ * Interface for using functionality from other modules (mostly sharding) to allow mocking in tests.
+ */
+class GlobalIndexCloningService::CloningExternalState {
+public:
+ virtual ~CloningExternalState() = default;
+
+ virtual ShardId myShardId(ServiceContext* service) const = 0;
+
+ virtual ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) const = 0;
+};
+
+class GlobalIndexCloningStateImpl : public GlobalIndexCloningService::CloningExternalState {
+public:
+ ShardId myShardId(ServiceContext* service) const override;
+
+ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) const override;
+};
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.cpp b/src/mongo/db/s/global_index/global_index_cloning_service.cpp
new file mode 100644
index 00000000000..0002d7ab032
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloning_service.cpp
@@ -0,0 +1,393 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/global_index/global_index_cloning_service.h"
+
+#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/ops/delete.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/s/global_index/global_index_cloning_external_state.h"
+#include "mongo/db/s/global_index/global_index_server_parameters_gen.h"
+#include "mongo/db/s/global_index/global_index_util.h"
+#include "mongo/db/s/resharding/resharding_data_copy_util.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/future_util.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kGlobalIndex
+
+namespace mongo {
+namespace global_index {
+
+namespace {
+
+const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)};
+
+} // namespace
+
+GlobalIndexCloningService::GlobalIndexCloningService(ServiceContext* serviceContext)
+ : PrimaryOnlyService(serviceContext), _serviceContext(serviceContext) {}
+
+ThreadPool::Limits GlobalIndexCloningService::getThreadPoolLimits() const {
+ ThreadPool::Limits threadPoolLimit;
+ threadPoolLimit.maxThreads = gGlobalIndexClonerServiceMaxThreadCount;
+ return threadPoolLimit;
+}
+
+std::shared_ptr<repl::PrimaryOnlyService::Instance> GlobalIndexCloningService::constructInstance(
+ BSONObj initialState) {
+ return std::make_shared<GlobalIndexCloningService::CloningStateMachine>(
+ _serviceContext,
+ this,
+ std::make_unique<GlobalIndexCloningStateImpl>(),
+ std::make_unique<GlobalIndexClonerFetcherFactory>(),
+ GlobalIndexClonerDoc::parse(IDLParserContext{"GlobalIndexCloner"}, initialState));
+}
+
+void GlobalIndexCloningService::checkIfConflictsWithOtherInstances(
+ OperationContext* opCtx,
+ BSONObj initialStateDoc,
+ const std::vector<const PrimaryOnlyService::Instance*>& existingInstances) {
+ // There are no restrictions on running concurrent global index instances.
+}
+
+GlobalIndexCloningService::CloningStateMachine::CloningStateMachine(
+ ServiceContext* serviceContext,
+ const GlobalIndexCloningService* cloningService,
+ std::unique_ptr<GlobalIndexCloningService::CloningExternalState> externalState,
+ std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> fetcherFactory,
+ GlobalIndexClonerDoc clonerDoc)
+ : _serviceContext(serviceContext),
+ _cloningService(cloningService),
+ _execForCancelableOpCtx(std::make_shared<ThreadPool>([] {
+ ThreadPool::Options options;
+ options.poolName = "GlobalIndexCloningServiceCancelableOpCtxPool";
+ options.minThreads = 0;
+ options.maxThreads = 1;
+ return options;
+ }())),
+ _clonerState(std::move(clonerDoc)),
+ _fetcherFactory(std::move(fetcherFactory)),
+ _externalState(std::move(externalState)) {}
+
+SemiFuture<void> GlobalIndexCloningService::CloningStateMachine::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& stepdownToken) noexcept {
+ auto abortToken = _initAbortSource(stepdownToken);
+ _execForCancelableOpCtx->startup();
+ _retryingCancelableOpCtxFactory.emplace(abortToken, _execForCancelableOpCtx);
+
+ _init(executor);
+
+ return ExecutorFuture(**executor)
+ .then([this, executor, abortToken] { return _persistStateDocument(executor, abortToken); })
+ .then([this, executor, abortToken] { return _runUntilDoneCloning(executor, abortToken); })
+ // TODO: SERVER-68706 wait from coordinator to commit or abort.
+ .onCompletion([this, stepdownToken](const Status& status) {
+ _retryingCancelableOpCtxFactory.emplace(stepdownToken, _execForCancelableOpCtx);
+ return status;
+ })
+ .then([this, executor, stepdownToken] { return _cleanup(executor, stepdownToken); })
+ .thenRunOn(_cloningService->getInstanceCleanupExecutor())
+ .onError([](const Status& status) {
+ LOGV2(
+ 6755903, "Global index cloner encountered an error", "error"_attr = redact(status));
+ return status;
+ })
+ .onCompletion([this, self = shared_from_this()](const Status& status) {
+ if (!_completionPromise.getFuture().isReady()) {
+ if (status.isOK()) {
+ _completionPromise.emplaceValue();
+ } else {
+ _completionPromise.setError(status);
+ }
+ }
+ })
+ .semi();
+}
+
+void GlobalIndexCloningService::CloningStateMachine::interrupt(Status status) {}
+
+void GlobalIndexCloningService::CloningStateMachine::abort() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (_abortSource) {
+ _abortSource->cancel();
+ }
+}
+
+ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_cleanup(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& stepdownToken) {
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor, stepdownToken](auto& cancelableFactory) {
+ return ExecutorFuture(**executor)
+ .then([this, executor, stepdownToken, &cancelableFactory] {
+ auto opCtx = cancelableFactory.makeOperationContext(&cc());
+ _removeStateDocument(opCtx.get());
+ });
+ })
+ .onTransientError([](const auto& status) {})
+ .onUnrecoverableError([](const auto& status) {})
+ .until<Status>([](const Status& status) { return status.isOK(); })
+ .on(**executor, stepdownToken);
+}
+
+void GlobalIndexCloningService::CloningStateMachine::_init(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ _inserter = std::make_unique<GlobalIndexInserter>(_clonerState.getNss(),
+ _clonerState.getIndexName(),
+ _clonerState.getIndexCollectionUUID(),
+ **executor);
+
+ auto client = _serviceContext->makeClient("globalIndexClonerServiceInit");
+ AlternativeClientRegion clientRegion(client);
+
+ auto opCtx = _serviceContext->makeOperationContext(Client::getCurrent());
+
+ auto routingInfo =
+ _externalState->getShardedCollectionRoutingInfo(opCtx.get(), _clonerState.getNss());
+
+ uassert(6755901,
+ str::stream() << "Cannot create global index on unsharded ns "
+ << _clonerState.getNss().ns(),
+ routingInfo.isSharded());
+
+ auto myShardId = _externalState->myShardId(_serviceContext);
+
+ auto indexKeyPattern =
+ _clonerState.getIndexSpec().getObjectField(IndexDescriptor::kKeyPatternFieldName);
+ _fetcher = _fetcherFactory->make(_clonerState.getNss(),
+ _clonerState.getCollectionUUID(),
+ _clonerState.getIndexCollectionUUID(),
+ myShardId,
+ _clonerState.getMinFetchTimestamp(),
+ routingInfo.getShardKeyPattern().getKeyPattern(),
+ indexKeyPattern.getOwned());
+}
+
+ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_runUntilDoneCloning(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) {
+
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor, cancelToken](auto& cancelableFactory) {
+ return ExecutorFuture(**executor)
+ .then([this, executor, &cancelableFactory] {
+ return _initializeCollections(cancelableFactory);
+ })
+ .then([this, executor, cancelToken, cancelableFactory] {
+ return _clone(executor, cancelToken, cancelableFactory);
+ });
+ })
+ .onTransientError([](const Status& status) {
+
+ })
+ .onUnrecoverableError([](const Status& status) {
+
+ })
+ .until<Status>([](const Status& status) { return status.isOK(); })
+ .on(**executor, cancelToken);
+}
+
+boost::optional<BSONObj> GlobalIndexCloningService::CloningStateMachine::reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode,
+ MongoProcessInterface::CurrentOpSessionsMode) noexcept {
+ // TODO: SERVER-68707
+ return boost::none;
+}
+
+void GlobalIndexCloningService::CloningStateMachine::abort(bool isUserCancelled) {}
+
+void GlobalIndexCloningService::CloningStateMachine::checkIfOptionsConflict(
+ const BSONObj& stateDoc) const {
+ auto newCloning =
+ GlobalIndexClonerDoc::parse(IDLParserContext("globalIndexCloningCheckConflict"), stateDoc);
+ uassert(6755900,
+ str::stream() << "new global index " << stateDoc << " is incompatible with ongoing "
+ << _clonerState.toBSON(),
+ newCloning.getNss() == _clonerState.getNss() &&
+ newCloning.getCollectionUUID() == _clonerState.getCollectionUUID());
+}
+
+CancellationToken GlobalIndexCloningService::CloningStateMachine::_initAbortSource(
+ const CancellationToken& stepdownToken) {
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _abortSource = CancellationSource(stepdownToken);
+ }
+
+ // TODO: SERVER-67563 Handle possible race between _initAbortSource and abort
+
+ return _abortSource->token();
+}
+
+ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistStateDocument(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) {
+ if (_clonerState.getState() > GlobalIndexClonerStateEnum::kUnused) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor](auto& cancelableFactory) {
+ auto opCtx = cancelableFactory.makeOperationContext(Client::getCurrent());
+
+ GlobalIndexClonerDoc newDoc(_clonerState);
+ newDoc.setState(GlobalIndexClonerStateEnum::kCloning);
+ PersistentTaskStore<GlobalIndexClonerDoc> store(_cloningService->getStateDocumentsNS());
+ store.add(opCtx.get(), newDoc, kNoWaitWriteConcern);
+
+ std::swap(_clonerState, newDoc);
+
+ LOGV2(6755904, "Persisted global index state document");
+ })
+ .onTransientError([](const Status& status) {})
+ .onUnrecoverableError([](const Status& status) {})
+ .until<Status>([](const Status& status) { return status.isOK(); })
+ .on(**executor, cancelToken);
+}
+
+void GlobalIndexCloningService::CloningStateMachine::_removeStateDocument(OperationContext* opCtx) {
+ const auto& nss = _cloningService->getStateDocumentsNS();
+ writeConflictRetry(
+ opCtx, "GlobalIndexCloningStateMachine::removeStateDocument", nss.toString(), [&] {
+ AutoGetCollection coll(opCtx, nss, MODE_IX);
+
+ if (!coll) {
+ return;
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+
+ // Set the promise when the delete commits, this is to ensure that any interruption that
+ // happens later won't result in setting an error on the completion promise.
+ opCtx->recoveryUnit()->onCommit([this](boost::optional<Timestamp> unusedCommitTime) {
+ _completionPromise.emplaceValue();
+ });
+
+ deleteObjects(opCtx,
+ *coll,
+ nss,
+ BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName
+ << _clonerState.getIndexCollectionUUID()),
+ true /* justOne */);
+
+ wuow.commit();
+ });
+}
+
+void GlobalIndexCloningService::CloningStateMachine::_initializeCollections(
+ const CancelableOperationContextFactory& cancelableOpCtxFactory) {
+ auto cancelableOpCtx = cancelableOpCtxFactory.makeOperationContext(Client::getCurrent());
+ auto opCtx = cancelableOpCtx.get();
+
+ resharding::data_copy::ensureCollectionExists(
+ opCtx, skipIdNss(_clonerState.getNss(), _clonerState.getIndexName()), {});
+}
+
+ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_clone(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& cancelToken,
+ const CancelableOperationContextFactory& cancelableOpCtxFactory) {
+ return AsyncTry([this, executor, cancelToken, cancelableOpCtxFactory] {
+ auto cancelableOpCtx =
+ cancelableOpCtxFactory.makeOperationContext(Client::getCurrent());
+ _fetchNextBatch(cancelableOpCtx.get());
+
+ return _processBatch(executor, cancelToken, cancelableOpCtxFactory);
+ })
+ .until([this](const Status& status) { return !status.isOK() || !_hasMoreToFetch; })
+ .on(**executor, cancelToken);
+}
+
+void GlobalIndexCloningService::CloningStateMachine::_fetchNextBatch(OperationContext* opCtx) {
+ if (!_fetchedDocs.empty()) {
+ // There are still documents that haven't not been processed from the previous attempt.
+ return;
+ }
+
+ int totalSize = 0;
+
+ do {
+ if (auto next = _fetcher->getNext(opCtx)) {
+ totalSize += next->indexKeyValues.objsize();
+ _fetchedDocs.push(*next);
+ } else {
+ _hasMoreToFetch = false;
+ }
+ } while (totalSize < gGlobalIndexClonerServiceFetchBatchMaxSizeBytes && _hasMoreToFetch);
+}
+
+ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_processBatch(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& cancelToken,
+ const CancelableOperationContextFactory& cancelableOpCtxFactory) {
+ return AsyncTry([this, &cancelableOpCtxFactory] {
+ if (_fetchedDocs.empty()) {
+ return;
+ }
+
+ const auto& next = _fetchedDocs.front();
+
+ auto cancelableOpCtx =
+ cancelableOpCtxFactory.makeOperationContext(Client::getCurrent());
+ _inserter->processDoc(cancelableOpCtx.get(), next.indexKeyValues, next.documentKey);
+
+ _fetchedDocs.pop();
+ })
+ .until([this](const Status& status) { return !status.isOK() || _fetchedDocs.empty(); })
+ .on(**executor, cancelToken);
+}
+
+void GlobalIndexCloningService::CloningStateMachine::_ensureCollection(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+
+ // Create the destination collection if necessary.
+ writeConflictRetry(opCtx, "CloningStateMachine::_ensureCollection", nss.toString(), [&] {
+ const CollectionPtr coll =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
+ if (coll) {
+ return;
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetDb autoDb(opCtx, nss.dbName(), LockMode::MODE_IX);
+ Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
+ auto db = autoDb.ensureDbExists(opCtx);
+
+ CollectionOptions options;
+ db->createCollection(opCtx, nss, options);
+ wuow.commit();
+ });
+}
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.h b/src/mongo/db/s/global_index/global_index_cloning_service.h
new file mode 100644
index 00000000000..6342938e529
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloning_service.h
@@ -0,0 +1,220 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <queue>
+
+#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/s/global_index/global_index_cloner_fetcher_factory.h"
+#include "mongo/db/s/global_index/global_index_cloner_gen.h"
+#include "mongo/db/s/global_index/global_index_inserter.h"
+#include "mongo/db/s/resharding/resharding_future_util.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+namespace global_index {
+
+class GlobalIndexCloningService : public repl::PrimaryOnlyService {
+public:
+ static constexpr StringData kServiceName = "GlobalIndexCloningService"_sd;
+
+ explicit GlobalIndexCloningService(ServiceContext* serviceContext);
+
+ class CloningExternalState;
+ class CloningStateMachine;
+
+ StringData getServiceName() const override {
+ return kServiceName;
+ }
+
+ NamespaceString getStateDocumentsNS() const override {
+ return NamespaceString::kGlobalIndexClonerNamespace;
+ }
+
+ ThreadPool::Limits getThreadPoolLimits() const override;
+
+ void checkIfConflictsWithOtherInstances(
+ OperationContext* opCtx,
+ BSONObj initialState,
+ const std::vector<const repl::PrimaryOnlyService::Instance*>& existingInstances) override;
+
+ std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialState) override;
+
+private:
+ ServiceContext* const _serviceContext;
+};
+
+/**
+ * Represents the current state of a global index operation on this shard. This class drives state
+ * transitions and updates to underlying on-disk metadata.
+ */
+class GlobalIndexCloningService::CloningStateMachine final
+ : public repl::PrimaryOnlyService::TypedInstance<CloningStateMachine> {
+public:
+ CloningStateMachine(
+ ServiceContext* service,
+ const GlobalIndexCloningService* cloningService,
+ std::unique_ptr<GlobalIndexCloningService::CloningExternalState> externalState,
+ std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> fetcherFactory,
+ GlobalIndexClonerDoc clonerDoc);
+
+ SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept override;
+
+ void interrupt(Status status) override;
+
+ void abort();
+
+ /**
+ * Returns a Future that will be resolved when all work associated with this Instance is done
+ * making forward progress.
+ */
+ SharedSemiFuture<void> getCompletionFuture() const {
+ return _completionPromise.getFuture();
+ }
+
+ boost::optional<BSONObj> reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode,
+ MongoProcessInterface::CurrentOpSessionsMode) noexcept override;
+
+ /**
+ * Initiates the cancellation of the cloning operation.
+ */
+ void abort(bool isUserCancelled);
+
+ void checkIfOptionsConflict(const BSONObj& stateDoc) const final;
+
+private:
+ /**
+ * Initializes the _abortSource and generates a token from it to return back the caller.
+ * Should only be called once per lifetime.
+ */
+ CancellationToken _initAbortSource(const CancellationToken& stepdownToken);
+
+ /**
+ * Initializes the necessary components.
+ */
+ void _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Make sure that the necessary collections are created.
+ */
+ void _initializeCollections(const CancelableOperationContextFactory& cancelableOpCtxFactory);
+
+ /**
+ * Inserts the state document to storage.
+ */
+ ExecutorFuture<void> _persistStateDocument(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& cancelToken);
+
+ /**
+ * Deletes the state document from storage.
+ */
+ void _removeStateDocument(OperationContext* opCtx);
+
+ /**
+ * Performs the entire cloning process.
+ */
+ ExecutorFuture<void> _runUntilDoneCloning(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& stepdownToken);
+
+ /**
+ * Iterates over the documents from the source collection and convert them into global index
+ * entries.
+ */
+ ExecutorFuture<void> _clone(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& cancelToken,
+ const CancelableOperationContextFactory& cancelableOpCtxFactory);
+
+ /**
+ * Removes the side collections created by this cloner.
+ */
+ ExecutorFuture<void> _cleanup(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& stepdownToken);
+
+ /**
+ * Fetches documents from source collection and store them in a queue.
+ */
+ void _fetchNextBatch(OperationContext* opCtx);
+
+ /**
+ * Convert fetched documents to global index entries.
+ */
+ ExecutorFuture<void> _processBatch(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& cancelToken,
+ const CancelableOperationContextFactory& cancelableOpCtxFactory);
+
+ /**
+ * Create collection with the given namespace only if it doesn't exist.
+ */
+ void _ensureCollection(OperationContext* opCtx, const NamespaceString& nss);
+
+ ServiceContext* const _serviceContext;
+
+ // The primary-only service instance corresponding to the cloner instance. Not owned.
+ const GlobalIndexCloningService* const _cloningService;
+
+ // A separate executor different from the one supplied by the primary only service is needed
+ // because the one from POS can be shut down during step down. This will ensure that the
+ // operation context created from the cancelableOpCtxFactory can be interrupted when the cancel
+ // token is aborted during step down.
+ const std::shared_ptr<ThreadPool> _execForCancelableOpCtx;
+
+ boost::optional<resharding::RetryingCancelableOperationContextFactory>
+ _retryingCancelableOpCtxFactory;
+
+ Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningStateMachine::_mutex");
+
+ GlobalIndexClonerDoc _clonerState;
+
+ // Canceled when there is an unrecoverable error or stepdown.
+ boost::optional<CancellationSource> _abortSource;
+
+ std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> _fetcherFactory;
+ std::unique_ptr<GlobalIndexClonerFetcherInterface> _fetcher;
+ std::unique_ptr<GlobalIndexInserter> _inserter;
+
+ // Keeps track if there is still a posibility that we still have documents that needs to be
+ // fetched from the source collection.
+ bool _hasMoreToFetch{true};
+
+ std::queue<GlobalIndexClonerFetcher::FetchedEntry> _fetchedDocs;
+
+ SharedPromise<void> _completionPromise;
+ const std::unique_ptr<CloningExternalState> _externalState;
+};
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
new file mode 100644
index 00000000000..187271df7cf
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
@@ -0,0 +1,484 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/commands/list_collections_filter.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/op_observer/op_observer_noop.h"
+#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
+#include "mongo/db/repl/database_cloner_gen.h"
+#include "mongo/db/repl/primary_only_service_test_fixture.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/db/s/global_index/global_index_cloning_external_state.h"
+#include "mongo/db/s/global_index/global_index_cloning_service.h"
+#include "mongo/db/s/global_index/global_index_util.h"
+#include "mongo/db/session/logical_session_cache_noop.h"
+#include "mongo/db/session/session_catalog_mongod.h"
+#include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+namespace mongo {
+namespace global_index {
+namespace {
+
+const ShardId kRecipientShardId{"myShardId"};
+const NamespaceString kSourceNss{"sourcedb", "sourcecollection"};
+constexpr auto kSourceShardKey = "key"_sd;
+
+class GlobalIndexExternalStateForTest : public GlobalIndexCloningService::CloningExternalState {
+public:
+ ShardId myShardId(ServiceContext* serviceContext) const override {
+ return kRecipientShardId;
+ }
+
+ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) const override {
+ invariant(nss == kSourceNss);
+
+ const OID epoch = OID::gen();
+ std::vector<ChunkType> chunks = {
+ ChunkType{_sourceUUID,
+ ChunkRange{BSON(kSourceShardKey << MINKEY), BSON(kSourceShardKey << MAXKEY)},
+ ChunkVersion({epoch, Timestamp(1, 1)}, {100, 0}),
+ _someDonorId}};
+
+ auto rt = RoutingTableHistory::makeNew(kSourceNss,
+ _sourceUUID,
+ BSON(kSourceShardKey << 1),
+ nullptr /* defaultCollator */,
+ false /* unique */,
+ std::move(epoch),
+ Timestamp(1, 1),
+ boost::none /* timeseriesFields */,
+ boost::none /* reshardingFields */,
+ boost::none /* chunkSizeBytes */,
+ true /* allowMigrations */,
+ chunks);
+
+ return ChunkManager(_someDonorId,
+ DatabaseVersion(UUID::gen(), Timestamp(1, 1)),
+ _makeStandaloneRoutingTableHistory(std::move(rt)),
+ boost::none /* clusterTime */);
+ }
+
+private:
+ RoutingTableHistoryValueHandle _makeStandaloneRoutingTableHistory(
+ RoutingTableHistory rt) const {
+ const auto version = rt.getVersion();
+ return RoutingTableHistoryValueHandle(
+ std::make_shared<RoutingTableHistory>(std::move(rt)),
+ ComparableChunkVersion::makeComparableChunkVersion(version));
+ }
+
+ const UUID _sourceUUID{UUID::gen()};
+ const ShardId _someDonorId{"otherShardId"};
+};
+
+class MockGlobalIndexClonerFetcher : public GlobalIndexClonerFetcherInterface {
+public:
+ void setResultList(std::list<FetchedEntry> newResults) {
+ _docs = std::move(newResults);
+ }
+
+ boost::optional<FetchedEntry> getNext(OperationContext* opCtx) override {
+ if (_docs.empty()) {
+ return boost::none;
+ }
+
+ auto ret = _docs.front();
+ _docs.pop_front();
+ return ret;
+ }
+
+private:
+ std::list<FetchedEntry> _docs;
+};
+
+class GlobalIndexCloningFetcherFactoryForTest : public GlobalIndexClonerFetcherFactoryInterface {
+public:
+ explicit GlobalIndexCloningFetcherFactoryForTest(MockGlobalIndexClonerFetcher* mockFetcher)
+ : _mockFetcher(mockFetcher) {}
+
+ std::unique_ptr<GlobalIndexClonerFetcherInterface> make(
+ NamespaceString nss,
+ UUID collUUId,
+ UUID indexUUID,
+ ShardId myShardId,
+ Timestamp minFetchTimestamp,
+ KeyPattern sourceShardKeyPattern,
+ KeyPattern globalIndexPattern) override {
+ return std::make_unique<MockGlobalIndexClonerFetcher>(*_mockFetcher);
+ }
+
+private:
+ MockGlobalIndexClonerFetcher* _mockFetcher;
+};
+
+class GlobalIndexCloningServiceForTest : public GlobalIndexCloningService {
+public:
+ explicit GlobalIndexCloningServiceForTest(ServiceContext* serviceContext,
+ MockGlobalIndexClonerFetcher* mockFetcher)
+ : GlobalIndexCloningService(serviceContext),
+ _serviceContext(serviceContext),
+ _mockFetcher(mockFetcher) {}
+
+ std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialState) override {
+ return std::make_shared<GlobalIndexCloningService::CloningStateMachine>(
+ _serviceContext,
+ this,
+ std::make_unique<GlobalIndexExternalStateForTest>(),
+ std::make_unique<GlobalIndexCloningFetcherFactoryForTest>(_mockFetcher),
+ GlobalIndexClonerDoc::parse(IDLParserContext{"GlobalIndexCloningServiceForTest"},
+ initialState));
+ }
+
+private:
+ ServiceContext* _serviceContext;
+ MockGlobalIndexClonerFetcher* _mockFetcher;
+};
+
+class Blocker {
+public:
+ ~Blocker() {
+ stdx::unique_lock lk(_mutex);
+ _shouldBlock = false;
+ _cvBlocked.notify_all();
+ }
+
+ void blockIfActivated(OperationContext* opCtx) {
+ stdx::unique_lock lk(_mutex);
+ _blockedOnce = true;
+ opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return !_shouldBlock; });
+ }
+
+ void waitUntilBlockedOccurred(OperationContext* opCtx) {
+ stdx::unique_lock lk(_mutex);
+ opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return _blockedOnce; });
+ }
+
+ void block() {
+ stdx::unique_lock lk(_mutex);
+ _shouldBlock = true;
+ }
+
+ void unblock() {
+ stdx::unique_lock lk(_mutex);
+ _shouldBlock = false;
+ }
+
+private:
+ Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningServiceTestBlocker::_mutex");
+ stdx::condition_variable _cvBlocked;
+ bool _shouldBlock{false};
+ bool _blockedOnce{false};
+};
+
+class OpObserverForTest : public OpObserverNoop {
+public:
+ OpObserverForTest(Blocker* insertBlocker, Blocker* deleteBlocker)
+ : _insertBlocker(insertBlocker), _deleteBlocker(deleteBlocker) {}
+
+ void onInserts(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) override {
+ if (NamespaceString::kGlobalIndexClonerNamespace == coll->ns()) {
+ _insertBlocker->blockIfActivated(opCtx);
+ }
+ }
+
+ void onDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ StmtId stmtId,
+ const OplogDeleteEntryArgs& args) override {
+ if (NamespaceString::kGlobalIndexClonerNamespace == nss) {
+ _deleteBlocker->blockIfActivated(opCtx);
+ }
+ }
+
+private:
+ Blocker* _insertBlocker;
+ Blocker* _deleteBlocker;
+};
+
+GlobalIndexCloningService::InstanceID extractInstanceId(const GlobalIndexClonerDoc& doc) {
+ return BSON("_id" << doc.getIndexCollectionUUID());
+}
+
+using GlobalIndexStateMachine = GlobalIndexCloningServiceForTest::CloningStateMachine;
+
+class GlobalIndexClonerServiceTest : public repl::PrimaryOnlyServiceMongoDTest {
+public:
+ std::unique_ptr<repl::PrimaryOnlyService> makeService(ServiceContext* serviceContext) override {
+ return std::make_unique<GlobalIndexCloningServiceForTest>(serviceContext, &_mockFetcher);
+ }
+
+ void setUp() override {
+ repl::PrimaryOnlyServiceMongoDTest::setUp();
+
+ auto serviceContext = getServiceContext();
+ auto storageMock = std::make_unique<repl::StorageInterfaceMock>();
+ repl::StorageInterface::set(serviceContext, std::move(storageMock));
+
+ // The ReadWriteConcernDefaults decoration on the service context won't always be created,
+ // so we should manually instantiate it to ensure it exists in our tests.
+ ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
+
+ _opObserverRegistry->addObserver(
+ std::make_unique<OpObserverForTest>(&_stateDocInsertBlocker, &_stateDocDeleteBlocker));
+
+ // Create config.transactions collection
+ auto opCtx = serviceContext->makeOperationContext(Client::getCurrent());
+ DBDirectClient client(opCtx.get());
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
+
+ MongoDSessionCatalog::set(
+ getServiceContext(),
+ std::make_unique<MongoDSessionCatalog>(
+ std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>()));
+
+ // Session cache is needed otherwise client session info will ignored.
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+
+ std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> fetcherResults;
+ fetcherResults.push_front(
+ {BSON("_id" << 10 << kSourceShardKey << 20), BSON(_indexKey << 30)});
+ replaceFetcherResultList(std::move(fetcherResults));
+
+ const auto& indexNs = globalIndexNss(kSourceNss, _indexName);
+ client.createCollection(indexNs.ns());
+ auto all = client.getCollectionInfos(indexNs.db().toString(),
+ BSON("name" << indexNs.coll().toString()));
+
+ ASSERT_EQ(1, all.size());
+ _indexCollectionUUID.emplace(uassertStatusOK(UUID::parse(all.front()["info"]["uuid"])));
+ }
+
+ void checkIndexCollection(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+
+ MockGlobalIndexClonerFetcher fetcherCopy(_fetcherCopyForVerification);
+ while (auto next = fetcherCopy.getNext(opCtx)) {
+ FindCommandRequest query(globalIndexNss(kSourceNss, _indexName));
+ query.setFilter(BSON("_id" << next->indexKeyValues));
+
+ auto doc = client.findOne(query);
+ ASSERT_TRUE(!doc.isEmpty())
+ << "doc with index key: " << next->indexKeyValues
+ << " missing in global index output collection: " << dumpOutputColl(opCtx);
+ }
+ }
+
+ GlobalIndexClonerDoc makeStateDocument() {
+ return GlobalIndexClonerDoc(*_indexCollectionUUID,
+ kSourceNss,
+ _collectionUUID,
+ _indexName,
+ _indexSpec,
+ {},
+ GlobalIndexClonerStateEnum::kUnused);
+ }
+
+ bool doesCollectionExist(OperationContext* opCtx, const NamespaceString& nss) {
+ DBDirectClient client(opCtx);
+ auto collectionInfos = client.getCollectionInfos(
+ nss.db().toString(), ListCollectionsFilter::makeTypeCollectionFilter());
+
+ for (auto&& info : collectionInfos) {
+ auto coll =
+ repl::ListCollectionResult::parse(IDLParserContext("doesCollectionExist"), info);
+
+ if (coll.getName() == nss.coll()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ Blocker* getStateDocInsertBlocker() {
+ return &_stateDocInsertBlocker;
+ }
+
+ Blocker* getStateDocDeleteBlocker() {
+ return &_stateDocDeleteBlocker;
+ }
+
+ void replaceFetcherResultList(
+ std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> newResults) {
+ _mockFetcher.setResultList(std::move(newResults));
+ _fetcherCopyForVerification = _mockFetcher;
+ }
+
+ StringData indexKey() const {
+ return _indexKey;
+ }
+
+private:
+ std::string dumpOutputColl(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+ FindCommandRequest query(globalIndexNss(kSourceNss, _indexName));
+
+ std::ostringstream outputStr;
+ auto res = client.find(query);
+
+ if (!res || !res->more()) {
+ return "<empty>";
+ }
+
+ outputStr << "docs: " << std::endl;
+ while (res->more()) {
+ auto doc = res->next();
+ outputStr << doc.toString() << std::endl;
+ }
+
+ return outputStr.str();
+ }
+
+ boost::optional<UUID> _indexCollectionUUID;
+ const UUID _collectionUUID{UUID::gen()};
+ const std::string _indexName{"global_x_1"};
+ const StringData _indexKey{"x"};
+ const BSONObj _indexSpec{BSON("key" << BSON(_indexKey << 1) << "unique" << true)};
+
+ ReadWriteConcernDefaultsLookupMock _lookupMock;
+ Blocker _stateDocInsertBlocker;
+ Blocker _stateDocDeleteBlocker;
+
+ MockGlobalIndexClonerFetcher _mockFetcher;
+ MockGlobalIndexClonerFetcher _fetcherCopyForVerification;
+};
+
+TEST_F(GlobalIndexClonerServiceTest, CloneInsertsToGlobalIndexCollection) {
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ auto future = cloner->getCompletionFuture();
+ future.get();
+
+ ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
+ checkIndexCollection(rawOpCtx);
+}
+
+TEST_F(GlobalIndexClonerServiceTest, ShouldBeSafeToRetryOnStepDown) {
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+
+ auto stateDocInsertBlocker = getStateDocInsertBlocker();
+ stateDocInsertBlocker->block();
+ auto stateDocDeleteBlocker = getStateDocDeleteBlocker();
+ stateDocDeleteBlocker->block();
+
+ {
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ stateDocInsertBlocker->waitUntilBlockedOccurred(rawOpCtx);
+ stepDown();
+
+ ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException);
+ }
+
+ stepUp(rawOpCtx);
+
+ {
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ stateDocInsertBlocker->unblock();
+ stateDocDeleteBlocker->waitUntilBlockedOccurred(rawOpCtx);
+ stepDown();
+
+ ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException);
+ }
+
+ stepUp(rawOpCtx);
+
+ // It is possible for the primary only service to run to completion and no longer exists.
+ {
+ auto cloner = GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc));
+ stateDocDeleteBlocker->unblock();
+ (*cloner)->getCompletionFuture().get();
+ }
+
+ checkIndexCollection(rawOpCtx);
+}
+
+TEST_F(GlobalIndexClonerServiceTest, ShouldBeAbleToConsumeMultipleBatchesWorthofDocs) {
+ std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> fetcherResults;
+
+ RAIIServerParameterControllerForTest batchSizeForTest(
+ "globalIndexClonerServiceFetchBatchMaxSizeBytes", 50);
+ std::string padding(50, 'x');
+
+ // Populate enough to have more than one batch worth of documents.
+ for (int x = 0; x < 4; x++) {
+ fetcherResults.push_front({BSON("_id" << x << kSourceShardKey << x),
+ BSON(indexKey() << (std::to_string(x) + padding))});
+ }
+ replaceFetcherResultList(std::move(fetcherResults));
+
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ auto future = cloner->getCompletionFuture();
+ future.get();
+
+ ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
+ checkIndexCollection(rawOpCtx);
+}
+
+TEST_F(GlobalIndexClonerServiceTest, ShouldWorkWithEmptyCollection) {
+ replaceFetcherResultList({});
+
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ auto future = cloner->getCompletionFuture();
+ future.get();
+
+ ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
+ checkIndexCollection(rawOpCtx);
+}
+
+} // namespace
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp
index eff9cd7073e..ac1a82a8cf4 100644
--- a/src/mongo/db/s/global_index/global_index_inserter.cpp
+++ b/src/mongo/db/s/global_index/global_index_inserter.cpp
@@ -32,6 +32,7 @@
#include <fmt/format.h>
#include "mongo/db/s/global_index/global_index_entry_gen.h"
+#include "mongo/db/s/global_index/global_index_util.h"
#include "mongo/db/transaction/transaction_api.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -56,12 +57,11 @@ GlobalIndexInserter::GlobalIndexInserter(NamespaceString nss,
_executor(std::move(executor)) {}
NamespaceString GlobalIndexInserter::_skipIdNss() {
- return NamespaceString(NamespaceString::kConfigDb,
- "{}.globalIndex.{}.skipList"_format(_nss.coll(), _indexName));
+ return skipIdNss(_nss, _indexName);
}
NamespaceString GlobalIndexInserter::_globalIndexNss() {
- return NamespaceString(_nss.db(), "{}.globalIndex.{}"_format(_nss.coll(), _indexName));
+ return globalIndexNss(_nss, _indexName);
}
void GlobalIndexInserter::processDoc(OperationContext* opCtx,
diff --git a/src/mongo/db/s/global_index/global_index_inserter_test.cpp b/src/mongo/db/s/global_index/global_index_inserter_test.cpp
index 2b38bd9d674..51cff39ec44 100644
--- a/src/mongo/db/s/global_index/global_index_inserter_test.cpp
+++ b/src/mongo/db/s/global_index/global_index_inserter_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/global_index/global_index_entry_gen.h"
+#include "mongo/db/s/global_index/global_index_util.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/db/session/logical_session_cache_noop.h"
@@ -102,8 +103,7 @@ public:
}
NamespaceString skipIdNss() const {
- return NamespaceString(NamespaceString::kConfigDb,
- "{}.globalIndex.{}.skipList"_format(_nss.coll(), _indexName));
+ return global_index::skipIdNss(_nss, _indexName);
}
NamespaceString globalIndexNss() const {
diff --git a/src/mongo/db/s/global_index/global_index_server_parameters.idl b/src/mongo/db/s/global_index/global_index_server_parameters.idl
new file mode 100644
index 00000000000..56a0ca88998
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_server_parameters.idl
@@ -0,0 +1,55 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# Server parameters for global index.
+
+global:
+ cpp_namespace: "mongo::global_index"
+
+imports:
+ - "mongo/db/basic_types.idl"
+
+server_parameters:
+ globalIndexClonerServiceMaxThreadCount:
+ description: The max number of threads in the global index cloner's thread pool.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: gGlobalIndexClonerServiceMaxThreadCount
+ default: 8
+ validator:
+ gte: 1
+ lte: 256
+
+ globalIndexClonerServiceFetchBatchMaxSizeBytes:
+ description: The max number of threads in the global index cloner's thread pool.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: gGlobalIndexClonerServiceFetchBatchMaxSizeBytes
+ default: 102400
+ validator:
+ gte: 1
diff --git a/src/mongo/db/s/global_index/global_index_util.cpp b/src/mongo/db/s/global_index/global_index_util.cpp
new file mode 100644
index 00000000000..306b02ba071
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_util.cpp
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/global_index/global_index_util.h"
+
+#include <fmt/format.h>
+
+using namespace fmt::literals;
+
+namespace mongo {
+namespace global_index {
+
+NamespaceString skipIdNss(const NamespaceString& nss, StringData indexName) {
+ return NamespaceString(nss.db(),
+ "{}.globalIndex.{}.skipList"_format(nss.coll(), indexName.toString()));
+}
+
+NamespaceString globalIndexNss(const NamespaceString& nss, StringData indexName) {
+ return NamespaceString(nss.db(), "{}.globalIndex.{}"_format(nss.coll(), indexName));
+}
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_util.h b/src/mongo/db/s/global_index/global_index_util.h
new file mode 100644
index 00000000000..f3fbc62b000
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_util.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+namespace global_index {
+
+/**
+ * Returns the namespace used by global indexes to store ids of documents that has already been
+ * inserted to the global index collection.
+ */
+NamespaceString skipIdNss(const NamespaceString& nss, StringData indexName);
+
+/**
+ * Returns the namespace of global index collection. This should be removed once the global index
+ * catalog api is available.
+ */
+NamespaceString globalIndexNss(const NamespaceString& nss, StringData indexName);
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_future_util.h b/src/mongo/db/s/resharding/resharding_future_util.h
index 0bb858a963c..005d8fceba5 100644
--- a/src/mongo/db/s/resharding/resharding_future_util.h
+++ b/src/mongo/db/s/resharding/resharding_future_util.h
@@ -183,7 +183,6 @@ public:
return resharding::WithAutomaticRetry([this, body]() { return body(_factory); });
}
-
private:
const CancelableOperationContextFactory _factory;
};