diff options
author | Suganthi Mani <suganthi.mani@mongodb.com> | 2020-02-28 15:54:31 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-02 08:02:54 +0000 |
commit | 4628f264e60fd69cd09388e6fca0d3dd1b82f14c (patch) | |
tree | bca64008b3010bfb3500c2e6b4dabfb28d5d8862 /src/mongo/db | |
parent | a5582fa42435116ab05efcabddf17219fbd573d6 (diff) | |
download | mongo-4628f264e60fd69cd09388e6fca0d3dd1b82f14c.tar.gz |
SERVER-39071 Implements commit quorum for two phase index builds.
Diffstat (limited to 'src/mongo/db')
50 files changed, 1396 insertions, 271 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f959c2520ae..131cfc1a228 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -765,6 +765,7 @@ env.Library( LIBDEPS_PRIVATE=[ 'transaction', '$BUILD_DIR/mongo/db/commands/mongod_fcv', + "$BUILD_DIR/mongo/db/catalog/commit_quorum_options", ], ) @@ -826,7 +827,10 @@ env.Library( 'db_raii', 'index_build_entry_helpers', '$BUILD_DIR/mongo/db/catalog/collection_catalog', + '$BUILD_DIR/mongo/db/catalog/index_build_entry_idl', '$BUILD_DIR/mongo/db/catalog/index_timestamp_helper', + "$BUILD_DIR/mongo/executor/task_executor_interface", + "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl", ], ) @@ -857,6 +861,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/timestamp_block', '$BUILD_DIR/mongo/db/s/sharding_api_d', '$BUILD_DIR/mongo/util/fail_point', + "$BUILD_DIR/mongo/executor/task_executor_interface", ], ) diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index 47293e12e09..96e09b4d028 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -56,6 +56,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) final {} void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {} diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index ab0b9549256..6f14a2acd73 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -134,6 +134,9 @@ env.Library( "$BUILD_DIR/mongo/db/repl/oplog_entry", "$BUILD_DIR/mongo/rpc/command_status", ], + LIBDEPS_PRIVATE=[ + 'commit_quorum_options', + ], ) env.Library( diff --git a/src/mongo/db/catalog/commit_quorum.idl b/src/mongo/db/catalog/commit_quorum.idl index 92fac200b7b..a7a03c07f7d 100644 --- a/src/mongo/db/catalog/commit_quorum.idl +++ b/src/mongo/db/catalog/commit_quorum.idl @@ -37,5 +37,5 @@ types: description: "CommitQuorumOptions defines the required quorum for the index builds to commit." cpp_type: "mongo::CommitQuorumOptions" - serializer: "mongo::CommitQuorumOptions::append" + serializer: "mongo::CommitQuorumOptions::appendToBuilder" deserializer: "mongo::CommitQuorumOptions::deserializerForIDL" diff --git a/src/mongo/db/catalog/commit_quorum_options.cpp b/src/mongo/db/catalog/commit_quorum_options.cpp index 254682e9eea..f04eeb5c96c 100644 --- a/src/mongo/db/catalog/commit_quorum_options.cpp +++ b/src/mongo/db/catalog/commit_quorum_options.cpp @@ -80,15 +80,15 @@ CommitQuorumOptions CommitQuorumOptions::deserializerForIDL( BSONObj CommitQuorumOptions::toBSON() const { BSONObjBuilder builder; - append(kCommitQuorumField, &builder); + appendToBuilder(kCommitQuorumField, &builder); return builder.obj(); } -void CommitQuorumOptions::append(StringData fieldName, BSONObjBuilder* builder) const { +void CommitQuorumOptions::appendToBuilder(StringData fieldName, BSONObjBuilder* builder) const { if (mode.empty()) { - builder->append(kCommitQuorumField, numNodes); + builder->append(fieldName, numNodes); } else { - builder->append(kCommitQuorumField, mode); + builder->append(fieldName, mode); } } diff --git a/src/mongo/db/catalog/commit_quorum_options.h b/src/mongo/db/catalog/commit_quorum_options.h index 1dfc5bf8369..784332070b0 100644 --- a/src/mongo/db/catalog/commit_quorum_options.h +++ b/src/mongo/db/catalog/commit_quorum_options.h @@ -78,7 +78,7 @@ public: BSONObj toBSON() const; // Appends the BSON representation of this object. - void append(StringData fieldName, BSONObjBuilder* builder) const; + void appendToBuilder(StringData fieldName, BSONObjBuilder* builder) const; // The 'commitQuorum' parameter to define the required quorum for the index builds to commit. // The 'mode' represents the string format and takes precedence over the number format diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp index 4c43cfff485..60f5ae79b57 100644 --- a/src/mongo/db/catalog/drop_collection.cpp +++ b/src/mongo/db/catalog/drop_collection.cpp @@ -161,6 +161,7 @@ Status _abortIndexBuildsAndDropCollection(OperationContext* opCtx, while (true) { // Send the abort signal to any active index builds on the collection. indexBuildsCoord->abortCollectionIndexBuildsNoWait( + opCtx, collectionUUID, str::stream() << "Collection " << coll->ns() << "(" << collectionUUID << ") is being dropped"); diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp index 7d6f01eb9dd..64681932fd1 100644 --- a/src/mongo/db/catalog/drop_database.cpp +++ b/src/mongo/db/catalog/drop_database.cpp @@ -177,7 +177,8 @@ Status _dropDatabase(OperationContext* opCtx, const std::string& dbName, bool ab // are none left when we retrieve the exclusive database lock again. while (indexBuildsCoord->inProgForDb(dbName)) { // Sends the abort signal to all the active index builders for this database. - indexBuildsCoord->abortDatabaseIndexBuildsNoWait(dbName, "dropDatabase command"); + indexBuildsCoord->abortDatabaseIndexBuildsNoWait( + opCtx, dbName, "dropDatabase command"); // Now that the abort signals were sent out to the active index builders for this // database, we need to release the lock temporarily to allow those index builders diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp index dc568301a88..ddf2185ab42 100644 --- a/src/mongo/db/catalog/drop_indexes.cpp +++ b/src/mongo/db/catalog/drop_indexes.cpp @@ -159,7 +159,7 @@ std::vector<UUID> abortIndexBuildByIndexNamesNoWait(OperationContext* opCtx, boost::optional<UUID> buildUUID = IndexBuildsCoordinator::get(opCtx)->abortIndexBuildByIndexNamesNoWait( - opCtx, collection->uuid(), indexNames, Timestamp(), "dropIndexes command"); + opCtx, collection->uuid(), indexNames, std::string("dropIndexes command")); if (buildUUID) { return {*buildUUID}; } @@ -220,7 +220,7 @@ std::vector<UUID> abortActiveIndexBuilders(OperationContext* opCtx, if (indexNames.front() == "*") { return IndexBuildsCoordinator::get(opCtx)->abortCollectionIndexBuildsNoWait( - collection->uuid(), "dropIndexes command"); + opCtx, collection->uuid(), "dropIndexes command"); } return abortIndexBuildByIndexNamesNoWait(opCtx, collection, indexNames); diff --git a/src/mongo/db/catalog/index_build_oplog_entry.cpp b/src/mongo/db/catalog/index_build_oplog_entry.cpp index 4512f43af9b..a89cd0ce6ea 100644 --- a/src/mongo/db/catalog/index_build_oplog_entry.cpp +++ b/src/mongo/db/catalog/index_build_oplog_entry.cpp @@ -41,6 +41,7 @@ StatusWith<IndexBuildOplogEntry> IndexBuildOplogEntry::parse(const repl::OplogEn // { // < "startIndexBuild" | "commitIndexBuild" | "abortIndexBuild" > : "coll", // "indexBuildUUID" : <UUID>, + // "commitQuorum" : [<int>|<std::string>] // only required for 'startIndexBuild' // "indexes" : [ // { // "key" : { @@ -80,11 +81,30 @@ StatusWith<IndexBuildOplogEntry> IndexBuildOplogEntry::parse(const repl::OplogEn if (buildUUIDElem.eoo()) { return {ErrorCodes::BadValue, str::stream() << "Missing required field 'indexBuildUUID'"}; } + auto swBuildUUID = UUID::parse(buildUUIDElem); if (!swBuildUUID.isOK()) { return swBuildUUID.getStatus().withContext("Error parsing 'indexBuildUUID'"); } + boost::optional<CommitQuorumOptions> commitQuorum; + if (repl::OplogEntry::CommandType::kStartIndexBuild == commandType) { + auto commitQuorumElem = obj.getField(CommitQuorumOptions::kCommitQuorumField); + if (commitQuorumElem.eoo()) { + return {ErrorCodes::BadValue, + str::stream() << "Missing required field '" + << CommitQuorumOptions::kCommitQuorumField << "'"}; + } + + commitQuorum = CommitQuorumOptions(); + auto status = commitQuorum->parse(commitQuorumElem); + if (!status.isOK()) { + return status.withContext(str::stream() << "Error parsing '" + << CommitQuorumOptions::kCommitQuorumField + << "': " << status.reason()); + } + } + auto indexesElem = obj.getField("indexes"); if (indexesElem.eoo()) { return {ErrorCodes::BadValue, str::stream() << "Missing required field 'indexes'"}; @@ -132,6 +152,7 @@ StatusWith<IndexBuildOplogEntry> IndexBuildOplogEntry::parse(const repl::OplogEn commandType, commandName.toString(), swBuildUUID.getValue(), + commitQuorum, indexNames, indexSpecs, cause}; diff --git a/src/mongo/db/catalog/index_build_oplog_entry.h b/src/mongo/db/catalog/index_build_oplog_entry.h index 284afbc669e..1f0d0075965 100644 --- a/src/mongo/db/catalog/index_build_oplog_entry.h +++ b/src/mongo/db/catalog/index_build_oplog_entry.h @@ -32,6 +32,7 @@ #include <vector> #include "mongo/base/status_with.h" +#include "mongo/db/catalog/commit_quorum_options.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/util/uuid.h" @@ -48,6 +49,7 @@ public: repl::OplogEntry::CommandType commandType; std::string commandName; UUID buildUUID; + boost::optional<CommitQuorumOptions> commitQuorum; std::vector<std::string> indexNames; std::vector<BSONObj> indexSpecs; boost::optional<Status> cause; diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp index 26ba9358a98..6aaa602a075 100644 --- a/src/mongo/db/catalog/rename_collection_test.cpp +++ b/src/mongo/db/catalog/rename_collection_test.cpp @@ -85,6 +85,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) override; void onCommitIndexBuild(OperationContext* opCtx, @@ -179,9 +180,11 @@ void OpObserverMock::onStartIndexBuild(OperationContext* opCtx, CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) { _logOp(opCtx, nss, "startIndex"); - OpObserverNoop::onStartIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexes, fromMigrate); + OpObserverNoop::onStartIndexBuild( + opCtx, nss, collUUID, indexBuildUUID, indexes, commitQuorum, fromMigrate); } void OpObserverMock::onCommitIndexBuild(OperationContext* opCtx, diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index cfd795a3b52..4f444b9f755 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -404,8 +404,16 @@ void Cloner::copyIndexes(OperationContext* opCtx, MultiIndexBlock::OnInitFn onInitFn; if (opCtx->writesAreReplicated() && buildUUID) { onInitFn = [&](std::vector<BSONObj>& specs) { - opObserver->onStartIndexBuild( - opCtx, to_collection, collection->uuid(), *buildUUID, specs, fromMigrate); + // Since, we don't use IndexBuildsCoordinatorMongod thread pool to build indexes, + // it's ok to set the commit quorum option as 1. Also, this is currently only get + // called in rollback via refetch. So, onStartIndexBuild() call will be a no-op. + opObserver->onStartIndexBuild(opCtx, + to_collection, + collection->uuid(), + *buildUUID, + specs, + CommitQuorumOptions(1), + fromMigrate); return Status::OK(); }; } else { diff --git a/src/mongo/db/collection_index_builds_tracker.cpp b/src/mongo/db/collection_index_builds_tracker.cpp index 439a29b9be1..4fd216d0edd 100644 --- a/src/mongo/db/collection_index_builds_tracker.cpp +++ b/src/mongo/db/collection_index_builds_tracker.cpp @@ -95,14 +95,16 @@ std::vector<UUID> CollectionIndexBuildsTracker::getIndexBuildUUIDs(WithLock) con void CollectionIndexBuildsTracker::runOperationOnAllBuilds( WithLock lk, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::function<void(WithLock, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::shared_ptr<ReplIndexBuildState> replIndexBuildState, const std::string& reason)> func, const std::string& reason) noexcept { for (auto it = _buildStateByBuildUUID.begin(); it != _buildStateByBuildUUID.end(); ++it) { - func(lk, indexBuildsManager, it->second, reason); + func(lk, opCtx, indexBuildsManager, it->second, reason); } } diff --git a/src/mongo/db/collection_index_builds_tracker.h b/src/mongo/db/collection_index_builds_tracker.h index 234e6ccfcb5..2042be873d5 100644 --- a/src/mongo/db/collection_index_builds_tracker.h +++ b/src/mongo/db/collection_index_builds_tracker.h @@ -84,8 +84,10 @@ public: */ void runOperationOnAllBuilds( WithLock, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::function<void(WithLock, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::shared_ptr<ReplIndexBuildState> replIndexBuildState, const std::string& reason)> func, diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 5211d3854a2..6b8ecff4517 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -322,6 +322,7 @@ env.Library( 'rename_collection_idl', 'test_commands_enabled', 'write_commands_common', + "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl", ], ) @@ -557,7 +558,10 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/query/query_test_service_context', 'map_reduce_agg', - ] + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl', + ], ) env.CppUnitTest( diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index eb2864eec05..70004a1c10e 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -60,6 +60,7 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/server_options.h" +#include "mongo/db/storage/two_phase_index_build_knobs_gen.h" #include "mongo/db/views/view_catalog.h" #include "mongo/logv2/log.h" #include "mongo/platform/compiler.h" @@ -205,7 +206,9 @@ void appendFinalIndexFieldsToResult(int numIndexesBefore, result.append(kNoteFieldName, "index already exists"); } - commitQuorum->append("commitQuorum", &result); + // commitQuorum will be populated only when two phase index build is enabled. + if (commitQuorum) + commitQuorum->appendToBuilder(kCommitQuorumFieldName, &result); } @@ -275,21 +278,32 @@ Status validateTTLOptions(OperationContext* opCtx, const BSONObj& cmdObj) { boost::optional<CommitQuorumOptions> parseAndGetCommitQuorum(OperationContext* opCtx, const BSONObj& cmdObj) { auto replCoord = repl::ReplicationCoordinator::get(opCtx); + auto twoPhaseindexBuildEnabled = IndexBuildsCoordinator::supportsTwoPhaseIndexBuild(); + auto commitQuorumEnabled = (enableIndexBuildCommitQuorum) ? true : false; if (cmdObj.hasField(kCommitQuorumFieldName)) { uassert(ErrorCodes::BadValue, str::stream() << "Standalones can't specify commitQuorum", replCoord->isReplEnabled()); + uassert(ErrorCodes::BadValue, + str::stream() << "commitQuorum is supported only for two phase index builds with " + "majority commit quorum support enabled ", + (twoPhaseindexBuildEnabled && commitQuorumEnabled)); CommitQuorumOptions commitQuorum; uassertStatusOK(commitQuorum.parse(cmdObj.getField(kCommitQuorumFieldName))); return commitQuorum; - } else { + } + + if (twoPhaseindexBuildEnabled) { // Retrieve the default commit quorum if one wasn't passed in, which consists of all // data-bearing nodes. - int numDataBearingMembers = - replCoord->isReplEnabled() ? replCoord->getConfig().getNumDataBearingMembers() : 1; + int numDataBearingMembers = (replCoord->isReplEnabled() && commitQuorumEnabled) + ? replCoord->getConfig().getNumDataBearingMembers() + : 1; return CommitQuorumOptions(numDataBearingMembers); } + + return boost::none; } /** @@ -625,12 +639,7 @@ bool runCreateIndexesWithCoordinator(OperationContext* opCtx, // this be a no-op. // Use a null abort timestamp because the index build will generate its own timestamp // on cleanup. - indexBuildsCoord->abortIndexBuildByBuildUUIDNoWait( - opCtx, - buildUUID, - Timestamp(), - str::stream() << "Index build interrupted: " << buildUUID << ": " - << interruptionEx.toString()); + indexBuildsCoord->abortIndexBuildOnError(opCtx, buildUUID, interruptionEx.toStatus()); LOGV2(20443, "Index build aborted: {buildUUID}", "buildUUID"_attr = buildUUID); throw; @@ -650,14 +659,7 @@ bool runCreateIndexesWithCoordinator(OperationContext* opCtx, throw; } - // Use a null abort timestamp because the index build will generate a ghost timestamp - // for the single-phase build on cleanup. - indexBuildsCoord->abortIndexBuildByBuildUUIDNoWait( - opCtx, - buildUUID, - Timestamp(), - str::stream() << "Index build interrupted due to change in replication state: " - << buildUUID << ": " << ex.toString()); + indexBuildsCoord->abortIndexBuildOnError(opCtx, buildUUID, ex.toStatus()); LOGV2(20446, "Index build aborted due to NotMaster error: {buildUUID}", "buildUUID"_attr = buildUUID); diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp index d648ee1bdc4..4385b99dad4 100644 --- a/src/mongo/db/commands/mr_test.cpp +++ b/src/mongo/db/commands/mr_test.cpp @@ -299,6 +299,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) override; /** @@ -354,6 +355,7 @@ void MapReduceOpObserver::onStartIndexBuild(OperationContext* opCtx, CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) { for (auto&& obj : indexes) { indexesCreated.push_back(obj.getOwned()); diff --git a/src/mongo/db/commands/vote_commit_index_build_command.cpp b/src/mongo/db/commands/vote_commit_index_build_command.cpp index cd4ee962525..8c8baedd139 100644 --- a/src/mongo/db/commands/vote_commit_index_build_command.cpp +++ b/src/mongo/db/commands/vote_commit_index_build_command.cpp @@ -36,6 +36,7 @@ #include "mongo/db/commands/vote_commit_index_build_gen.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/logv2/log.h" namespace mongo { namespace { @@ -70,19 +71,35 @@ public: using InvocationBase::InvocationBase; void typedRun(OperationContext* opCtx) { - uassertStatusOK(IndexBuildsCoordinator::get(opCtx)->voteCommitIndexBuild( - request().getCommandParameter(), request().getHostAndPort())); - - // Must set the latest op time on the OperationContext in case no write was needed in - // voteCommitIndexBuild above, i.e. if this command is called several times before it is - // successful regarding write concern. - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + + const auto& cmd = request(); + LOGV2_DEBUG(3856208, + 1, + "Received voteCommitIndexBuild request for index build: {builduuid}, from " + "host: {host} ", + "builduuid"_attr = cmd.getCommandParameter(), + "host"_attr = cmd.getHostAndPort().toString()); + auto voteStatus = IndexBuildsCoordinator::get(opCtx)->voteCommitIndexBuild( + opCtx, cmd.getCommandParameter(), cmd.getHostAndPort()); + + // No need to wait for majority write concern if we fail to persist the voter's info. + uassertStatusOK(voteStatus); + + auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + // Update the client's last optime to last oplog entry's opTime. + // This case can hit only if the member has already voted for that index build. So, to + // make sure the voter's info won't be rolled back, we wait for the oplog's last entry's + // opTime to be majority replicated. + if (lastOpBeforeRun == lastOpAfterRun) { + repl::ReplClientInfo::forClient(opCtx->getClient()) + .setLastOpToSystemLastOpTime(opCtx); + } } private: NamespaceString ns() const override { - MONGO_UNREACHABLE; - return {}; + return NamespaceString(request().getDbName(), ""); } bool supportsWriteConcern() const override { diff --git a/src/mongo/db/database_index_builds_tracker.cpp b/src/mongo/db/database_index_builds_tracker.cpp index 313f5fa9a5b..de107e94ecc 100644 --- a/src/mongo/db/database_index_builds_tracker.cpp +++ b/src/mongo/db/database_index_builds_tracker.cpp @@ -59,14 +59,16 @@ void DatabaseIndexBuildsTracker::removeIndexBuild(WithLock, const UUID& buildUUI void DatabaseIndexBuildsTracker::runOperationOnAllBuilds( WithLock lk, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::function<void(WithLock, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::shared_ptr<ReplIndexBuildState> replIndexBuildState, const std::string& reason)> func, const std::string& reason) { for (auto it = _allIndexBuilds.begin(); it != _allIndexBuilds.end(); ++it) { - func(lk, indexBuildsManager, it->second, reason); + func(lk, opCtx, indexBuildsManager, it->second, reason); } } diff --git a/src/mongo/db/database_index_builds_tracker.h b/src/mongo/db/database_index_builds_tracker.h index 8b2eb4ea474..603ef1c70a0 100644 --- a/src/mongo/db/database_index_builds_tracker.h +++ b/src/mongo/db/database_index_builds_tracker.h @@ -71,8 +71,10 @@ public: */ void runOperationOnAllBuilds( WithLock, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::function<void(WithLock, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::shared_ptr<ReplIndexBuildState> replIndexBuildState, const std::string& reason)> func, diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 8ad65f8d2f3..47d13e8d46b 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -215,16 +215,24 @@ void Helpers::upsert(OperationContext* opCtx, BSONElement e = o["_id"]; verify(e.type()); BSONObj id = e.wrap(); + upsert(opCtx, ns, id, o, fromMigrate); +} +void Helpers::upsert(OperationContext* opCtx, + const string& ns, + const BSONObj& filter, + const BSONObj& updateMod, + bool fromMigrate) { OldClientContext context(opCtx, ns); const NamespaceString requestNs(ns); UpdateRequest request(requestNs); - request.setQuery(id); - request.setUpdateModification(o); + request.setQuery(filter); + request.setUpdateModification(updateMod); request.setUpsert(); request.setFromMigration(fromMigrate); + request.setYieldPolicy(PlanExecutor::NO_YIELD); update(opCtx, context.db(), request); } diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h index 246130853ff..7b606768d02 100644 --- a/src/mongo/db/dbhelpers.h +++ b/src/mongo/db/dbhelpers.h @@ -123,6 +123,18 @@ struct Helpers { const BSONObj& o, bool fromMigrate = false); + /** + * Performs an upsert of 'updateMod' if we don't match the given 'filter'. + * Callers are expected to hold the collection lock. + * Note: Query yielding is turned off, so both read and writes are performed + * on the same storage snapshot. + */ + static void upsert(OperationContext* opCtx, + const std::string& ns, + const BSONObj& filter, + const BSONObj& updateMod, + bool fromMigrate = false); + // TODO: this should be somewhere else probably /* Takes object o, and returns a new object with the * same field elements but the names stripped out. diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 21478a822e5..eb1972bc49a 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -56,6 +56,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) final {} void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {} diff --git a/src/mongo/db/index_build_entry_helpers.cpp b/src/mongo/db/index_build_entry_helpers.cpp index 1dce7f5b2c4..47666998c67 100644 --- a/src/mongo/db/index_build_entry_helpers.cpp +++ b/src/mongo/db/index_build_entry_helpers.cpp @@ -60,7 +60,8 @@ namespace mongo { namespace { -Status upsert(OperationContext* opCtx, IndexBuildEntry indexBuildEntry) { +Status upsert(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry) { + return writeConflictRetry(opCtx, "upsertIndexBuildEntry", NamespaceString::kIndexBuildEntryNamespace.ns(), @@ -85,6 +86,72 @@ Status upsert(OperationContext* opCtx, IndexBuildEntry indexBuildEntry) { }); } +std::pair<const BSONObj, const BSONObj> buildIndexBuildEntryFilterAndUpdate( + IndexBuildEntry& indexBuildEntry, + boost::optional<CommitQuorumOptions> currentCommitQuorum = boost::none) { + // Construct the filter. + const auto buildUUID = + BSON(IndexBuildEntry::kBuildUUIDFieldName << indexBuildEntry.getBuildUUID()); + const auto collectionUUID = + BSON(IndexBuildEntry::kCollectionUUIDFieldName << indexBuildEntry.getCollectionUUID()); + const auto indexNameList = + BSON(IndexBuildEntry::kIndexNamesFieldName << indexBuildEntry.getIndexNames()); + BSONObjBuilder commitQuorumFilter; + if (!currentCommitQuorum) { + currentCommitQuorum = indexBuildEntry.getCommitQuorum(); + } + currentCommitQuorum->appendToBuilder(IndexBuildEntry::kCommitQuorumFieldName, + &commitQuorumFilter); + const auto filter = BSON("$and" << BSON_ARRAY(buildUUID << collectionUUID << indexNameList + << commitQuorumFilter.obj())); + // Construct the update. + BSONObjBuilder updateMod; + BSONObjBuilder commitQuorumUpdate; + indexBuildEntry.getCommitQuorum().appendToBuilder(IndexBuildEntry::kCommitQuorumFieldName, + &commitQuorumUpdate); + // If the update commit quorum is same as the value on-disk, we don't update it. + updateMod.append("$set", commitQuorumUpdate.obj()); + + // '$addToSet' to prevent any duplicate entries written to "commitReadyMembers" field. + if (auto commitReadyMembers = indexBuildEntry.getCommitReadyMembers()) { + BSONArrayBuilder arrayBuilder; + for (const auto& item : commitReadyMembers.get()) { + arrayBuilder.append(item.toString()); + } + const auto commitReadyMemberList = BSON(IndexBuildEntry::kCommitReadyMembersFieldName + << BSON("$each" << arrayBuilder.arr())); + updateMod.append("$addToSet", commitReadyMemberList); + } + + return {filter, updateMod.obj()}; +} + +Status upsert(OperationContext* opCtx, const BSONObj& filter, const BSONObj& updateMod) { + return writeConflictRetry(opCtx, + "upsertIndexBuildEntry", + NamespaceString::kIndexBuildEntryNamespace.ns(), + [&]() -> Status { + AutoGetCollection autoCollection( + opCtx, NamespaceString::kIndexBuildEntryNamespace, MODE_IX); + Collection* collection = autoCollection.getCollection(); + if (!collection) { + str::stream ss; + ss << "Collection not found: " + << NamespaceString::kIndexBuildEntryNamespace.ns(); + return Status(ErrorCodes::NamespaceNotFound, ss); + } + + WriteUnitOfWork wuow(opCtx); + Helpers::upsert(opCtx, + NamespaceString::kIndexBuildEntryNamespace.ns(), + filter, + updateMod, + /*fromMigrate=*/false); + wuow.commit(); + return Status::OK(); + }); +} + } // namespace namespace indexbuildentryhelpers { @@ -118,7 +185,24 @@ void ensureIndexBuildEntriesNamespaceExists(OperationContext* opCtx) { }); } -Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry indexBuildEntry) { +Status persistCommitReadyMemberInfo(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry) { + invariant(indexBuildEntry.getCommitReadyMembers()); + + auto [filter, updateMod] = buildIndexBuildEntryFilterAndUpdate(indexBuildEntry); + return upsert(opCtx, filter, updateMod); +} + +Status setIndexCommitQuorum(OperationContext* opCtx, + IndexBuildEntry& indexBuildEntry, + CommitQuorumOptions currentCommitQuorum) { + invariant(!indexBuildEntry.getCommitReadyMembers()); + + auto [filter, updateMod] = + buildIndexBuildEntryFilterAndUpdate(indexBuildEntry, currentCommitQuorum); + return upsert(opCtx, filter, updateMod); +} + +Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry) { return writeConflictRetry(opCtx, "addIndexBuildEntry", NamespaceString::kIndexBuildEntryNamespace.ns(), @@ -303,7 +387,6 @@ Status addCommitReadyMember(OperationContext* opCtx, UUID indexBuildUUID, HostAn indexBuildEntry.setCommitReadyMembers(newCommitReadyMembers); return upsert(opCtx, indexBuildEntry); } - return Status::OK(); } diff --git a/src/mongo/db/index_build_entry_helpers.h b/src/mongo/db/index_build_entry_helpers.h index e0591077784..8d8e04a9be3 100644 --- a/src/mongo/db/index_build_entry_helpers.h +++ b/src/mongo/db/index_build_entry_helpers.h @@ -70,6 +70,26 @@ namespace indexbuildentryhelpers { void ensureIndexBuildEntriesNamespaceExists(OperationContext* opCtx); /** + * Persist the host and port information about the replica set members that have voted to commit an + * index build into config.system.indexBuilds collection. If the member info is already present in + * the collection for that index build, then we don't do any updates and don't generate any errors. + * + * Returns an error if collection is missing. + */ +Status persistCommitReadyMemberInfo(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry); + +/** + * Persist the new commit quorum value for an index build into config.system.indexBuilds collection. + * We will update the commit quorum value only if 'currentCommitQuorum' matches the value read from + * the "config.system.indexBuilds" collection for that index build. + * + * Returns an error if collection is missing. + */ +Status setIndexCommitQuorum(OperationContext* opCtx, + IndexBuildEntry& indexBuildEntry, + CommitQuorumOptions currentCommitQuorum); + +/** * Writes the 'indexBuildEntry' to the disk. * * An IndexBuildEntry should be stored on the disk during the duration of the index build process @@ -78,7 +98,7 @@ void ensureIndexBuildEntriesNamespaceExists(OperationContext* opCtx); * Returns 'DuplicateKey' error code if a document already exists on the disk with the same * 'indexBuildUUID'. */ -Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry indexBuildEntry); +Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry); /** * Removes the IndexBuildEntry from the disk. diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp index 64cd255f1df..989f453e21a 100644 --- a/src/mongo/db/index_builds_coordinator.cpp +++ b/src/mongo/db/index_builds_coordinator.cpp @@ -134,25 +134,19 @@ bool shouldBuildIndexesOnEmptyCollectionSinglePhased(OperationContext* opCtx, return collection->isEmpty(opCtx); } -/** - * Returns true if we should wait for a commitIndexBuild or abortIndexBuild oplog entry during oplog - * application. +/* + * Determines whether to skip the index build state transition check. + * Index builder not using ReplIndexBuildState::waitForNextAction to signal primary and secondaries + * to commit or abort signal will violate index build state transition i.e, they can move from + * prepareAbort to Committed and from Committed to prepareAbort. So, we should skip state transition + * verification. Otherwise, we would invariant. */ -bool shouldWaitForCommitOrAbort(OperationContext* opCtx, const ReplIndexBuildState& replState) { - if (IndexBuildProtocol::kTwoPhase != replState.protocol) { - return false; - } - - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->getSettings().usingReplSets()) { +bool shouldSkipIndexBuildStateTransitionCheck(OperationContext* opCtx, + IndexBuildProtocol protocol) { + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->getSettings().usingReplSets() && protocol == IndexBuildProtocol::kTwoPhase) { return false; } - - const NamespaceStringOrUUID dbAndUUID(replState.dbName, replState.collectionUUID); - if (replCoord->canAcceptWritesFor(opCtx, dbAndUUID)) { - return false; - } - return true; } @@ -161,7 +155,7 @@ bool shouldWaitForCommitOrAbort(OperationContext* opCtx, const ReplIndexBuildSta */ void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, - const ReplIndexBuildState& replState, + ReplIndexBuildState& replState, bool replSetAndNotPrimaryAtStart) { const auto& buildUUID = replState.buildUUID; @@ -174,6 +168,11 @@ void onCommitIndexBuild(OperationContext* opCtx, const auto& collUUID = replState.collectionUUID; const auto& indexSpecs = replState.indexSpecs; auto fromMigrate = false; + auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState.protocol); + { + stdx::unique_lock<Latch> lk(replState.mutex); + replState.indexBuildState.setState(IndexBuildState::kCommitted, skipCheck); + } // Since two phase index builds are allowed to survive replication state transitions, we should // check if the node is currently a primary before attempting to write to the oplog. @@ -196,7 +195,7 @@ void onCommitIndexBuild(OperationContext* opCtx, */ void onAbortIndexBuild(OperationContext* opCtx, const NamespaceString& nss, - const ReplIndexBuildState& replState, + ReplIndexBuildState& replState, const Status& cause) { if (!serverGlobalParams.featureCompatibility.isVersionInitialized()) { return; @@ -212,28 +211,52 @@ void onAbortIndexBuild(OperationContext* opCtx, auto opObserver = opCtx->getServiceContext()->getOpObserver(); auto collUUID = replState.collectionUUID; auto fromMigrate = false; + auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState.protocol); + { + stdx::unique_lock<Latch> lk(replState.mutex); + replState.indexBuildState.setState(IndexBuildState::kAborted, skipCheck); + } opObserver->onAbortIndexBuild( opCtx, nss, collUUID, replState.buildUUID, replState.indexSpecs, cause, fromMigrate); } /** * Aborts the index build identified by the provided 'replIndexBuildState'. - * - * Sets a signal on the coordinator's repl index build state if the builder does not yet exist in - * the manager. + * It gets called by drop database/collection/index command. */ void abortIndexBuild(WithLock lk, + OperationContext* opCtx, IndexBuildsManager* indexBuildsManager, std::shared_ptr<ReplIndexBuildState> replIndexBuildState, const std::string& reason) { - bool res = indexBuildsManager->abortIndexBuild(replIndexBuildState->buildUUID, reason); - if (res) { + auto protocol = replIndexBuildState->protocol; + stdx::unique_lock<Latch> replStateLock(replIndexBuildState->mutex); + if (protocol == IndexBuildProtocol::kTwoPhase && + replIndexBuildState->waitForNextAction->getFuture().isReady()) { + const auto nextAction = replIndexBuildState->waitForNextAction->getFuture().get(); + invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied || + nextAction == IndexBuildAction::kPrimaryAbort); + // Index build coordinator already received a signal to commit or abort. So, it's ok + // to return and wait for the index build to complete. The index build coordinator + // will not perform the signaled action (i.e, will not commit or abort the index build) + // only when the node steps down. When the node steps down, the caller of this function, + // drop commands (user operation) will also get interrupted. So, we no longer need to + // abort the index build on step down. return; } - // The index builder was not found in the manager, so it only exists in the coordinator. In this - // case, set the abort signal on the coordinator index build state. - replIndexBuildState->aborted = true; - replIndexBuildState->abortReason = reason; + + auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replIndexBuildState->protocol); + // Set the state on replIndexBuildState and indexBuildsManager. And, then signal the value. It's + // important we do all these 3 things in a critical section by holding mutex lock. + replIndexBuildState->indexBuildState.setState( + IndexBuildState::kPrepareAbort, skipCheck, boost::none, reason); + indexBuildsManager->abortIndexBuild(replIndexBuildState->buildUUID, reason); + + if (protocol == IndexBuildProtocol::kTwoPhase) { + // Only for 2 phase we need to use signaling logic. + // Promise can be set only once. + replIndexBuildState->waitForNextAction->emplaceValue(IndexBuildAction::kPrimaryAbort); + } } /** @@ -334,8 +357,8 @@ IndexBuildsCoordinator* IndexBuildsCoordinator::get(ServiceContext* serviceConte return indexBuildsCoordinator.get(); } -IndexBuildsCoordinator* IndexBuildsCoordinator::get(OperationContext* operationContext) { - return get(operationContext->getServiceContext()); +IndexBuildsCoordinator* IndexBuildsCoordinator::get(OperationContext* OperationContext) { + return get(OperationContext->getServiceContext()); } IndexBuildsCoordinator::~IndexBuildsCoordinator() { @@ -491,6 +514,23 @@ Status IndexBuildsCoordinator::_startIndexBuildForRecovery(OperationContext* opC return Status::OK(); } +std::string IndexBuildsCoordinator::_indexBuildActionToString(IndexBuildAction action) { + if (action == IndexBuildAction::kNoAction) { + return "No action"; + } else if (action == IndexBuildAction::kOplogCommit) { + return "Oplog commit"; + } else if (action == IndexBuildAction::kOplogAbort) { + return "Oplog abort"; + } else if (action == IndexBuildAction::kRollbackAbort) { + return "Rollback abort"; + } else if (action == IndexBuildAction::kPrimaryAbort) { + return "Primary abort"; + } else if (action == IndexBuildAction::kCommitQuorumSatisfied) { + return "Commit quorum Satisfied"; + } + MONGO_UNREACHABLE; +} + void IndexBuildsCoordinator::waitForAllIndexBuildsToStopForShutdown() { stdx::unique_lock<Latch> lk(_mutex); @@ -506,6 +546,7 @@ void IndexBuildsCoordinator::waitForAllIndexBuildsToStopForShutdown() { } std::vector<UUID> IndexBuildsCoordinator::_abortCollectionIndexBuilds(stdx::unique_lock<Latch>& lk, + OperationContext* opCtx, const UUID& collectionUUID, const std::string& reason, bool shouldWait) { @@ -520,7 +561,7 @@ std::vector<UUID> IndexBuildsCoordinator::_abortCollectionIndexBuilds(stdx::uniq std::vector<UUID> buildUUIDs = collIndexBuildsIt->second->getIndexBuildUUIDs(lk); collIndexBuildsIt->second->runOperationOnAllBuilds( - lk, &_indexBuildsManager, abortIndexBuild, reason); + lk, opCtx, &_indexBuildsManager, abortIndexBuild, reason); if (!shouldWait) { return buildUUIDs; @@ -533,21 +574,23 @@ std::vector<UUID> IndexBuildsCoordinator::_abortCollectionIndexBuilds(stdx::uniq return buildUUIDs; } -void IndexBuildsCoordinator::abortCollectionIndexBuilds(const UUID& collectionUUID, +void IndexBuildsCoordinator::abortCollectionIndexBuilds(OperationContext* opCtx, + const UUID& collectionUUID, const std::string& reason) { stdx::unique_lock<Latch> lk(_mutex); const bool shouldWait = true; - _abortCollectionIndexBuilds(lk, collectionUUID, reason, shouldWait); + _abortCollectionIndexBuilds(lk, opCtx, collectionUUID, reason, shouldWait); } std::vector<UUID> IndexBuildsCoordinator::abortCollectionIndexBuildsNoWait( - const UUID& collectionUUID, const std::string& reason) { + OperationContext* opCtx, const UUID& collectionUUID, const std::string& reason) { stdx::unique_lock<Latch> lk(_mutex); const bool shouldWait = false; - return _abortCollectionIndexBuilds(lk, collectionUUID, reason, shouldWait); + return _abortCollectionIndexBuilds(lk, opCtx, collectionUUID, reason, shouldWait); } void IndexBuildsCoordinator::_abortDatabaseIndexBuilds(stdx::unique_lock<Latch>& lk, + OperationContext* opCtx, const StringData& db, const std::string& reason, bool shouldWait) { @@ -560,7 +603,8 @@ void IndexBuildsCoordinator::_abortDatabaseIndexBuilds(stdx::unique_lock<Latch>& "About to abort all index builders running for collections in the given database", "database"_attr = db); - dbIndexBuilds->runOperationOnAllBuilds(lk, &_indexBuildsManager, abortIndexBuild, reason); + dbIndexBuilds->runOperationOnAllBuilds( + lk, opCtx, &_indexBuildsManager, abortIndexBuild, reason); if (!shouldWait) { return; @@ -571,17 +615,20 @@ void IndexBuildsCoordinator::_abortDatabaseIndexBuilds(stdx::unique_lock<Latch>& dbIndexBuilds->waitUntilNoIndexBuildsRemain(lk); } -void IndexBuildsCoordinator::abortDatabaseIndexBuilds(StringData db, const std::string& reason) { +void IndexBuildsCoordinator::abortDatabaseIndexBuilds(OperationContext* opCtx, + StringData db, + const std::string& reason) { stdx::unique_lock<Latch> lk(_mutex); const bool shouldWait = true; - _abortDatabaseIndexBuilds(lk, db, reason, shouldWait); + _abortDatabaseIndexBuilds(lk, opCtx, db, reason, shouldWait); } -void IndexBuildsCoordinator::abortDatabaseIndexBuildsNoWait(StringData db, +void IndexBuildsCoordinator::abortDatabaseIndexBuildsNoWait(OperationContext* opCtx, + StringData db, const std::string& reason) { stdx::unique_lock<Latch> lk(_mutex); const bool shouldWait = false; - _abortDatabaseIndexBuilds(lk, db, reason, shouldWait); + _abortDatabaseIndexBuilds(lk, opCtx, db, reason, shouldWait); } namespace { @@ -599,7 +646,8 @@ void IndexBuildsCoordinator::applyStartIndexBuild(OperationContext* opCtx, const auto nss = getNsFromUUID(opCtx, collUUID); IndexBuildsCoordinator::IndexBuildOptions indexBuildOptions; - invariant(!indexBuildOptions.commitQuorum); + invariant(oplogEntry.commitQuorum); + indexBuildOptions.commitQuorum = oplogEntry.commitQuorum.get(); indexBuildOptions.replSetAndNotPrimaryAtStart = true; auto indexBuildsCoord = IndexBuildsCoordinator::get(opCtx); @@ -636,6 +684,9 @@ void IndexBuildsCoordinator::applyCommitIndexBuild(OperationContext* opCtx, // If the index build was not found, we must restart the build. For some reason the index // build has already been aborted on this node. This is possible in certain infrequent race // conditions with stepdown, shutdown, and user interruption. + // Also, it can be because, when this node was previously in + // initial sync state and this index build was in progress on sync source. And, initial sync + // does not start the in progress index builds. LOGV2(20653, "Could not find an active index build with UUID {buildUUID} while processing a " "commitIndexBuild oplog entry. Restarting the index build on " @@ -648,6 +699,10 @@ void IndexBuildsCoordinator::applyCommitIndexBuild(OperationContext* opCtx, IndexBuildsCoordinator::IndexBuildOptions indexBuildOptions; indexBuildOptions.replSetAndNotPrimaryAtStart = true; + // It's ok to set the commitQuorum value as 0, as we have already received the + // commitIndexBuild oplog entry. No way in future, this index build will be coordinated by + // this node. + indexBuildOptions.commitQuorum = CommitQuorumOptions(0); // This spawns a new thread and returns immediately. auto fut = uassertStatusOK(indexBuildsCoord->startIndexBuild( @@ -671,12 +726,33 @@ void IndexBuildsCoordinator::applyCommitIndexBuild(OperationContext* opCtx, } auto replState = uassertStatusOK(indexBuildsCoord->_getIndexBuild(buildUUID)); - { + + while (true) { stdx::unique_lock<Latch> lk(replState->mutex); - replState->isCommitReady = true; - replState->commitTimestamp = opCtx->recoveryUnit()->getCommitTimestamp(); - replState->condVar.notify_all(); + if (replState->waitForNextAction->getFuture().isReady()) { + // If future wait is made uninterruptible, then the shutdown can stuck behind + // oplog applier if the indexBuildCoordinator thread died after interruption on + // shutdown. And, commitIndexBuild oplog entry will stuck waiting for reset of the + // promise. + const auto nextAction = replState->waitForNextAction->getFuture().get(opCtx); + invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied); + // Retry until the current promise result is consumed by the index builder thread and + // a new empty promise got created by the indexBuildscoordinator thread. + // Don't hammer it. + sleepmillis(1); + continue; + } + auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState->protocol); + replState->indexBuildState.setState(IndexBuildState::kPrepareCommit, + skipCheck, + opCtx->recoveryUnit()->getCommitTimestamp()); + // Promise can be set only once. + // We can't skip signaling here if a signal is already set because the previous commit or + // abort signal might have been sent to handle for primary case. + replState->waitForNextAction->emplaceValue(IndexBuildAction::kOplogCommit); + break; } + auto fut = replState->sharedPromise.getFuture(); LOGV2(20654, "Index build joined after commit: {buildUUID}: {fut_waitNoThrow_opCtx}", @@ -702,24 +778,40 @@ void IndexBuildsCoordinator::applyAbortIndexBuild(OperationContext* opCtx, << buildUUID, !opCtx->recoveryUnit()->getCommitTimestamp().isNull()); + std::string abortReason(str::stream() + << "abortIndexBuild oplog entry encountered: " << *oplogEntry.cause); auto indexBuildsCoord = IndexBuildsCoordinator::get(opCtx); - indexBuildsCoord->abortIndexBuildByBuildUUID( - opCtx, - buildUUID, - opCtx->recoveryUnit()->getCommitTimestamp(), - str::stream() << "abortIndexBuild oplog entry encountered: " << *oplogEntry.cause); + indexBuildsCoord->abortIndexBuildByBuildUUID(opCtx, + buildUUID, + IndexBuildAction::kOplogAbort, + opCtx->recoveryUnit()->getCommitTimestamp(), + abortReason); +} + +void IndexBuildsCoordinator::abortIndexBuildOnError(OperationContext* opCtx, + const UUID& buildUUID, + Status abortStatus) { + // Use a null abort timestamp because the index build will generate a ghost timestamp + // for the single-phase build on cleanup. + std::string abortReason(str::stream() << "Index build interrupted: " << buildUUID << ": " + << abortStatus.toString()); + abortIndexBuildByBuildUUIDNoWait( + opCtx, buildUUID, IndexBuildAction::kPrimaryAbort, boost::none, abortReason); } void IndexBuildsCoordinator::abortIndexBuildByBuildUUID(OperationContext* opCtx, const UUID& buildUUID, - Timestamp abortTimestamp, - const std::string& reason) { - if (!abortIndexBuildByBuildUUIDNoWait(opCtx, buildUUID, abortTimestamp, reason)) { + IndexBuildAction signalAction, + boost::optional<Timestamp> abortTimestamp, + boost::optional<std::string> reason) { + if (!abortIndexBuildByBuildUUIDNoWait(opCtx, buildUUID, signalAction, abortTimestamp, reason)) { return; } - auto replState = invariant(_getIndexBuild(buildUUID), - str::stream() << "Abort timestamp: " << abortTimestamp.toString()); + auto replState = + invariant(_getIndexBuild(buildUUID), + str::stream() << "Abort timestamp: " + << abortTimestamp.get_value_or(Timestamp()).toString()); auto fut = replState->sharedPromise.getFuture(); LOGV2(20655, @@ -732,8 +824,7 @@ boost::optional<UUID> IndexBuildsCoordinator::abortIndexBuildByIndexNamesNoWait( OperationContext* opCtx, const UUID& collectionUUID, const std::vector<std::string>& indexNames, - Timestamp abortTimestamp, - const std::string& reason) { + boost::optional<std::string> reason) { boost::optional<UUID> buildUUID; auto indexBuilds = _getIndexBuilds(); auto onIndexBuild = [&](std::shared_ptr<ReplIndexBuildState> replState) { @@ -756,8 +847,11 @@ boost::optional<UUID> IndexBuildsCoordinator::abortIndexBuildByIndexNamesNoWait( "collectionUUID"_attr = collectionUUID, "replState_indexNames_front"_attr = replState->indexNames.front()); - if (this->abortIndexBuildByBuildUUIDNoWait( - opCtx, replState->buildUUID, abortTimestamp, reason)) { + if (this->abortIndexBuildByBuildUUIDNoWait(opCtx, + replState->buildUUID, + IndexBuildAction::kPrimaryAbort, + boost::none, + reason)) { buildUUID = replState->buildUUID; } }; @@ -792,31 +886,74 @@ bool IndexBuildsCoordinator::hasIndexBuilder(OperationContext* opCtx, return foundIndexBuilder; } -bool IndexBuildsCoordinator::abortIndexBuildByBuildUUIDNoWait(OperationContext* opCtx, - const UUID& buildUUID, - Timestamp abortTimestamp, - const std::string& reason) { - _indexBuildsManager.abortIndexBuild(buildUUID, reason); +bool IndexBuildsCoordinator::abortIndexBuildByBuildUUIDNoWait( + OperationContext* opCtx, + const UUID& buildUUID, + IndexBuildAction signalAction, + boost::optional<Timestamp> abortTimestamp, + boost::optional<std::string> reason) { + // We need to avoid race between commit and abort index build. + while (true) { + // It is possible to receive an abort for a non-existent index build. Abort should always + // succeed, so suppress the error. + auto replStateResult = _getIndexBuild(buildUUID); + if (!replStateResult.isOK()) { + LOGV2(20656, + "ignoring error while aborting index build {buildUUID}: " + "{replStateResult_getStatus}", + "buildUUID"_attr = buildUUID, + "replStateResult_getStatus"_attr = replStateResult.getStatus()); + return false; + } - // It is possible to receive an abort for a non-existent index build. Abort should always - // succeed, so suppress the error. - auto replStateResult = _getIndexBuild(buildUUID); - if (!replStateResult.isOK()) { - LOGV2(20656, - "ignoring error while aborting index build {buildUUID}: {replStateResult_getStatus}", - "buildUUID"_attr = buildUUID, - "replStateResult_getStatus"_attr = replStateResult.getStatus()); - return false; - } + auto replState = replStateResult.getValue(); + auto protocol = replState->protocol; - auto replState = replStateResult.getValue(); - { stdx::unique_lock<Latch> lk(replState->mutex); - replState->aborted = true; - replState->abortTimestamp = abortTimestamp; - replState->abortReason = reason; - replState->condVar.notify_all(); + if (protocol == IndexBuildProtocol::kTwoPhase && + replState->waitForNextAction->getFuture().isReady()) { + const auto nextAction = replState->waitForNextAction->getFuture().get(opCtx); + invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied || + nextAction == IndexBuildAction::kPrimaryAbort); + + // Index build coordinator already received a signal to commit or abort. So, it's ok + // to return and wait for the index build to complete if we are trying to signal + // 'kPrimaryAbort'. The index build coordinator will not perform the signaled action + // (i.e, will not commit or abort the index build) only when the node steps down. When + // the node steps down, the caller of this function, dropIndexes/createIndexes command + // (user operation) will also get interrupted. So, we no longer need to abort the index + // build on step down. + // + // Currently dropIndexes command calls this function with the + // collection lock held in IX mode, So, there are possibilities, we might block the + // index build from completing, leading to 3 way deadlocks involving step down, + // dropIndexes command, IndexBuildCoordinator thread. + if (signalAction == IndexBuildAction::kPrimaryAbort) + return true; + + // Retry until the current promise result is consumed by the index builder thread and + // a new empty promise got created by the indexBuildscoordinator thread. Or, until the + // index build got torn down after index build commit. + // Don't hammer it. + sleepmillis(1); + continue; + } + + auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState->protocol); + // Set the state on replState and _indexBuildsManager. And, then signal the value. It's + // important we do all these 3 things in a critical section by holding mutex lock. + replState->indexBuildState.setState( + IndexBuildState::kPrepareAbort, skipCheck, abortTimestamp, reason); + _indexBuildsManager.abortIndexBuild(buildUUID, reason.get_value_or("")); + + if (protocol == IndexBuildProtocol::kTwoPhase) { + // Only for 2 phase we need to use signaling logic. + // Promise can be set only once. + replState->waitForNextAction->emplaceValue(signalAction); + } + break; } + return true; } @@ -850,8 +987,16 @@ std::size_t IndexBuildsCoordinator::getActiveIndexBuildCount(OperationContext* o void IndexBuildsCoordinator::onStepUp(OperationContext* opCtx) { LOGV2(20657, "IndexBuildsCoordinator::onStepUp - this node is stepping up to primary"); + // This would create an empty table even for FCV 4.2 to handle case where a primary node started + // with FCV 4.2, and then upgraded FCV 4.4. + ensureIndexBuildEntriesNamespaceExists(opCtx); + auto indexBuilds = _getIndexBuilds(); auto onIndexBuild = [this, opCtx](std::shared_ptr<ReplIndexBuildState> replState) { + if (IndexBuildProtocol::kTwoPhase != replState->protocol) { + return; + } + // TODO(SERVER-44654): re-enable failover support for unique indexes. if (containsUniqueIndexes(replState->indexSpecs)) { // We abort unique index builds on step-up on the new primary, as opposed to on @@ -863,21 +1008,25 @@ void IndexBuildsCoordinator::onStepUp(OperationContext* opCtx) { // oplog entry. // Do not wait for the index build to exit, because it may reacquire locks that are not // available until stepUp completes. - abortIndexBuildByBuildUUIDNoWait( - opCtx, replState->buildUUID, Timestamp(), "unique indexes do not support failover"); + std::string abortReason("unique indexes do not support failover"); + abortIndexBuildByBuildUUIDNoWait(opCtx, + replState->buildUUID, + IndexBuildAction::kPrimaryAbort, + boost::none, + abortReason); return; } - stdx::unique_lock<Latch> lk(replState->mutex); - if (!replState->aborted) { - // Leave commit timestamp as null. We will be writing a commitIndexBuild oplog entry now - // that we are primary and using the timestamp from the oplog entry to update the mdb - // catalog. - invariant(replState->commitTimestamp.isNull(), replState->buildUUID.toString()); - invariant(!replState->isCommitReady, replState->buildUUID.toString()); - replState->isCommitReady = true; - replState->condVar.notify_all(); + { + stdx::unique_lock<Latch> lk(replState->mutex); + // After Sending the abort this might have stepped down and stepped back up. + if (replState->indexBuildState.isAbortPrepared() && + !replState->waitForNextAction->getFuture().isReady()) { + replState->waitForNextAction->emplaceValue(IndexBuildAction::kPrimaryAbort); + return; + } } + _signalIfCommitQuorumIsSatisfied(opCtx, replState); }; forEachIndexBuild(indexBuilds, "IndexBuildsCoordinator::onStepUp - "_sd, onIndexBuild); } @@ -888,33 +1037,49 @@ IndexBuilds IndexBuildsCoordinator::onRollback(OperationContext* opCtx) { IndexBuilds buildsAborted; auto indexBuilds = _getIndexBuilds(); - auto onIndexBuild = - [this, opCtx, &buildsAborted](std::shared_ptr<ReplIndexBuildState> replState) { - if (IndexBuildProtocol::kSinglePhase == replState->protocol) { - LOGV2(20659, - "IndexBuildsCoordinator::onRollback - not aborting single phase index build: " - "{replState_buildUUID}", - "replState_buildUUID"_attr = replState->buildUUID); - return; - } - const std::string reason = "rollback"; - - IndexBuildDetails aborted{replState->collectionUUID}; - // Record the index builds aborted due to rollback. This allows any rollback algorithm - // to efficiently restart all unfinished index builds without having to scan all indexes - // in all collections. - for (auto spec : replState->indexSpecs) { - aborted.indexSpecs.emplace_back(spec.getOwned()); - } - buildsAborted.insert({replState->buildUUID, aborted}); + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + auto onIndexBuild = [&](std::shared_ptr<ReplIndexBuildState> replState) { + if (IndexBuildProtocol::kSinglePhase == replState->protocol) { + LOGV2(3856209, + "IndexBuildsCoordinator::onRollback - not aborting single phase index build: " + "{builduuid} ", + "builduuid"_attr = replState->buildUUID); + return; + } + if (IndexBuildProtocol::kSinglePhase == replState->protocol) { + LOGV2(20659, + "IndexBuildsCoordinator::onRollback - not aborting single phase index build: " + "{replState_buildUUID}", + "replState_buildUUID"_attr = replState->buildUUID); + return; + } + const std::string reason = "rollback"; + + IndexBuildDetails aborted{replState->collectionUUID}; + // Record the index builds aborted due to rollback. This allows any rollback algorithm + // to efficiently restart all unfinished index builds without having to scan all indexes + // in all collections. + for (auto spec : replState->indexSpecs) { + aborted.indexSpecs.emplace_back(spec.getOwned()); + } + buildsAborted.insert({replState->buildUUID, aborted}); - // Leave abort timestamp as null. This will unblock the index build and allow it to - // complete without cleaning up. Subsequently, the rollback algorithm can decide how to - // undo the index build depending on the state of the oplog. Waits for index build - // thread to exit. - abortIndexBuildByBuildUUID(opCtx, replState->buildUUID, Timestamp(), reason); - }; + // Leave abort timestamp as null. This will unblock the index build and allow it to + // complete without cleaning up. Subsequently, the rollback algorithm can decide how to + // undo the index build depending on the state of the oplog. + // Signals the kRollbackAbort and then waits for the thread to join. + abortIndexBuildByBuildUUID( + opCtx, replState->buildUUID, IndexBuildAction::kRollbackAbort, boost::none, reason); + + // Now, cancel if there are any active callback handle waiting for the remote + // "voteCommitIndexBuild" command's response. + stdx::unique_lock<Latch> lk(replState->mutex); + if (replState->voteCmdCbkHandle.isValid()) { + replCoord->cancelCbkHandle(replState->voteCmdCbkHandle); + } + }; forEachIndexBuild(indexBuilds, "IndexBuildsCoordinator::onRollback - "_sd, onIndexBuild); + return buildsAborted; } @@ -1108,7 +1273,12 @@ void IndexBuildsCoordinator::createIndexes(OperationContext* opCtx, if (!supportsTwoPhaseIndexBuild()) { return; } - opObserver->onStartIndexBuild(opCtx, nss, collectionUUID, buildUUID, specs, fromMigrate); + // Since, we don't use IndexBuildsCoordinatorMongod thread pool to build system indexes, + // it's ok to set the commit quorum option as 1. Also, this is currently only get + // called during system index creation on startup. So, onStartIndexBuild() call will be a + // no-op. + opObserver->onStartIndexBuild( + opCtx, nss, collectionUUID, buildUUID, specs, CommitQuorumOptions(1), fromMigrate); opObserver->onCommitIndexBuild(opCtx, nss, collectionUUID, buildUUID, specs, fromMigrate); }; uassertStatusOK(_indexBuildsManager.commitIndexBuild( @@ -1201,16 +1371,17 @@ Status IndexBuildsCoordinator::_registerIndexBuild( { // We have to lock the mutex in order to read the committed/aborted state. stdx::unique_lock<Latch> lk(existingIndexBuild->mutex); - if (existingIndexBuild->isCommitReady) { - ss << " (ready to commit with timestamp: " - << existingIndexBuild->commitTimestamp.toString() << ")"; - } else if (existingIndexBuild->aborted) { - ss << " (aborted with reason: " << existingIndexBuild->abortReason - << " and timestamp: " << existingIndexBuild->abortTimestamp.toString() - << ")"; + ss << " index build state: " << existingIndexBuild->indexBuildState.toString(); + if (auto ts = existingIndexBuild->indexBuildState.getTimestamp()) { + ss << ", timestamp: " << ts->toString(); + } + if (existingIndexBuild->indexBuildState.isSet(IndexBuildState::kPrepareAbort | + IndexBuildState::kAborted)) { + if (auto abortReason = + existingIndexBuild->indexBuildState.getAbortReason()) { + ss << ", abort reason: " << abortReason.get(); + } aborted = true; - } else { - ss << " (in-progress)"; } } std::string msg = ss; @@ -1394,15 +1565,28 @@ IndexBuildsCoordinator::PostSetupAction IndexBuildsCoordinator::_setUpIndexBuild MultiIndexBlock::OnInitFn onInitFn; if (IndexBuildProtocol::kTwoPhase == replState->protocol) { + // Change the startIndexBuild Oplog entry. // Two-phase index builds write a different oplog entry than the default behavior which // writes a no-op just to generate an optime. onInitFn = [&](std::vector<BSONObj>& specs) { + if (!replCoord->canAcceptWritesFor(opCtx, nss)) { + // Not primary. + return Status::OK(); + } + + stdx::unique_lock<Latch> lk(replState->mutex); + // Need to run this in repl mutex, as we want to the commitQuorum value. And, generate + // the startIndexBuild oplog entry with mutex lock held. We basically don't want + // something like this, SetIndexCommitQuorum command changes the commit quorum from 3 + // to 5. And, the startIndexBuild resets the commit quorum value to be 3 on secondaries. + invariant(replState->commitQuorum, "Commit quorum required for two phase index build"); opCtx->getServiceContext()->getOpObserver()->onStartIndexBuild( opCtx, nss, replState->collectionUUID, replState->buildUUID, replState->indexSpecs, + replState->commitQuorum.get(), false /* fromMigrate */); return Status::OK(); @@ -1602,6 +1786,14 @@ void IndexBuildsCoordinator::_cleanUpTwoPhaseAfterFailure( const Status& status) { if (status == ErrorCodes::InterruptedAtShutdown) { + // Promise should be set at least once before it's getting destroyed. Else it would + // invariant. + { + stdx::unique_lock<Latch> lk(replState->mutex); + if (!replState->waitForNextAction->getFuture().isReady()) { + replState->waitForNextAction->emplaceValue(IndexBuildAction::kNoAction); + } + } // Leave it as-if kill -9 happened. Startup recovery will restart the index build. _indexBuildsManager.abortIndexBuildWithoutCleanup( opCtx, collection, replState->buildUUID, "shutting down"); @@ -1628,14 +1820,15 @@ void IndexBuildsCoordinator::_cleanUpTwoPhaseAfterFailure( // build from the oplog entry unless the index build did not fail due to processing an // abortIndexBuild oplog entry. This is the case if we were aborted due to rollback. stdx::unique_lock<Latch> lk(replState->mutex); - invariant(replState->aborted, replState->buildUUID.toString()); - Timestamp abortIndexBuildTimestamp = replState->abortTimestamp; + invariant(replState->indexBuildState.isAbortPrepared(), + replState->buildUUID.toString()); + auto abortIndexBuildTimestamp = replState->indexBuildState.getTimestamp(); // If we were aborted and no abort timestamp is set, then we should leave the index // build unfinished. This can happen during rollback because we are not primary and // cannot generate an optime to timestamp the index build abort. We rely on the // rollback process to correct this state. - if (abortIndexBuildTimestamp.isNull()) { + if (!abortIndexBuildTimestamp) { _indexBuildsManager.abortIndexBuildWithoutCleanup( opCtx, collection, replState->buildUUID, "no longer primary"); _indexBuildsManager.tearDownIndexBuild( @@ -1647,7 +1840,7 @@ void IndexBuildsCoordinator::_cleanUpTwoPhaseAfterFailure( unlockRSTLForIndexCleanup(opCtx); Lock::CollectionLock collLock(opCtx, nss, MODE_X); - TimestampBlock tsBlock(opCtx, abortIndexBuildTimestamp); + TimestampBlock tsBlock(opCtx, abortIndexBuildTimestamp.get()); _indexBuildsManager.tearDownIndexBuild( opCtx, collection, replState->buildUUID, MultiIndexBlock::kNoopOnCleanUpFn); return; @@ -1802,7 +1995,9 @@ void IndexBuildsCoordinator::_buildIndexTwoPhase( _scanCollectionAndInsertKeysIntoSorter(opCtx, replState, exclusiveCollectionLock); _insertKeysFromSideTablesWithoutBlockingWrites(opCtx, replState); - auto commitIndexBuildTimestamp = _waitForCommitOrAbort(opCtx, replState); + _signalPrimaryForCommitReadiness(opCtx, replState); + auto commitIndexBuildTimestamp = _waitForNextIndexBuildAction(opCtx, replState); + _insertKeysFromSideTablesAndCommit( opCtx, replState, indexBuildOptions, exclusiveCollectionLock, commitIndexBuildTimestamp); } @@ -1896,51 +2091,6 @@ void IndexBuildsCoordinator::_insertKeysFromSideTablesWithoutBlockingWrites( } /** - * Waits for commit or abort signal from primary. - */ -Timestamp IndexBuildsCoordinator::_waitForCommitOrAbort( - OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) { - Timestamp commitIndexBuildTimestamp; - if (shouldWaitForCommitOrAbort(opCtx, *replState)) { - LOGV2(20668, - "Index build waiting for commit or abort before completing final phase: " - "{replState_buildUUID}", - "replState_buildUUID"_attr = replState->buildUUID); - - // Yield locks and storage engine resources before blocking. - opCtx->recoveryUnit()->abandonSnapshot(); - Lock::TempRelease release(opCtx->lockState()); - invariant(!opCtx->lockState()->isLocked(), - str::stream() - << "failed to yield locks for index build while waiting for commit or abort: " - << replState->buildUUID); - - stdx::unique_lock<Latch> lk(replState->mutex); - auto isReadyToCommitOrAbort = [rs = replState] { return rs->isCommitReady || rs->aborted; }; - opCtx->waitForConditionOrInterrupt(replState->condVar, lk, isReadyToCommitOrAbort); - - if (replState->isCommitReady) { - LOGV2(20669, - "Committing index build", - "buildUUID"_attr = replState->buildUUID, - "commitTimestamp"_attr = replState->commitTimestamp, - "collectionUUID"_attr = replState->collectionUUID); - commitIndexBuildTimestamp = replState->commitTimestamp; - invariant(!replState->aborted, replState->buildUUID.toString()); - } else if (replState->aborted) { - LOGV2(20670, - "Aborting index build", - "buildUUID"_attr = replState->buildUUID, - "abortTimestamp"_attr = replState->abortTimestamp, - "abortReason"_attr = replState->abortReason, - "collectionUUID"_attr = replState->collectionUUID); - invariant(!replState->isCommitReady, replState->buildUUID.toString()); - } - } - return commitIndexBuildTimestamp; -} - -/** * Third phase is catching up on all the writes that occurred during the first two phases. * Accepts a commit timestamp for the index (null if not available). */ diff --git a/src/mongo/db/index_builds_coordinator.h b/src/mongo/db/index_builds_coordinator.h index 18baf0d8fcf..801952ac5bb 100644 --- a/src/mongo/db/index_builds_coordinator.h +++ b/src/mongo/db/index_builds_coordinator.h @@ -48,6 +48,8 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl_index_build_state.h" #include "mongo/db/storage/durable_catalog.h" +#include "mongo/executor/task_executor.h" +#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/util/concurrency/with_lock.h" @@ -198,7 +200,9 @@ public: * provided 'reason' will be used in the error message that the index builders return to their * callers. */ - void abortCollectionIndexBuilds(const UUID& collectionUUID, const std::string& reason); + void abortCollectionIndexBuilds(OperationContext* opCtx, + const UUID& collectionUUID, + const std::string& reason); /** * Signals all of the index builds on the specified collection to abort and returns the build @@ -206,7 +210,8 @@ public: * provided 'reason' will be used in the error message that the index builders return to their * callers. */ - std::vector<UUID> abortCollectionIndexBuildsNoWait(const UUID& collectionUUID, + std::vector<UUID> abortCollectionIndexBuildsNoWait(OperationContext* opCtx, + const UUID& collectionUUID, const std::string& reason); /** @@ -214,21 +219,33 @@ public: * builds are no longer running. The provided 'reason' will be used in the error message that * the index builders return to their callers. */ - void abortDatabaseIndexBuilds(StringData db, const std::string& reason); + void abortDatabaseIndexBuilds(OperationContext* opCtx, + StringData db, + const std::string& reason); /** * Signals all of the index builds on the specified database to abort. The provided 'reason' * will be used in the error message that the index builders return to their callers. */ - void abortDatabaseIndexBuildsNoWait(StringData db, const std::string& reason); + void abortDatabaseIndexBuildsNoWait(OperationContext* opCtx, + StringData db, + const std::string& reason); + + /** + * Aborts an index build by index build UUID. This gets called when the index build on primary + * failed due to interruption or replica set state change. + * It's a wrapper function to abortIndexBuildByBuildUUIDNoWait(). + */ + void abortIndexBuildOnError(OperationContext* opCtx, const UUID& buildUUID, Status abortStatus); /** * Aborts an index build by index build UUID. Returns when the index build thread exits. */ void abortIndexBuildByBuildUUID(OperationContext* opCtx, const UUID& buildUUID, - Timestamp abortTimestamp, - const std::string& reason); + IndexBuildAction signalAction, + boost::optional<Timestamp> abortTimestamp = boost::none, + boost::optional<std::string> reason = boost::none); /** * Aborts an index build by index build UUID. Does not wait for the index build thread to @@ -236,9 +253,9 @@ public: */ bool abortIndexBuildByBuildUUIDNoWait(OperationContext* opCtx, const UUID& buildUUID, - Timestamp abortTimestamp, - const std::string& reason); - + IndexBuildAction signalAction, + boost::optional<Timestamp> abortTimestamp = boost::none, + boost::optional<std::string> reason = boost::none); /** * Aborts an index build by its index name(s). This will only abort in-progress index builds if * all of the indexes are specified that a single builder is building together. When an @@ -249,8 +266,7 @@ public: OperationContext* opCtx, const UUID& collectionUUID, const std::vector<std::string>& indexNames, - Timestamp abortTimestamp, - const std::string& reason); + boost::optional<std::string> reason = boost::none); /** * Returns true if there is an index builder building the given index names on a collection. @@ -281,9 +297,13 @@ public: IndexBuilds onRollback(OperationContext* opCtx); /** - * TODO: This is not yet implemented. + * Handles the 'VoteCommitIndexBuild' command request. + * Writes the host and port information of the replica set member that has voted to commit an + * index build into config.system.indexBuilds collection. */ - virtual Status voteCommitIndexBuild(const UUID& buildUUID, const HostAndPort& hostAndPort) = 0; + virtual Status voteCommitIndexBuild(OperationContext* opCtx, + const UUID& buildUUID, + const HostAndPort& hostAndPort) = 0; /** * Sets a new commit quorum on an index build that manages 'indexNames' on collection 'nss'. @@ -574,7 +594,34 @@ protected: OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState); /** - * Waits for commit or abort signal from primary. + * Reads the commit ready members list for index build UUID in 'replState' from + * "config.system.indexBuilds" collection. And, signals the index builder thread on primary to + * commit the index build if the number of voters have satisfied the commit quorum for that + * index build. Sets the ReplIndexBuildState::waitForNextAction promise value to be + * IndexBuildAction::kCommitQuorumSatisfied. + */ + virtual void _signalIfCommitQuorumIsSatisfied( + OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) = 0; + + /** + * Signals the primary to commit the index build by sending "voteCommitIndexBuild" command + * request to it with write concern 'majority', then waits for that command's response. And, + * command gets retried on error. This function gets called after the second draining phase of + * index build. + */ + virtual void _signalPrimaryForCommitReadiness( + OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) = 0; + + /** + * Both primary and secondaries will wait on 'ReplIndexBuildState::waitForNextAction' future for + * commit or abort index build signal. + * On primary: + * - Commit signal can be sent either by voteCommitIndexBuild command or stepup. + * - Abort signal can be sent either by createIndexes command thread on user interruption or + * drop indexes/databases/collection commands. + * On secondaries: + * - Commit signal can be sent only by oplog applier. + * - Abort signal on secondaries can be sent by oplog applier, bgSync on rollback. * * On completion, this function returns a timestamp, which may be null, that may be used to * update the mdb catalog as we commit the index build. The commit index build timestamp is @@ -583,14 +630,17 @@ protected: * are currently a primary, in which case we do not need to wait any external signal to commit * the index build. */ - Timestamp _waitForCommitOrAbort(OperationContext* opCtx, - std::shared_ptr<ReplIndexBuildState> replState); + virtual Timestamp _waitForNextIndexBuildAction( + OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) = 0; + + std::string _indexBuildActionToString(IndexBuildAction action); /** * Third phase is catching up on all the writes that occurred during the first two phases. - * Accepts a commit timestamp for the index, which could be null. See _waitForCommitOrAbort() - * comments. This timestamp is used only for committing the index, which sets the ready flag to - * true, to the catalog; it is not used for the catch-up writes during the final drain phase. + * Accepts a commit timestamp for the index, which could be null. See + * _waitForNextIndexBuildAction() comments. This timestamp is used only for committing the + * index, which sets the ready flag to true, to the catalog; it is not used for the catch-up + * writes during the final drain phase. */ void _insertKeysFromSideTablesAndCommit( OperationContext* opCtx, @@ -628,6 +678,7 @@ protected: * UUIDs of the aborted index builders */ std::vector<UUID> _abortCollectionIndexBuilds(stdx::unique_lock<Latch>& lk, + OperationContext* opCtx, const UUID& collectionUUID, const std::string& reason, bool shouldWait); @@ -636,6 +687,7 @@ protected: * Helper for 'abortDatabaseIndexBuilds' and 'abortDatabaseIndexBuildsNoWait'. */ void _abortDatabaseIndexBuilds(stdx::unique_lock<Latch>& lk, + OperationContext* opCtx, const StringData& db, const std::string& reason, bool shouldWait); diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp index bf06151e5f7..f59f8da6082 100644 --- a/src/mongo/db/index_builds_coordinator_mongod.cpp +++ b/src/mongo/db/index_builds_coordinator_mongod.cpp @@ -34,12 +34,19 @@ #include "mongo/db/index_builds_coordinator_mongod.h" #include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/catalog/index_build_entry_gen.h" +#include "mongo/db/concurrency/locker.h" +#include "mongo/db/concurrency/replication_state_transition_lock_guard.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/index_build_entry_helpers.h" #include "mongo/db/operation_context.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/two_phase_index_build_knobs_gen.h" +#include "mongo/executor/task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" #include "mongo/util/str.h" @@ -166,6 +173,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, // written, or an error has been encountered otherwise. auto [startPromise, startFuture] = makePromiseFuture<void>(); + auto replState = invariant(_getIndexBuild(buildUUID)); _threadPool.schedule([ this, @@ -245,10 +253,371 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, return replState->sharedPromise.getFuture(); } -Status IndexBuildsCoordinatorMongod::voteCommitIndexBuild(const UUID& buildUUID, +Status IndexBuildsCoordinatorMongod::voteCommitIndexBuild(OperationContext* opCtx, + const UUID& buildUUID, const HostAndPort& hostAndPort) { - // TODO: not yet implemented. - return Status::OK(); + + auto swReplState = _getIndexBuild(buildUUID); + if (!swReplState.isOK()) { + // Index build might have got torn down. + return swReplState.getStatus(); + } + + Status upsertStatus = Status::OK(); + std::vector<HostAndPort> members; + members.push_back(hostAndPort); + auto replState = swReplState.getValue(); + + { + stdx::unique_lock<Latch> lk(replState->mutex); + // This indicates the index build was successfully able to commit or abort, and about to + // write 'commitIndexBuild' or 'abortIndexBuild' oplog entry. In such case, we should throw + // a retryable error code to secondary and not try to persist the votes. Otherwise a + // deadlock can happen if a commit/abortIndexBuild oplog entry is followed by write to + // "config.system.indexBuilds" collection. In that case, voteCommitIndexBuild cmd on primary + // will be waiting for the system.indexBuilds write to be majority replicated. But, then, + // secondary oplog will be stuck waiting on commit/abortIndexBuild oplog entry. And, + // commit/abortIndexBuild oplog entry will be waiting on the secondary indexBuildCoordinator + // thread to join. But, the indexBuildCoordinator thread will be waiting for the + // voteCommitIndexBuild response. + if (replState->indexBuildState.isSet(IndexBuildState::kCommitted | + IndexBuildState::kAborted)) { + return Status{ErrorCodes::CommandFailed, + str::stream() + << "Index build state : " << replState->indexBuildState.toString()}; + } + + invariant(replState->commitQuorum); + IndexBuildEntry indexbuildEntry(buildUUID, + replState->collectionUUID, + replState->commitQuorum.get(), + replState->indexNames); + indexbuildEntry.setCommitReadyMembers(members); + + // Persist the vote with replState mutex lock held to make sure that node will not write the + // commit/abortIndexBuild oplog entry. + upsertStatus = persistCommitReadyMemberInfo(opCtx, indexbuildEntry); + // 'DuplicateKey' error indicates that the commit quorum value read from replState does not + // match on-disk commit quorum value. Since, we persist the vote with replState mutex lock + // held, there is no way this can happen. We basically don't want something like this, + // SetIndexCommitQuorum command changes the commit quorum from 3 to 5. And, the + // voteCommitIndexBuild resets the commit quorum value to be 3 while updating the voter's + // info. + invariant(upsertStatus.code() != ErrorCodes::DuplicateKey); + } + + if (upsertStatus.isOK()) { + _signalIfCommitQuorumIsSatisfied(opCtx, replState); + } + return upsertStatus; +} + +void IndexBuildsCoordinatorMongod::_signalIfCommitQuorumIsSatisfied( + OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) { + auto sendCommitQuorumSatisfiedSignal = [&]() { + stdx::unique_lock<Latch> lk(replState->mutex); + if (!replState->waitForNextAction->getFuture().isReady()) { + replState->waitForNextAction->emplaceValue(IndexBuildAction::kCommitQuorumSatisfied); + } else { + // This implies we already got a commit or abort signal by other ways. This might have + // been signaled earlier with kPrimaryAbort or kCommitQuorumSatisfied. Or, it's also + // possible the node got stepped down and received kOplogCommit/koplogAbort or got + // kRollbackAbort. So, it's ok to skip signaling. + auto action = replState->waitForNextAction->getFuture().get(opCtx); + + LOGV2(3856200, + "Not signaling \"{signalAction}\" as it was previously signaled with " + "\"{signalActionSet}\" for index build: {buildUUID}", + "signalAction"_attr = + _indexBuildActionToString(IndexBuildAction::kCommitQuorumSatisfied), + "signalActionSet"_attr = _indexBuildActionToString(action), + "buildUUID"_attr = replState->buildUUID); + } + }; + + if (!enableIndexBuildCommitQuorum) { + // When enableIndexBuildCommitQuorum is turned off, we should not use + // system.indexBuilds collection to decide whether the commit quorum is satisfied or not. + sendCommitQuorumSatisfiedSignal(); + return; + } + + // Read the index builds entry from config.system.indexBuilds collection. + auto swIndexBuildEntry = getIndexBuildEntry(opCtx, replState->buildUUID); + auto status = swIndexBuildEntry.getStatus(); + // This can occur when no vote got received and stepup tries to check if commit quorum is + // satisfied. + if (status == ErrorCodes::NoMatchingDocument) + return; + invariant(status.isOK()); + + auto indexBuildEntry = swIndexBuildEntry.getValue(); + + auto voteMemberList = indexBuildEntry.getCommitReadyMembers(); + invariant(voteMemberList); + int voteCount = voteMemberList->size(); + + auto commitQuorum = indexBuildEntry.getCommitQuorum(); + int requiredQuorumCount = commitQuorum.numNodes; + if (commitQuorum.mode == CommitQuorumOptions::kMajority) { + requiredQuorumCount = + repl::ReplicationCoordinator::get(opCtx)->getConfig().getWriteMajority(); + } + + if (voteCount >= requiredQuorumCount) { + LOGV2(3856201, + "Index build Commit Quorum Satisfied: {indexBuildEntry}", + "indexBuildEntry"_attr = indexBuildEntry.toBSON()); + sendCommitQuorumSatisfiedSignal(); + } +} + +bool IndexBuildsCoordinatorMongod::_checkVoteCommitIndexCmdSucceeded(const BSONObj& response) { + auto commandStatus = getStatusFromCommandResult(response); + auto wcStatus = getWriteConcernStatusFromCommandResult(response); + if (commandStatus.isOK() && wcStatus.isOK()) { + return true; + } + LOGV2(3856202, + "'voteCommitIndexBuild' command failed with response : {response}", + "response"_attr = response); + return false; +} + +void IndexBuildsCoordinatorMongod::_signalPrimaryForCommitReadiness( + OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->getSettings().usingReplSets()) { + // Standalones does not support commit quorum. + return; + } + + if (!enableIndexBuildCommitQuorum) { + // No need to vote instead directly signal primary to commit the index build. + invariant(opCtx->lockState()->isRSTLLocked()); + const NamespaceStringOrUUID dbAndUUID(replState->dbName, replState->collectionUUID); + if (replCoord->canAcceptWritesFor(opCtx, dbAndUUID)) { + _signalIfCommitQuorumIsSatisfied(opCtx, replState); + } + return; + } + + // Yield locks and storage engine resources before blocking. + opCtx->recoveryUnit()->abandonSnapshot(); + Lock::TempRelease release(opCtx->lockState()); + invariant(!opCtx->lockState()->isRSTLLocked()); + + Backoff exponentialBackoff(Seconds(1), Seconds(2)); + + auto onRemoteCmdScheduled = [&](executor::TaskExecutor::CallbackHandle handle) { + stdx::unique_lock<Latch> lk(replState->mutex); + auto future = replState->waitForNextAction->getFuture(); + // Don't set the callback handle if we have been signaled with kRollbackAbort. + // Otherwise, it can violate liveness property. Consider a case, where the bgsync + // thread has signaled aborted and waits for the secondary indexBuildCoordinator + // thread to join. But, the indexBuildCoordinator thread will be waiting for the + // remote "voteCommitIndexBuild" command's response. And, the primary will be + // waiting for 'voteCommitIndexBuild' command's write to be majority replicated. But, + // gets stuck waiting for the rollback node to transition out to secondary. + if (future.isReady() && future.get(opCtx) == IndexBuildAction::kRollbackAbort) { + replCoord->cancelCbkHandle(handle); + } else { + invariant(!replState->voteCmdCbkHandle.isValid()); + replState->voteCmdCbkHandle = handle; + } + }; + + auto onRemoteCmdComplete = [&](executor::TaskExecutor::CallbackHandle) { + stdx::unique_lock<Latch> lk(replState->mutex); + replState->voteCmdCbkHandle = executor::TaskExecutor::CallbackHandle(); + }; + + auto needToVote = [&]() -> bool { + stdx::unique_lock<Latch> lk(replState->mutex); + // Needs comment. + return !replState->waitForNextAction->getFuture().isReady() ? true : false; + }; + + auto convertToNonFatalStatus = [&](Status origStatus) -> Status { + auto errCode = ErrorCodes::InterruptedAtShutdown; + + stdx::unique_lock<Latch> lk(replState->mutex); + if (replState->indexBuildState.isAbortPrepared()) { + errCode = ErrorCodes::IndexBuildAborted; + } + + return Status{errCode, origStatus.reason()}; + }; + + + // Retry 'voteCommitIndexBuild' command on error until we have been signaled either with commit + // or abort. This way, we can make sure majority of nodes will never stop voting and wait for + // commit or abort signal until they have received commit or abort signal. + while (needToVote()) { + // Don't hammer the network. + sleepFor(exponentialBackoff.nextSleep()); + // When index build started during startup recovery can try to get it's address when + // rsConfig is uninitialized. So, retry till it gets initialized. Also, it's important, when + // we retry, we check if we have received commit or abort signal to ensure liveness. For + // e.g., consider a case where index build gets restarted on startup recovery and + // indexBuildsCoordinator thread waits for valid address w/o checking commit or abort + // signal. Now, things can go wrong if we try to replay commitIndexBuild oplog entry for + // that index build on startup recovery. Oplog applier would get stuck waiting on the + // indexBuildsCoordinator thread. As a result, we won't be able to transition to secondary + // state, get stuck on startup state. + auto myAddress = replCoord->getMyHostAndPort(); + if (myAddress.empty()) { + continue; + } + auto const voteCmdRequest = + BSON("voteCommitIndexBuild" << replState->buildUUID << "hostAndPort" + << myAddress.toString() << "writeConcern" + << BSON("w" + << "majority")); + + BSONObj voteCmdResponse; + try { + voteCmdResponse = replCoord->runCmdOnPrimaryAndAwaitResponse( + opCtx, "admin", voteCmdRequest, onRemoteCmdScheduled, onRemoteCmdComplete); + } catch (DBException& ex) { + if (ex.isA<ErrorCategory::ShutdownError>() || + ex.isA<ErrorCategory::CancelationError>()) { + // This includes error like ErrorCodes::CallbackCanceled, + // ErrorCodes::ShutdownInProgress We might have either received + // ErrorCodes::CallbackCanceled due to rollback or shutdown. converting the status + // to non-fatal + uassertStatusOK(convertToNonFatalStatus(ex.toStatus())); + } + // All other error including network errors should be retried. + continue; + } + + // Command error and write concern error have to be retried. + if (_checkVoteCommitIndexCmdSucceeded(voteCmdResponse)) { + break; + } + } + return; +} + +Timestamp IndexBuildsCoordinatorMongod::_waitForNextIndexBuildAction( + OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) { + Timestamp commitIndexBuildTimestamp; + + invariant(replState->protocol == IndexBuildProtocol::kTwoPhase); + + // standalones doesn't need to wait for commit or abort index build oplog entry. + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->getSettings().usingReplSets()) { + return commitIndexBuildTimestamp; + }; + + // Yield locks and storage engine resources before blocking. + opCtx->recoveryUnit()->abandonSnapshot(); + Lock::TempRelease release(opCtx->lockState()); + + LOGV2(3856203, + "Index build waiting for next action before completing final phase: {buildUUID}", + "buildUUID"_attr = replState->buildUUID); + + while (true) { + // Future wait should ignore state transition. + invariant(!opCtx->lockState()->isRSTLLocked(), + str::stream() + << "failed to yield locks for index build while waiting for commit or abort: " + << replState->buildUUID); + + // future wait should get interrupted if the node shutdowns. + const auto nextAction = replState->waitForNextAction->getFuture().get(opCtx); + LOGV2(3856204, + "Index build received signal for build uuid: {buildUUID} , action: {action}", + "buildUUID"_attr = replState->buildUUID, + "action"_attr = _indexBuildActionToString(nextAction)); + + bool needsToRetryWait = false; + + // Reacquire RSTL lock + repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX); + const NamespaceStringOrUUID dbAndUUID(replState->dbName, replState->collectionUUID); + auto isMaster = replCoord->canAcceptWritesFor(opCtx, dbAndUUID); + + stdx::unique_lock<Latch> lk(replState->mutex); + switch (nextAction) { + case IndexBuildAction::kOplogCommit: + // Sanity check + // This signal can be received during primary (drain phase), secondary, + // startup( startup recovery) and startup2 (initial sync). + invariant(!isMaster && replState->indexBuildState.isCommitPrepared(), + str::stream() + << "Index build: " << replState->buildUUID + << ", index build state: " << replState->indexBuildState.toString()); + invariant(replState->indexBuildState.getTimestamp(), + replState->buildUUID.toString()); + // set the commit timestamp + commitIndexBuildTimestamp = replState->indexBuildState.getTimestamp().get(); + LOGV2(3856205, + "Committing index build", + "buildUUID"_attr = replState->buildUUID, + "commitTimestamp"_attr = replState->indexBuildState.getTimestamp().get(), + "collectionUUID"_attr = replState->collectionUUID); + break; + case IndexBuildAction::kOplogAbort: + // Sanity check + // This signal can be received during primary (drain phase), secondary, + // startup( startup recovery) and startup2 (initial sync). + invariant(!isMaster && replState->indexBuildState.isAbortPrepared(), + str::stream() + << "Index build: " << replState->buildUUID + << ", index build state: " << replState->indexBuildState.toString()); + invariant(replState->indexBuildState.getTimestamp() && + replState->indexBuildState.getAbortReason(), + replState->buildUUID.toString()); + LOGV2(3856206, + "Aborting index build", + "buildUUID"_attr = replState->buildUUID, + "abortTimestamp"_attr = replState->indexBuildState.getTimestamp().get(), + "abortReason"_attr = replState->indexBuildState.getAbortReason().get(), + "collectionUUID"_attr = replState->collectionUUID); + case IndexBuildAction::kRollbackAbort: + // Currently, We abort the index build before transitioning the state to rollback. + // So, we can check if the node state is rollback. + break; + case IndexBuildAction::kPrimaryAbort: + // There are chances when the index build got aborted, it only existed in the + // coordinator, So, we missed marking the index build aborted on manager. So, it's + // important, we exit from here if we are still primary. Otherwise, the index build + // gets committed, though our index build was marked aborted. + if (isMaster) { + uassertStatusOK(Status( + ErrorCodes::IndexBuildAborted, + str::stream() + << "Index build aborted for index build: " << replState->buildUUID + << " , abort reason:" + << replState->indexBuildState.getAbortReason().get_value_or(""))); + } + case IndexBuildAction::kCommitQuorumSatisfied: + if (!isMaster) { + // Reset the promise as the node has stepped down, + // wait for the new primary to coordinate the index build and send the new + // signal/action. + LOGV2(3856207, + "No longer primary, so will be waiting again for next action before " + "completing final phase: {buildUUID}", + "buildUUID"_attr = replState->buildUUID); + replState->waitForNextAction = + std::make_unique<SharedPromise<IndexBuildAction>>(); + needsToRetryWait = true; + } + break; + default: + MONGO_UNREACHABLE; + } + + if (!needsToRetryWait) { + break; + } + } + return commitIndexBuildTimestamp; } Status IndexBuildsCoordinatorMongod::setCommitQuorum(OperationContext* opCtx, diff --git a/src/mongo/db/index_builds_coordinator_mongod.h b/src/mongo/db/index_builds_coordinator_mongod.h index 20691de7751..46ce97a3c8c 100644 --- a/src/mongo/db/index_builds_coordinator_mongod.h +++ b/src/mongo/db/index_builds_coordinator_mongod.h @@ -78,7 +78,9 @@ public: IndexBuildProtocol protocol, IndexBuildOptions indexBuildOptions) override; - Status voteCommitIndexBuild(const UUID& buildUUID, const HostAndPort& hostAndPort) override; + Status voteCommitIndexBuild(OperationContext* opCtx, + const UUID& buildUUID, + const HostAndPort& hostAndPort) override; Status setCommitQuorum(OperationContext* opCtx, const NamespaceString& nss, @@ -123,6 +125,20 @@ private: */ void _refreshReplStateFromPersisted(OperationContext* opCtx, const UUID& buildUUID); + /** + * Process voteCommitIndexBuild command's response. + */ + bool _checkVoteCommitIndexCmdSucceeded(const BSONObj& response); + + void _signalIfCommitQuorumIsSatisfied(OperationContext* opCtx, + std::shared_ptr<ReplIndexBuildState> replState); + + void _signalPrimaryForCommitReadiness(OperationContext* opCtx, + std::shared_ptr<ReplIndexBuildState> replState) override; + + Timestamp _waitForNextIndexBuildAction(OperationContext* opCtx, + std::shared_ptr<ReplIndexBuildState> replState) override; + // Thread pool on which index builds are run. ThreadPool _threadPool; }; diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index e75669a01cb..377ef343e8a 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -34,6 +34,7 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/catalog/commit_quorum_options.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/rollback.h" #include "mongo/db/s/collection_sharding_state.h" @@ -99,6 +100,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) = 0; /** diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 30f460db2a7..f458b0f6477 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -273,12 +273,15 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx, CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) { BSONObjBuilder oplogEntryBuilder; oplogEntryBuilder.append("startIndexBuild", nss.coll()); indexBuildUUID.appendToBuilder(&oplogEntryBuilder, "indexBuildUUID"); + commitQuorum.appendToBuilder(CommitQuorumOptions::kCommitQuorumField, &oplogEntryBuilder); + BSONArrayBuilder indexesArr(oplogEntryBuilder.subarrayStart("indexes")); for (auto indexDoc : indexes) { indexesArr.append(indexDoc); diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 51d6ebd6c99..9019a253acd 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -52,6 +52,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) final; void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final; diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index e197b34b70c..b66303045a6 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -144,8 +144,9 @@ TEST_F(OpObserverTest, StartIndexBuildExpectedOplogEntry) { { AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X); WriteUnitOfWork wunit(opCtx.get()); + CommitQuorumOptions commitQuorum(1); opObserver.onStartIndexBuild( - opCtx.get(), nss, uuid, indexBuildUUID, specs, false /*fromMigrate*/); + opCtx.get(), nss, uuid, indexBuildUUID, specs, commitQuorum, false /*fromMigrate*/); wunit.commit(); } @@ -153,6 +154,7 @@ TEST_F(OpObserverTest, StartIndexBuildExpectedOplogEntry) { BSONObjBuilder startIndexBuildBuilder; startIndexBuildBuilder.append("startIndexBuild", nss.coll()); indexBuildUUID.appendToBuilder(&startIndexBuildBuilder, "indexBuildUUID"); + startIndexBuildBuilder.append("commitQuorum", 1); BSONArrayBuilder indexesArr(startIndexBuildBuilder.subarrayStart("indexes")); indexesArr.append(specX); indexesArr.append(specA); diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 5c5816e7c0b..0a1542ea50a 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -46,6 +46,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) override {} void onStartIndexBuildSinglePhase(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 2470baa74cc..ec7fe1908c1 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -73,10 +73,12 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) override { ReservedTimes times{opCtx}; for (auto& o : _observers) { - o->onStartIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexes, fromMigrate); + o->onStartIndexBuild( + opCtx, nss, collUUID, indexBuildUUID, indexes, commitQuorum, fromMigrate); } } diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index e10b9cf4d33..11db337f35c 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -41,6 +41,7 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/split_horizon.h" #include "mongo/db/repl/sync_source_selector.h" +#include "mongo/executor/task_executor.h" #include "mongo/rpc/topology_version_gen.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -1007,6 +1008,28 @@ public: */ virtual HostAndPort getCurrentPrimaryHostAndPort() const = 0; + /* + * Cancels the callback referenced in the given callback handle. + * This function expects the activeHandle to be valid. + */ + virtual void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) = 0; + + using OnRemoteCmdScheduledFn = std::function<void(executor::TaskExecutor::CallbackHandle)>; + using OnRemoteCmdCompleteFn = std::function<void(executor::TaskExecutor::CallbackHandle)>; + /** + * Runs the given command 'cmdObj' on primary and waits till the response for that command is + * received. If the node is primary, then the command will be executed using DBDirectClient to + * avoid tcp network calls. Otherwise, the node will execute the remote command using the repl + * task executor (AsyncDBClient). + * - 'OnRemoteCmdScheduled' will be called once the remote command is scheduled. + * - 'OnRemoteCmdComplete' will be called once the response for the remote command is received. + */ + virtual BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + OnRemoteCmdScheduledFn onRemoteCmdScheduled, + OnRemoteCmdCompleteFn onRemoteCmdComplete) = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 1f460a3fdbc..235128a2656 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -54,6 +54,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/replication_state_transition_lock_guard.h" #include "mongo/db/curop_failpoint_helpers.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/kill_sessions_local.h" @@ -2105,6 +2106,90 @@ HostAndPort ReplicationCoordinatorImpl::getCurrentPrimaryHostAndPort() const { return primary ? primary->getHostAndPort() : HostAndPort(); } +void ReplicationCoordinatorImpl::cancelCbkHandle(CallbackHandle activeHandle) { + _replExecutor->cancel(activeHandle); +} + +BSONObj ReplicationCoordinatorImpl::_runCmdOnSelfOnAlternativeClient(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) { + + auto client = opCtx->getServiceContext()->makeClient("DBDirectClientCmd"); + // We want the command's opCtx that gets executed via DBDirectClient to be interruptible + // so that we don't block state transitions. Callers of this function might run opCtx + // in an uninterruptible mode. To be on safer side, run the command in AlternativeClientRegion, + // to make sure that the command's opCtx is interruptible. + AlternativeClientRegion acr(client); + auto uniqueNewOpCtx = cc().makeOperationContext(); + { + stdx::lock_guard<Client> lk(cc()); + cc().setSystemOperationKillable(lk); + } + + DBDirectClient dbClient(uniqueNewOpCtx.get()); + const auto commandResponse = dbClient.runCommand(OpMsgRequest::fromDBAndBody(dbName, cmdObj)); + + return commandResponse->getCommandReply(); +} + +BSONObj ReplicationCoordinatorImpl::runCmdOnPrimaryAndAwaitResponse( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + OnRemoteCmdScheduledFn onRemoteCmdScheduled, + OnRemoteCmdCompleteFn onRemoteCmdComplete) { + // Sanity check + invariant(!opCtx->lockState()->isRSTLLocked()); + + repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX); + + if (getMemberState().primary()) { + if (canAcceptWritesForDatabase(opCtx, dbName)) { + // Run command using DBDirectClient to avoid tcp connection. + return _runCmdOnSelfOnAlternativeClient(opCtx, dbName, cmdObj); + } + // Node is primary but it's not in a state to accept non-local writes because it might be in + // the catchup or draining phase. So, try releasing and reacquiring RSTL lock so that we + // give chance for the node to finish executing signalDrainComplete() and become master. + uassertStatusOK( + Status{ErrorCodes::NotMaster, "Node is in primary state but can't accept writes."}); + } + + // Node is not primary, so we will run the remote command via AsyncDBClient. To use + // AsyncDBClient, we will be using repl task executor. + const auto primary = getCurrentPrimaryHostAndPort(); + if (primary.empty()) { + uassertStatusOK(Status{ErrorCodes::CommandFailed, "Primary is unknown/down."}); + } + + executor::RemoteCommandRequest request(primary, dbName, cmdObj, nullptr); + executor::RemoteCommandResponse cbkResponse( + Status{ErrorCodes::InternalError, "Uninitialized value"}); + + // Schedule the remote command. + auto&& scheduleResult = _replExecutor->scheduleRemoteCommand( + request, [&cbkResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbk) { + cbkResponse = cbk.response; + }); + + uassertStatusOK(scheduleResult.getStatus()); + CallbackHandle cbkHandle = scheduleResult.getValue(); + + onRemoteCmdScheduled(cbkHandle); + // Before, we wait for the remote command response, it's important we release the rstl lock to + // ensure that the state transition can happen during the wait period. Else, it can lead to + // deadlock. Consider a case, where the remote command waits for majority write concern. But, we + // are not able to transition our state to secondary (steady state replication). + rstl.release(); + + // Wait for the response in an interruptible mode. + _replExecutor->wait(cbkHandle, opCtx); + + onRemoteCmdComplete(cbkHandle); + uassertStatusOK(cbkResponse.status); + return cbkResponse.data; +} + void ReplicationCoordinatorImpl::_killConflictingOpsOnStepUpAndStepDown( AutoGetRstlForStepUpStepDown* arsc, ErrorCodes::Error reason) { const OperationContext* rstlOpCtx = arsc->getOpCtx(); @@ -2644,7 +2729,11 @@ int ReplicationCoordinatorImpl::getMyId() const { } HostAndPort ReplicationCoordinatorImpl::getMyHostAndPort() const { - stdx::lock_guard<Latch> lock(_mutex); + stdx::unique_lock<Latch> lk(_mutex); + + if (_selfIndex == -1) { + return HostAndPort(); + } return _rsConfig.getMemberAt(_selfIndex).getHostAndPort(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index a45bc225ff0..b7b474fc786 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -364,6 +364,14 @@ public: virtual HostAndPort getCurrentPrimaryHostAndPort() const override; + void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) override; + + BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + OnRemoteCmdScheduledFn onRemoteCmdScheduled, + OnRemoteCmdCompleteFn onRemoteCmdComplete) override; + // ================== Test support API =================== /** @@ -1385,6 +1393,13 @@ private: */ int64_t _nextRandomInt64_inlock(int64_t limit); + /** + * Runs the command using DBDirectClient and returns the response received for that command. + */ + BSONObj _runCmdOnSelfOnAlternativeClient(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj); + // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index f6983bcfe68..9a9fe7dc7f3 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -603,5 +603,18 @@ HostAndPort ReplicationCoordinatorMock::getCurrentPrimaryHostAndPort() const { return HostAndPort(); } +void ReplicationCoordinatorMock::cancelCbkHandle( + executor::TaskExecutor::CallbackHandle activeHandle) { + MONGO_UNREACHABLE; +} + +BSONObj ReplicationCoordinatorMock::runCmdOnPrimaryAndAwaitResponse( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + OnRemoteCmdScheduledFn onRemoteCmdScheduled, + OnRemoteCmdCompleteFn onRemoteCmdComplete) { + return BSON("ok" << 1); +} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index ddecc044a53..ad82eb065ce 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -343,6 +343,13 @@ public: virtual HostAndPort getCurrentPrimaryHostAndPort() const override; + void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) override; + BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + OnRemoteCmdScheduledFn onRemoteCmdScheduled, + OnRemoteCmdCompleteFn onRemoteCmdComplete) override; + private: ServiceContext* const _service; ReplSettings _settings; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index 0cdebd3464e..480a233c240 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -503,5 +503,19 @@ HostAndPort ReplicationCoordinatorNoOp::getCurrentPrimaryHostAndPort() const { MONGO_UNREACHABLE; } +void ReplicationCoordinatorNoOp::cancelCbkHandle( + executor::TaskExecutor::CallbackHandle activeHandle) { + MONGO_UNREACHABLE; +} + +BSONObj ReplicationCoordinatorNoOp::runCmdOnPrimaryAndAwaitResponse( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + OnRemoteCmdScheduledFn onRemoteCmdScheduled, + OnRemoteCmdCompleteFn onRemoteCmdComplete) { + MONGO_UNREACHABLE; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index 329d5b5fe14..452a96161c7 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -197,7 +197,6 @@ public: Status checkReplEnabledForCommand(BSONObjBuilder*) final; - HostAndPort chooseNewSyncSource(const OpTime&) final; void blacklistSyncSource(const HostAndPort&, Date_t) final; @@ -279,6 +278,14 @@ public: HostAndPort getCurrentPrimaryHostAndPort() const override; + void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) override; + + BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + OnRemoteCmdScheduledFn onRemoteCmdScheduled, + OnRemoteCmdCompleteFn onRemoteCmdComplete) override; + private: ServiceContext* const _service; }; diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 0cf86695206..b24e22bb12b 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -101,7 +101,7 @@ OplogInterfaceMock::Operation makeStartIndexBuildOplogEntry(Collection* collecti BSONObj spec, int time) { auto entry = BSON("startIndexBuild" << collection->ns().coll() << "indexBuildUUID" << buildUUID - << "indexes" << BSON_ARRAY(spec)); + << "indexes" << BSON_ARRAY(spec) << "commitQuorum" << 1); return std::make_pair(BSON("ts" << Timestamp(Seconds(time), 0) << "op" << "c" @@ -1061,7 +1061,7 @@ TEST_F(RSRollbackTest, RollbackCommitIndexBuild) { // Kill the index build we just restarted so the fixture can shut down. IndexBuildsCoordinator::get(_opCtx.get()) - ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, Timestamp(), ""); + ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, IndexBuildAction::kRollbackAbort); } TEST_F(RSRollbackTest, RollbackAbortIndexBuild) { @@ -1105,7 +1105,7 @@ TEST_F(RSRollbackTest, RollbackAbortIndexBuild) { // Kill the index build we just restarted so the fixture can shut down. IndexBuildsCoordinator::get(_opCtx.get()) - ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, Timestamp(), ""); + ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, IndexBuildAction::kRollbackAbort); } TEST_F(RSRollbackTest, AbortedIndexBuildsAreRestarted) { @@ -1154,7 +1154,7 @@ TEST_F(RSRollbackTest, AbortedIndexBuildsAreRestarted) { // Kill the index build we just restarted so the fixture can shut down. IndexBuildsCoordinator::get(_opCtx.get()) - ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, Timestamp(), ""); + ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, IndexBuildAction::kRollbackAbort); } TEST_F(RSRollbackTest, AbortedIndexBuildsAreNotRestartedWhenStartIsRolledBack) { diff --git a/src/mongo/db/repl_index_build_state.h b/src/mongo/db/repl_index_build_state.h index e75ca9ee41c..f5f7023b660 100644 --- a/src/mongo/db/repl_index_build_state.h +++ b/src/mongo/db/repl_index_build_state.h @@ -39,6 +39,7 @@ #include "mongo/db/catalog/commit_quorum_options.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/namespace_string.h" +#include "mongo/executor/task_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/util/future.h" #include "mongo/util/net/hostandport.h" @@ -60,6 +61,152 @@ enum class IndexBuildProtocol { kTwoPhase }; +// Indicates the type of abort or commit signal that will be received by primary and secondaries. +enum class IndexBuildAction { + /** + * Does nothing. And, we set on shutdown. + */ + kNoAction, + /** + * Commit signal set by oplog applier. + */ + kOplogCommit, + /** + * Abort signal set by oplog applier. + */ + kOplogAbort, + /** + * Abort signal set on rollback. + */ + kRollbackAbort, + /** + * Abort signal set by createIndexes cmd or by drop databases/collections/indexes cmds + */ + kPrimaryAbort, + /** + * Commit signal set by "voteCommitIndexBuild" cmd and step up. + */ + kCommitQuorumSatisfied +}; + +/** + * Represents the index build state. + * Valid State transition for primary: + * =================================== + * kNone ---> kAborted. + * kNone ---> kPrepareAbort ---> kAborted. + * + * Valid State transition for secondaries: + * ======================================= + * kNone ---> kPrepareCommit ---> kCommitted. + * kNone ---> kPrepareAbort ---> kAborted. + */ +class IndexBuildState { +public: + enum StateFlag { + kNone = 1 << 0, + /** + * Below state indicates that indexBuildscoordinator thread was externally asked either to + * commit or abort. Oplog applier, rollback, createIndexes command and drop + * databases/collections/indexes cmds can change this state to kPrepareCommit or + * kPrepareAbort. + */ + kPrepareCommit = 1 << 1, + kPrepareAbort = 1 << 2, + /** + * Below state indicates that index build was successfully able to commit or abort. And, + * it's yet to generate the commitIndexBuild or abortIndexBuild oplog entry respectively. + */ + kCommitted = 1 << 3, + kAborted = 1 << 4 + }; + + using StateSet = int; + bool isSet(StateSet stateSet) const { + return _state & stateSet; + } + + bool checkIfValidTransition(StateFlag newState) { + if (_state == kNone || (_state == kPrepareCommit && newState == kCommitted) || + (_state == kPrepareAbort && newState == kAborted)) { + return true; + } + return false; + } + + void setState(StateFlag state, + bool skipCheck, + boost::optional<Timestamp> timestamp = boost::none, + boost::optional<std::string> abortReason = boost::none) { + if (!skipCheck) { + invariant(checkIfValidTransition(state), + str::stream() << "current state :" << toString(_state) + << ", new state: " << toString(state)); + } + _state = state; + if (timestamp) + _timestamp = timestamp; + if (abortReason) { + invariant(_state == kPrepareAbort); + _abortReason = abortReason; + } + } + + bool isCommitPrepared() const { + return _state == kPrepareCommit; + } + + bool isCommitted() const { + return _state == kCommitted; + } + + bool isAbortPrepared() const { + return _state == kPrepareAbort; + } + + bool isAborted() const { + return _state == kAborted; + } + + boost::optional<Timestamp> getTimestamp() const { + return _timestamp; + } + + boost::optional<std::string> getAbortReason() const { + return _abortReason; + } + + std::string toString() const { + return toString(_state); + } + + static std::string toString(StateFlag state) { + switch (state) { + case kNone: + return "in-progress"; + case kPrepareCommit: + return "Prepare commit"; + case kCommitted: + return "Committed"; + case kPrepareAbort: + return "Prepare abort"; + case kAborted: + return "Aborted"; + } + MONGO_UNREACHABLE; + } + +private: + // Represents the index build state. + StateFlag _state = kNone; + // Timestamp will be populated only if the node is secondary. + // It represents the commit or abort timestamp communicated via + // commitIndexBuild and abortIndexBuild oplog entry. + boost::optional<Timestamp> _timestamp; + // Reason for abort reason. + boost::optional<std::string> _abortReason; +}; + /** * Tracks the cross replica set progress of a particular index build identified by a build UUID. * @@ -80,7 +227,11 @@ struct ReplIndexBuildState { indexNames(extractIndexNames(specs)), indexSpecs(specs), protocol(protocol), - commitQuorum(commitQuorum) {} + commitQuorum(commitQuorum) { + if (IndexBuildProtocol::kTwoPhase == protocol) { + waitForNextAction = std::make_unique<SharedPromise<IndexBuildAction>>(); + } + } // Uniquely identifies this index build across replica set members. const UUID buildUUID; @@ -127,20 +278,14 @@ struct ReplIndexBuildState { // SharedSemiFuture(s). SharedPromise<IndexCatalogStats> sharedPromise; - // Set to true on a secondary on receipt of a commitIndexBuild oplog entry. - bool isCommitReady = false; - Timestamp commitTimestamp; + // Primary and secondaries gets their commit or abort signal via this promise future pair. + std::unique_ptr<SharedPromise<IndexBuildAction>> waitForNextAction; - // There is a period of time where the index build is registered on the coordinator, but an - // index builder does not yet exist. Since a signal cannot be set on the index builder at that - // time, it must be saved here. - bool aborted = false; - Timestamp abortTimestamp; - std::string abortReason = ""; + // Maintains the state of the index build. + IndexBuildState indexBuildState; - // The coordinator for the index build will wait upon this when awaiting an external signal, - // such as commit or commit readiness signals. - stdx::condition_variable condVar; + // Represents the callback handle for scheduled remote command "voteCommitIndexBuild". + executor::TaskExecutor::CallbackHandle voteCmdCbkHandle; private: std::vector<std::string> extractIndexNames(const std::vector<BSONObj>& specs) { diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index d274dfef21a..40b24d10bd6 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -56,6 +56,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) override {} void onStartIndexBuildSinglePhase(OperationContext* opCtx, diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 939125ba75f..476098424d0 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -471,6 +471,7 @@ void ShardServerOpObserver::onStartIndexBuild(OperationContext* opCtx, CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) { abortOngoingMigrationIfNeeded(opCtx, nss); }; diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 2b16ac88150..f3e659b437d 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -57,6 +57,7 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, + const CommitQuorumOptions& commitQuorum, bool fromMigrate) override; void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) override; diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 485a1ae4f4b..520a2229470 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -430,13 +430,21 @@ env.Library( '$BUILD_DIR/mongo/db/server_options_core', ], ) +env.Library( + target='two_phase_index_build_knobs_idl', + source=[ + env.Idlc('two_phase_index_build_knobs.idl')[0], + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/idl/server_parameter', + ], +) env.Library( target='storage_engine_impl', source=[ 'storage_engine_impl.cpp', 'kv/temporary_kv_record_store.cpp', # TODO: SERVER-41892 Avoid source under kv sub-directory - env.Idlc('two_phase_index_build_knobs.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -451,6 +459,7 @@ env.Library( '$BUILD_DIR/mongo/db/storage/storage_repair_observer', '$BUILD_DIR/mongo/db/catalog/collection_catalog_helper', '$BUILD_DIR/mongo/idl/server_parameter', + 'two_phase_index_build_knobs_idl', ], ) diff --git a/src/mongo/db/storage/two_phase_index_build_knobs.idl b/src/mongo/db/storage/two_phase_index_build_knobs.idl index 1b29ea40481..d23c7670603 100644 --- a/src/mongo/db/storage/two_phase_index_build_knobs.idl +++ b/src/mongo/db/storage/two_phase_index_build_knobs.idl @@ -40,9 +40,9 @@ server_parameters: default: true # Commit quorum option is supported only for two phase index builds. - enableIndexBuildMajorityCommitQuorum: - description: "Support for using majority commit quorum for two phase index builds." + enableIndexBuildCommitQuorum: + description: "Support for using commit quorum for two phase index builds." set_at: startup cpp_vartype: bool - cpp_varname: "enableIndexBuildMajorityCommitQuorum" + cpp_varname: "enableIndexBuildCommitQuorum" default: false |