diff options
author | Randolph Tan <randolph@10gen.com> | 2022-07-14 13:45:36 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-14 14:37:30 +0000 |
commit | 00d0a908862771c4ded8d0492bfe3793affe6c10 (patch) | |
tree | a3a4c94d80869975856a93523193780f3e87b930 /src | |
parent | 5ecdd383565058346f1ade42b438c190b7f13ad2 (diff) | |
download | mongo-00d0a908862771c4ded8d0492bfe3793affe6c10.tar.gz |
SERVER-67556 Implement global index cloner
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/global_index/global_index_entry.idl | 48 | ||||
-rw-r--r-- | src/mongo/db/s/global_index/global_index_inserter.cpp | 120 | ||||
-rw-r--r-- | src/mongo/db/s/global_index/global_index_inserter.h | 70 | ||||
-rw-r--r-- | src/mongo/db/s/global_index/global_index_inserter_test.cpp | 232 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.h | 3 | ||||
-rw-r--r-- | src/mongo/logv2/log_component.h | 1 |
7 files changed, 476 insertions, 1 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index f98385fc858..7ecb530d088 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -57,6 +57,8 @@ env.Library( 'collection_sharding_state_factory_shard.cpp', 'commit_chunk_migration.idl', 'config_server_op_observer.cpp', + 'global_index/global_index_entry.idl', + 'global_index/global_index_inserter.cpp', 'global_index_metrics.cpp', 'metadata_manager.cpp', 'migration_chunk_cloner_source_legacy.cpp', @@ -561,6 +563,7 @@ env.CppUnitTest( 'dist_lock_catalog_mock.cpp', 'dist_lock_catalog_replset_test.cpp', 'dist_lock_manager_replset_test.cpp', + 'global_index/global_index_inserter_test.cpp', 'global_index_metrics_test.cpp', 'implicit_collection_creation_test.cpp', 'metadata_manager_test.cpp', diff --git a/src/mongo/db/s/global_index/global_index_entry.idl b/src/mongo/db/s/global_index/global_index_entry.idl new file mode 100644 index 00000000000..4c09dbad7b2 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_entry.idl @@ -0,0 +1,48 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file contains the schema for the document stored in the global index collection. + +global: + cpp_namespace: "mongo::global_index" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + GlobalIndexEntry: + description: "The document describing the index entry for global indexes." + strict: false + fields: + _id: + type: object + description: "The index representation of the original document." + cpp_name: "indexKeyValue" + documentKey: + type: object + description: "The union of _id and shard key values of the original document." diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp new file mode 100644 index 00000000000..a6fdff72993 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_inserter.cpp @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/global_index/global_index_inserter.h" + +#include <fmt/format.h> + +#include "mongo/db/s/global_index/global_index_entry_gen.h" +#include "mongo/db/transaction_api.h" +#include "mongo/logv2/log.h" +#include "mongo/util/fail_point.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kGlobalIndex + +using namespace fmt::literals; + +namespace mongo { +namespace global_index { +namespace { +MONGO_FAIL_POINT_DEFINE(globalIndexInserterPauseAfterReadingSkipCollection); +} // namespace + +GlobalIndexInserter::GlobalIndexInserter(NamespaceString nss, + StringData indexName, + UUID indexUUID, + std::shared_ptr<executor::TaskExecutor> executor) + : _nss(std::move(nss)), + _indexName(indexName.toString()), + _indexUUID(std::move(indexUUID)), + _executor(std::move(executor)) {} + +NamespaceString GlobalIndexInserter::_skipIdNss() { + return NamespaceString(NamespaceString::kConfigDb, + "{}.globalIndex.{}.skipList"_format(_nss.coll(), _indexName)); +} + +NamespaceString GlobalIndexInserter::_globalIndexNss() { + return NamespaceString(_nss.db(), "{}.globalIndex.{}"_format(_nss.coll(), _indexName)); +} + +void GlobalIndexInserter::processDoc(OperationContext* opCtx, + const BSONObj& indexKeyValues, + const BSONObj& documentKey) { + auto insertToGlobalIndexFn = [this, + service = opCtx->getServiceContext(), + indexKeyValues, + documentKey](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + FindCommandRequest skipIdQuery(_skipIdNss()); + skipIdQuery.setFilter(BSON("_id" << documentKey)); + skipIdQuery.setLimit(1); + + return txnClient.exhaustiveFind(skipIdQuery) + .thenRunOn(txnExec) + .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec]( + const auto& skipIdDocResults) { + auto client = service->makeClient("globalIndexInserter"); + auto opCtx = service->makeOperationContext(client.get()); + + { + stdx::lock_guard<Client> lk(*client); + client->setSystemOperationKillableByStepdown(lk); + } + + globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get()); + + if (!skipIdDocResults.empty()) { + return SemiFuture<void>::makeReady(); + } + + write_ops::InsertCommandRequest globalIndexEntryInsert(_globalIndexNss()); + globalIndexEntryInsert.getWriteCommandRequestBase().setCollectionUUID(_indexUUID); + GlobalIndexEntry indexEntry(indexKeyValues, documentKey); + globalIndexEntryInsert.setDocuments({indexEntry.toBSON()}); + + return txnClient.runCRUDOp({globalIndexEntryInsert}, {}) + .thenRunOn(txnExec) + .then([this, documentKey, &txnClient](const auto& commandResponse) { + write_ops::InsertCommandRequest skipIdInsert(_skipIdNss()); + + skipIdInsert.setDocuments({BSON("_id" << documentKey)}); + return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue(); + }) + .semi(); + }) + .semi(); + }; + + txn_api::SyncTransactionWithRetries txn(opCtx, _executor, nullptr); + txn.run(opCtx, insertToGlobalIndexFn); +} + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_inserter.h b/src/mongo/db/s/global_index/global_index_inserter.h new file mode 100644 index 00000000000..f818715532c --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_inserter.h @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" +#include "mongo/executor/task_executor.h" +#include "mongo/util/uuid.h" + +namespace mongo { +namespace global_index { + +/** + * Handles the insertion of index entries to the global index collection during the cloning phase. + */ +class GlobalIndexInserter { +public: + GlobalIndexInserter(NamespaceString nss, + StringData indexName, + UUID indexUUID, + std::shared_ptr<executor::TaskExecutor> executor); + + /** + * Performs the necessary checks and bookkeeping on inserting a new global index entry under a + * transaction. + */ + void processDoc(OperationContext* opCtx, + const BSONObj& indexKeyValues, + const BSONObj& documentKey); + +private: + NamespaceString _skipIdNss(); + NamespaceString _globalIndexNss(); + + const NamespaceString _nss; + const std::string _indexName; + const UUID _indexUUID; + + std::shared_ptr<executor::TaskExecutor> _executor; +}; + +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_inserter_test.cpp b/src/mongo/db/s/global_index/global_index_inserter_test.cpp new file mode 100644 index 00000000000..19dfb69bdf9 --- /dev/null +++ b/src/mongo/db/s/global_index/global_index_inserter_test.cpp @@ -0,0 +1,232 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/global_index/global_index_inserter.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_cache_noop.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/global_index/global_index_entry_gen.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/s/transaction_coordinator_service.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/fail_point.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +namespace mongo { +namespace global_index { +namespace { + +class GlobalIndexInserterTest : public ShardServerTestFixture { +public: + void setUp() override { + ShardServerTestFixture::setUp(); + + // Create config.transactions collection + auto opCtx = operationContext(); + DBDirectClient client(opCtx); + client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); + + LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); + + // Note: needed to initialize txn coordinator because first thing commit command does is + // cancelling coordinator if in a sharded environment. + TransactionCoordinatorService::get(operationContext()) + ->onShardingInitialization(operationContext(), true); + + // Use our own executor since the executor from the fixture is using NetworkInterfaceMock + // that uses a ClockSourceMock. This means that tasks that are scheduled to be run in the + // future will not run unless the clock is advanced manually. + _executor = makeTaskExecutorForCloner(); + + const auto& indexNs = globalIndexNss(); + client.createCollection(indexNs.ns()); + auto all = client.getCollectionInfos(indexNs.db().toString(), + BSON("name" << indexNs.coll().toString())); + + ASSERT_EQ(1, all.size()); + _indexUUID.emplace(uassertStatusOK(UUID::parse(all.front()["info"]["uuid"]))); + + client.createCollection(skipIdNss().ns()); + } + + void tearDown() override { + TransactionCoordinatorService::get(operationContext())->onStepDown(); + ShardServerTestFixture::tearDown(); + } + + const NamespaceString& nss() const { + return _nss; + } + + const std::string& indexName() const { + return _indexName; + } + + const UUID& indexUUID() const { + return *_indexUUID; + } + + NamespaceString skipIdNss() const { + return NamespaceString(NamespaceString::kConfigDb, + "{}.globalIndex.{}.skipList"_format(_nss.coll(), _indexName)); + } + + NamespaceString globalIndexNss() const { + return NamespaceString(_nss.db(), "{}.globalIndex.{}"_format(_nss.coll(), _indexName)); + } + + std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() { + return _executor; + } + +private: + std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutorForCloner() { + ThreadPool::Options threadPoolOptions; + threadPoolOptions.maxThreads = 1; + threadPoolOptions.threadNamePrefix = "TestGlobalIndexCloner-"; + threadPoolOptions.poolName = "TestGlobalIndexClonerThreadPool"; + + auto executor = std::make_shared<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(std::move(threadPoolOptions)), + executor::makeNetworkInterface("TestGlobalIndexClonerNetwork", nullptr, nullptr)); + executor->startup(); + + return executor; + } + + const NamespaceString _nss{"test", "user"}; + const std::string _indexName{"global_x"}; + + boost::optional<UUID> _indexUUID; + std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; +}; + +TEST_F(GlobalIndexInserterTest, ClonerUpdatesIndexEntryAndSkipIdCollection) { + GlobalIndexInserter cloner(nss(), indexName(), indexUUID(), getExecutor()); + + const auto indexKeyValues = BSON("x" << 34); + const auto documentKey = BSON("_id" << 12 << "x" << 34); + cloner.processDoc(operationContext(), indexKeyValues, documentKey); + + DBDirectClient client(operationContext()); + + FindCommandRequest indexEntryQuery(globalIndexNss()); + auto indexEntryDoc = client.findOne(indexEntryQuery); + ASSERT_BSONOBJ_EQ(BSON(GlobalIndexEntry::kIndexKeyValueFieldName + << indexKeyValues << GlobalIndexEntry::kDocumentKeyFieldName + << documentKey), + indexEntryDoc); + + FindCommandRequest skipIdQuery(skipIdNss()); + auto skipIdDoc = client.findOne(skipIdQuery); + ASSERT_BSONOBJ_EQ(BSON("_id" << documentKey), skipIdDoc); +} + +TEST_F(GlobalIndexInserterTest, ClonerSkipsDocumentIfInSkipCollection) { + GlobalIndexInserter cloner(nss(), indexName(), indexUUID(), getExecutor()); + + const auto indexKeyValues = BSON("x" << 34); + const auto documentKey = BSON("_id" << 12 << "x" << 34); + + DBDirectClient client(operationContext()); + write_ops::InsertCommandRequest skipIdInsert(skipIdNss()); + skipIdInsert.setDocuments({BSON("_id" << documentKey)}); + client.insert(skipIdInsert); + + cloner.processDoc(operationContext(), indexKeyValues, documentKey); + + FindCommandRequest indexEntryQuery(globalIndexNss()); + auto indexEntryDoc = client.findOne(indexEntryQuery); + ASSERT_TRUE(indexEntryDoc.isEmpty()); +} + +TEST_F(GlobalIndexInserterTest, ClonerRetriesWhenItEncountersWCE) { + GlobalIndexInserter cloner(nss(), indexName(), indexUUID(), getExecutor()); + + DBDirectClient client(operationContext()); + + auto clonerThread = ([&] { + FailPointEnableBlock fp("globalIndexInserterPauseAfterReadingSkipCollection"); + + const auto indexKeyValues = BSON("x" << 34); + const auto documentKey = BSON("_id" << 12 << "x" << 34); + + auto future = stdx::async(stdx::launch::async, [&] { + cloner.processDoc(operationContext(), indexKeyValues, documentKey); + }); + + fp->waitForTimesEntered(1); + + write_ops::InsertCommandRequest skipIdInsert(skipIdNss()); + GlobalIndexEntry indexEntry(indexKeyValues, documentKey); + skipIdInsert.setDocuments({BSON("_id" << documentKey)}); + client.insert(skipIdInsert); + + return future; + })(); + + clonerThread.get(); + + FindCommandRequest indexEntryQuery(globalIndexNss()); + auto indexEntryDoc = client.findOne(indexEntryQuery); + ASSERT_TRUE(indexEntryDoc.isEmpty()); +} + +// TODO: SERVER-67820 Enable after bug is fixed. +#if 0 +TEST_F(GlobalIndexInserterTest, ClonerThrowsIfIndexEntryAlreadyExists) { + GlobalIndexInserter cloner(ns(), indexName(), indexUUID(), getExecutor()); + + const auto indexKeyValues = BSON("x" << 34); + const auto documentKey = BSON("_id" << 12 << "x" << 34); + + DBDirectClient client(operationContext()); + write_ops::InsertCommandRequest globalIndexInsert(globalIndexNs()); + GlobalIndexEntry indexEntry(indexKeyValues, documentKey); + globalIndexInsert.setDocuments({indexEntry.toBSON()}); + client.insert(globalIndexInsert); + + ASSERT_THROWS_CODE(cloner.processDoc(operationContext(), indexKeyValues, documentKey), + DBException, + ErrorCodes::DuplicateKey); +} +#endif + +} // namespace +} // namespace global_index +} // namespace mongo diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h index 830ec9a7b2f..9a04c121eba 100644 --- a/src/mongo/db/transaction_api.h +++ b/src/mongo/db/transaction_api.h @@ -106,7 +106,8 @@ public: * The given stmtIds are included in the sent command. If the API's transaction was spawned on * behalf of a retryable write, the statement ids must be unique for each write in the * transaction as the underlying servers will save history for each id the same as for a - * retryable write. A write can opt out of this by sending a -1 statement id, which is ignored. + * retryable write. A write can opt out of this by sending a -1 statement id or an empty vector, + * which is ignored. * * If a sent statement id had already been seen for this transaction, the write with that id * won't apply a second time and instead returns its response from its original execution. That diff --git a/src/mongo/logv2/log_component.h b/src/mongo/logv2/log_component.h index 1a492521738..65647b00771 100644 --- a/src/mongo/logv2/log_component.h +++ b/src/mongo/logv2/log_component.h @@ -57,6 +57,7 @@ namespace mongo::logv2 { X(kControl, , "control" , "CONTROL" , kDefault) \ X(kExecutor, , "executor" , "EXECUTOR", kDefault) \ X(kGeo, , "geo" , "GEO" , kDefault) \ + X(kGlobalIndex, , "globalIndex" , "GBL_IDX" , kDefault) \ X(kIndex, , "index" , "INDEX" , kDefault) \ X(kNetwork, , "network" , "NETWORK" , kDefault) \ X(kProcessHealth, , "processHealth" , "HEALTH" , kDefault) \ |