summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2022-07-14 13:45:36 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-14 14:37:30 +0000
commit00d0a908862771c4ded8d0492bfe3793affe6c10 (patch)
treea3a4c94d80869975856a93523193780f3e87b930 /src
parent5ecdd383565058346f1ade42b438c190b7f13ad2 (diff)
downloadmongo-00d0a908862771c4ded8d0492bfe3793affe6c10.tar.gz
SERVER-67556 Implement global index cloner
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/global_index/global_index_entry.idl48
-rw-r--r--src/mongo/db/s/global_index/global_index_inserter.cpp120
-rw-r--r--src/mongo/db/s/global_index/global_index_inserter.h70
-rw-r--r--src/mongo/db/s/global_index/global_index_inserter_test.cpp232
-rw-r--r--src/mongo/db/transaction_api.h3
-rw-r--r--src/mongo/logv2/log_component.h1
7 files changed, 476 insertions, 1 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index f98385fc858..7ecb530d088 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -57,6 +57,8 @@ env.Library(
'collection_sharding_state_factory_shard.cpp',
'commit_chunk_migration.idl',
'config_server_op_observer.cpp',
+ 'global_index/global_index_entry.idl',
+ 'global_index/global_index_inserter.cpp',
'global_index_metrics.cpp',
'metadata_manager.cpp',
'migration_chunk_cloner_source_legacy.cpp',
@@ -561,6 +563,7 @@ env.CppUnitTest(
'dist_lock_catalog_mock.cpp',
'dist_lock_catalog_replset_test.cpp',
'dist_lock_manager_replset_test.cpp',
+ 'global_index/global_index_inserter_test.cpp',
'global_index_metrics_test.cpp',
'implicit_collection_creation_test.cpp',
'metadata_manager_test.cpp',
diff --git a/src/mongo/db/s/global_index/global_index_entry.idl b/src/mongo/db/s/global_index/global_index_entry.idl
new file mode 100644
index 00000000000..4c09dbad7b2
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_entry.idl
@@ -0,0 +1,48 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file contains the schema for the document stored in the global index collection.
+
+global:
+ cpp_namespace: "mongo::global_index"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ GlobalIndexEntry:
+ description: "The document describing the index entry for global indexes."
+ strict: false
+ fields:
+ _id:
+ type: object
+ description: "The index representation of the original document."
+ cpp_name: "indexKeyValue"
+ documentKey:
+ type: object
+ description: "The union of _id and shard key values of the original document."
diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp
new file mode 100644
index 00000000000..a6fdff72993
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_inserter.cpp
@@ -0,0 +1,120 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/global_index/global_index_inserter.h"
+
+#include <fmt/format.h>
+
+#include "mongo/db/s/global_index/global_index_entry_gen.h"
+#include "mongo/db/transaction_api.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/fail_point.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kGlobalIndex
+
+using namespace fmt::literals;
+
+namespace mongo {
+namespace global_index {
+namespace {
+MONGO_FAIL_POINT_DEFINE(globalIndexInserterPauseAfterReadingSkipCollection);
+} // namespace
+
+GlobalIndexInserter::GlobalIndexInserter(NamespaceString nss,
+ StringData indexName,
+ UUID indexUUID,
+ std::shared_ptr<executor::TaskExecutor> executor)
+ : _nss(std::move(nss)),
+ _indexName(indexName.toString()),
+ _indexUUID(std::move(indexUUID)),
+ _executor(std::move(executor)) {}
+
+NamespaceString GlobalIndexInserter::_skipIdNss() {
+ return NamespaceString(NamespaceString::kConfigDb,
+ "{}.globalIndex.{}.skipList"_format(_nss.coll(), _indexName));
+}
+
+NamespaceString GlobalIndexInserter::_globalIndexNss() {
+ return NamespaceString(_nss.db(), "{}.globalIndex.{}"_format(_nss.coll(), _indexName));
+}
+
+void GlobalIndexInserter::processDoc(OperationContext* opCtx,
+ const BSONObj& indexKeyValues,
+ const BSONObj& documentKey) {
+ auto insertToGlobalIndexFn = [this,
+ service = opCtx->getServiceContext(),
+ indexKeyValues,
+ documentKey](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
+ FindCommandRequest skipIdQuery(_skipIdNss());
+ skipIdQuery.setFilter(BSON("_id" << documentKey));
+ skipIdQuery.setLimit(1);
+
+ return txnClient.exhaustiveFind(skipIdQuery)
+ .thenRunOn(txnExec)
+ .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec](
+ const auto& skipIdDocResults) {
+ auto client = service->makeClient("globalIndexInserter");
+ auto opCtx = service->makeOperationContext(client.get());
+
+ {
+ stdx::lock_guard<Client> lk(*client);
+ client->setSystemOperationKillableByStepdown(lk);
+ }
+
+ globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get());
+
+ if (!skipIdDocResults.empty()) {
+ return SemiFuture<void>::makeReady();
+ }
+
+ write_ops::InsertCommandRequest globalIndexEntryInsert(_globalIndexNss());
+ globalIndexEntryInsert.getWriteCommandRequestBase().setCollectionUUID(_indexUUID);
+ GlobalIndexEntry indexEntry(indexKeyValues, documentKey);
+ globalIndexEntryInsert.setDocuments({indexEntry.toBSON()});
+
+ return txnClient.runCRUDOp({globalIndexEntryInsert}, {})
+ .thenRunOn(txnExec)
+ .then([this, documentKey, &txnClient](const auto& commandResponse) {
+ write_ops::InsertCommandRequest skipIdInsert(_skipIdNss());
+
+ skipIdInsert.setDocuments({BSON("_id" << documentKey)});
+ return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue();
+ })
+ .semi();
+ })
+ .semi();
+ };
+
+ txn_api::SyncTransactionWithRetries txn(opCtx, _executor, nullptr);
+ txn.run(opCtx, insertToGlobalIndexFn);
+}
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_inserter.h b/src/mongo/db/s/global_index/global_index_inserter.h
new file mode 100644
index 00000000000..f818715532c
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_inserter.h
@@ -0,0 +1,70 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/util/uuid.h"
+
+namespace mongo {
+namespace global_index {
+
+/**
+ * Handles the insertion of index entries to the global index collection during the cloning phase.
+ */
+class GlobalIndexInserter {
+public:
+ GlobalIndexInserter(NamespaceString nss,
+ StringData indexName,
+ UUID indexUUID,
+ std::shared_ptr<executor::TaskExecutor> executor);
+
+ /**
+ * Performs the necessary checks and bookkeeping on inserting a new global index entry under a
+ * transaction.
+ */
+ void processDoc(OperationContext* opCtx,
+ const BSONObj& indexKeyValues,
+ const BSONObj& documentKey);
+
+private:
+ NamespaceString _skipIdNss();
+ NamespaceString _globalIndexNss();
+
+ const NamespaceString _nss;
+ const std::string _indexName;
+ const UUID _indexUUID;
+
+ std::shared_ptr<executor::TaskExecutor> _executor;
+};
+
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_inserter_test.cpp b/src/mongo/db/s/global_index/global_index_inserter_test.cpp
new file mode 100644
index 00000000000..19dfb69bdf9
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_inserter_test.cpp
@@ -0,0 +1,232 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/global_index/global_index_inserter.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/s/global_index/global_index_entry_gen.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+namespace mongo {
+namespace global_index {
+namespace {
+
+class GlobalIndexInserterTest : public ShardServerTestFixture {
+public:
+ void setUp() override {
+ ShardServerTestFixture::setUp();
+
+ // Create config.transactions collection
+ auto opCtx = operationContext();
+ DBDirectClient client(opCtx);
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
+
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+
+ // Note: needed to initialize txn coordinator because first thing commit command does is
+ // cancelling coordinator if in a sharded environment.
+ TransactionCoordinatorService::get(operationContext())
+ ->onShardingInitialization(operationContext(), true);
+
+ // Use our own executor since the executor from the fixture is using NetworkInterfaceMock
+ // that uses a ClockSourceMock. This means that tasks that are scheduled to be run in the
+ // future will not run unless the clock is advanced manually.
+ _executor = makeTaskExecutorForCloner();
+
+ const auto& indexNs = globalIndexNss();
+ client.createCollection(indexNs.ns());
+ auto all = client.getCollectionInfos(indexNs.db().toString(),
+ BSON("name" << indexNs.coll().toString()));
+
+ ASSERT_EQ(1, all.size());
+ _indexUUID.emplace(uassertStatusOK(UUID::parse(all.front()["info"]["uuid"])));
+
+ client.createCollection(skipIdNss().ns());
+ }
+
+ void tearDown() override {
+ TransactionCoordinatorService::get(operationContext())->onStepDown();
+ ShardServerTestFixture::tearDown();
+ }
+
+ const NamespaceString& nss() const {
+ return _nss;
+ }
+
+ const std::string& indexName() const {
+ return _indexName;
+ }
+
+ const UUID& indexUUID() const {
+ return *_indexUUID;
+ }
+
+ NamespaceString skipIdNss() const {
+ return NamespaceString(NamespaceString::kConfigDb,
+ "{}.globalIndex.{}.skipList"_format(_nss.coll(), _indexName));
+ }
+
+ NamespaceString globalIndexNss() const {
+ return NamespaceString(_nss.db(), "{}.globalIndex.{}"_format(_nss.coll(), _indexName));
+ }
+
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() {
+ return _executor;
+ }
+
+private:
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutorForCloner() {
+ ThreadPool::Options threadPoolOptions;
+ threadPoolOptions.maxThreads = 1;
+ threadPoolOptions.threadNamePrefix = "TestGlobalIndexCloner-";
+ threadPoolOptions.poolName = "TestGlobalIndexClonerThreadPool";
+
+ auto executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(std::move(threadPoolOptions)),
+ executor::makeNetworkInterface("TestGlobalIndexClonerNetwork", nullptr, nullptr));
+ executor->startup();
+
+ return executor;
+ }
+
+ const NamespaceString _nss{"test", "user"};
+ const std::string _indexName{"global_x"};
+
+ boost::optional<UUID> _indexUUID;
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
+};
+
+TEST_F(GlobalIndexInserterTest, ClonerUpdatesIndexEntryAndSkipIdCollection) {
+ GlobalIndexInserter cloner(nss(), indexName(), indexUUID(), getExecutor());
+
+ const auto indexKeyValues = BSON("x" << 34);
+ const auto documentKey = BSON("_id" << 12 << "x" << 34);
+ cloner.processDoc(operationContext(), indexKeyValues, documentKey);
+
+ DBDirectClient client(operationContext());
+
+ FindCommandRequest indexEntryQuery(globalIndexNss());
+ auto indexEntryDoc = client.findOne(indexEntryQuery);
+ ASSERT_BSONOBJ_EQ(BSON(GlobalIndexEntry::kIndexKeyValueFieldName
+ << indexKeyValues << GlobalIndexEntry::kDocumentKeyFieldName
+ << documentKey),
+ indexEntryDoc);
+
+ FindCommandRequest skipIdQuery(skipIdNss());
+ auto skipIdDoc = client.findOne(skipIdQuery);
+ ASSERT_BSONOBJ_EQ(BSON("_id" << documentKey), skipIdDoc);
+}
+
+TEST_F(GlobalIndexInserterTest, ClonerSkipsDocumentIfInSkipCollection) {
+ GlobalIndexInserter cloner(nss(), indexName(), indexUUID(), getExecutor());
+
+ const auto indexKeyValues = BSON("x" << 34);
+ const auto documentKey = BSON("_id" << 12 << "x" << 34);
+
+ DBDirectClient client(operationContext());
+ write_ops::InsertCommandRequest skipIdInsert(skipIdNss());
+ skipIdInsert.setDocuments({BSON("_id" << documentKey)});
+ client.insert(skipIdInsert);
+
+ cloner.processDoc(operationContext(), indexKeyValues, documentKey);
+
+ FindCommandRequest indexEntryQuery(globalIndexNss());
+ auto indexEntryDoc = client.findOne(indexEntryQuery);
+ ASSERT_TRUE(indexEntryDoc.isEmpty());
+}
+
+TEST_F(GlobalIndexInserterTest, ClonerRetriesWhenItEncountersWCE) {
+ GlobalIndexInserter cloner(nss(), indexName(), indexUUID(), getExecutor());
+
+ DBDirectClient client(operationContext());
+
+ auto clonerThread = ([&] {
+ FailPointEnableBlock fp("globalIndexInserterPauseAfterReadingSkipCollection");
+
+ const auto indexKeyValues = BSON("x" << 34);
+ const auto documentKey = BSON("_id" << 12 << "x" << 34);
+
+ auto future = stdx::async(stdx::launch::async, [&] {
+ cloner.processDoc(operationContext(), indexKeyValues, documentKey);
+ });
+
+ fp->waitForTimesEntered(1);
+
+ write_ops::InsertCommandRequest skipIdInsert(skipIdNss());
+ GlobalIndexEntry indexEntry(indexKeyValues, documentKey);
+ skipIdInsert.setDocuments({BSON("_id" << documentKey)});
+ client.insert(skipIdInsert);
+
+ return future;
+ })();
+
+ clonerThread.get();
+
+ FindCommandRequest indexEntryQuery(globalIndexNss());
+ auto indexEntryDoc = client.findOne(indexEntryQuery);
+ ASSERT_TRUE(indexEntryDoc.isEmpty());
+}
+
+// TODO: SERVER-67820 Enable after bug is fixed.
+#if 0
+TEST_F(GlobalIndexInserterTest, ClonerThrowsIfIndexEntryAlreadyExists) {
+ GlobalIndexInserter cloner(ns(), indexName(), indexUUID(), getExecutor());
+
+ const auto indexKeyValues = BSON("x" << 34);
+ const auto documentKey = BSON("_id" << 12 << "x" << 34);
+
+ DBDirectClient client(operationContext());
+ write_ops::InsertCommandRequest globalIndexInsert(globalIndexNs());
+ GlobalIndexEntry indexEntry(indexKeyValues, documentKey);
+ globalIndexInsert.setDocuments({indexEntry.toBSON()});
+ client.insert(globalIndexInsert);
+
+ ASSERT_THROWS_CODE(cloner.processDoc(operationContext(), indexKeyValues, documentKey),
+ DBException,
+ ErrorCodes::DuplicateKey);
+}
+#endif
+
+} // namespace
+} // namespace global_index
+} // namespace mongo
diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h
index 830ec9a7b2f..9a04c121eba 100644
--- a/src/mongo/db/transaction_api.h
+++ b/src/mongo/db/transaction_api.h
@@ -106,7 +106,8 @@ public:
* The given stmtIds are included in the sent command. If the API's transaction was spawned on
* behalf of a retryable write, the statement ids must be unique for each write in the
* transaction as the underlying servers will save history for each id the same as for a
- * retryable write. A write can opt out of this by sending a -1 statement id, which is ignored.
+ * retryable write. A write can opt out of this by sending a -1 statement id or an empty vector,
+ * which is ignored.
*
* If a sent statement id had already been seen for this transaction, the write with that id
* won't apply a second time and instead returns its response from its original execution. That
diff --git a/src/mongo/logv2/log_component.h b/src/mongo/logv2/log_component.h
index 1a492521738..65647b00771 100644
--- a/src/mongo/logv2/log_component.h
+++ b/src/mongo/logv2/log_component.h
@@ -57,6 +57,7 @@ namespace mongo::logv2 {
X(kControl, , "control" , "CONTROL" , kDefault) \
X(kExecutor, , "executor" , "EXECUTOR", kDefault) \
X(kGeo, , "geo" , "GEO" , kDefault) \
+ X(kGlobalIndex, , "globalIndex" , "GBL_IDX" , kDefault) \
X(kIndex, , "index" , "INDEX" , kDefault) \
X(kNetwork, , "network" , "NETWORK" , kDefault) \
X(kProcessHealth, , "processHealth" , "HEALTH" , kDefault) \