summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSuganthi Mani <suganthi.mani@mongodb.com>2020-02-28 15:54:31 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-02 08:02:54 +0000
commit4628f264e60fd69cd09388e6fca0d3dd1b82f14c (patch)
treebca64008b3010bfb3500c2e6b4dabfb28d5d8862 /src/mongo/db
parenta5582fa42435116ab05efcabddf17219fbd573d6 (diff)
downloadmongo-4628f264e60fd69cd09388e6fca0d3dd1b82f14c.tar.gz
SERVER-39071 Implements commit quorum for two phase index builds.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript5
-rw-r--r--src/mongo/db/auth/auth_op_observer.h1
-rw-r--r--src/mongo/db/catalog/SConscript3
-rw-r--r--src/mongo/db/catalog/commit_quorum.idl2
-rw-r--r--src/mongo/db/catalog/commit_quorum_options.cpp8
-rw-r--r--src/mongo/db/catalog/commit_quorum_options.h2
-rw-r--r--src/mongo/db/catalog/drop_collection.cpp1
-rw-r--r--src/mongo/db/catalog/drop_database.cpp3
-rw-r--r--src/mongo/db/catalog/drop_indexes.cpp4
-rw-r--r--src/mongo/db/catalog/index_build_oplog_entry.cpp21
-rw-r--r--src/mongo/db/catalog/index_build_oplog_entry.h2
-rw-r--r--src/mongo/db/catalog/rename_collection_test.cpp5
-rw-r--r--src/mongo/db/cloner.cpp12
-rw-r--r--src/mongo/db/collection_index_builds_tracker.cpp4
-rw-r--r--src/mongo/db/collection_index_builds_tracker.h2
-rw-r--r--src/mongo/db/commands/SConscript6
-rw-r--r--src/mongo/db/commands/create_indexes.cpp38
-rw-r--r--src/mongo/db/commands/mr_test.cpp2
-rw-r--r--src/mongo/db/commands/vote_commit_index_build_command.cpp35
-rw-r--r--src/mongo/db/database_index_builds_tracker.cpp4
-rw-r--r--src/mongo/db/database_index_builds_tracker.h2
-rw-r--r--src/mongo/db/dbhelpers.cpp12
-rw-r--r--src/mongo/db/dbhelpers.h12
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h1
-rw-r--r--src/mongo/db/index_build_entry_helpers.cpp89
-rw-r--r--src/mongo/db/index_build_entry_helpers.h22
-rw-r--r--src/mongo/db/index_builds_coordinator.cpp500
-rw-r--r--src/mongo/db/index_builds_coordinator.h90
-rw-r--r--src/mongo/db/index_builds_coordinator_mongod.cpp375
-rw-r--r--src/mongo/db/index_builds_coordinator_mongod.h18
-rw-r--r--src/mongo/db/op_observer.h2
-rw-r--r--src/mongo/db/op_observer_impl.cpp3
-rw-r--r--src/mongo/db/op_observer_impl.h1
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp4
-rw-r--r--src/mongo/db/op_observer_noop.h1
-rw-r--r--src/mongo/db/op_observer_registry.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator.h23
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp91
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h15
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp13
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.cpp14
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.h9
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp8
-rw-r--r--src/mongo/db/repl_index_build_state.h171
-rw-r--r--src/mongo/db/s/config_server_op_observer.h1
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp1
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h1
-rw-r--r--src/mongo/db/storage/SConscript11
-rw-r--r--src/mongo/db/storage/two_phase_index_build_knobs.idl6
50 files changed, 1396 insertions, 271 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index f959c2520ae..131cfc1a228 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -765,6 +765,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'transaction',
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
+ "$BUILD_DIR/mongo/db/catalog/commit_quorum_options",
],
)
@@ -826,7 +827,10 @@ env.Library(
'db_raii',
'index_build_entry_helpers',
'$BUILD_DIR/mongo/db/catalog/collection_catalog',
+ '$BUILD_DIR/mongo/db/catalog/index_build_entry_idl',
'$BUILD_DIR/mongo/db/catalog/index_timestamp_helper',
+ "$BUILD_DIR/mongo/executor/task_executor_interface",
+ "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl",
],
)
@@ -857,6 +861,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/timestamp_block',
'$BUILD_DIR/mongo/db/s/sharding_api_d',
'$BUILD_DIR/mongo/util/fail_point',
+ "$BUILD_DIR/mongo/executor/task_executor_interface",
],
)
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index 47293e12e09..96e09b4d028 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -56,6 +56,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) final {}
void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {}
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index ab0b9549256..6f14a2acd73 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -134,6 +134,9 @@ env.Library(
"$BUILD_DIR/mongo/db/repl/oplog_entry",
"$BUILD_DIR/mongo/rpc/command_status",
],
+ LIBDEPS_PRIVATE=[
+ 'commit_quorum_options',
+ ],
)
env.Library(
diff --git a/src/mongo/db/catalog/commit_quorum.idl b/src/mongo/db/catalog/commit_quorum.idl
index 92fac200b7b..a7a03c07f7d 100644
--- a/src/mongo/db/catalog/commit_quorum.idl
+++ b/src/mongo/db/catalog/commit_quorum.idl
@@ -37,5 +37,5 @@ types:
description: "CommitQuorumOptions defines the required quorum for the index builds to
commit."
cpp_type: "mongo::CommitQuorumOptions"
- serializer: "mongo::CommitQuorumOptions::append"
+ serializer: "mongo::CommitQuorumOptions::appendToBuilder"
deserializer: "mongo::CommitQuorumOptions::deserializerForIDL"
diff --git a/src/mongo/db/catalog/commit_quorum_options.cpp b/src/mongo/db/catalog/commit_quorum_options.cpp
index 254682e9eea..f04eeb5c96c 100644
--- a/src/mongo/db/catalog/commit_quorum_options.cpp
+++ b/src/mongo/db/catalog/commit_quorum_options.cpp
@@ -80,15 +80,15 @@ CommitQuorumOptions CommitQuorumOptions::deserializerForIDL(
BSONObj CommitQuorumOptions::toBSON() const {
BSONObjBuilder builder;
- append(kCommitQuorumField, &builder);
+ appendToBuilder(kCommitQuorumField, &builder);
return builder.obj();
}
-void CommitQuorumOptions::append(StringData fieldName, BSONObjBuilder* builder) const {
+void CommitQuorumOptions::appendToBuilder(StringData fieldName, BSONObjBuilder* builder) const {
if (mode.empty()) {
- builder->append(kCommitQuorumField, numNodes);
+ builder->append(fieldName, numNodes);
} else {
- builder->append(kCommitQuorumField, mode);
+ builder->append(fieldName, mode);
}
}
diff --git a/src/mongo/db/catalog/commit_quorum_options.h b/src/mongo/db/catalog/commit_quorum_options.h
index 1dfc5bf8369..784332070b0 100644
--- a/src/mongo/db/catalog/commit_quorum_options.h
+++ b/src/mongo/db/catalog/commit_quorum_options.h
@@ -78,7 +78,7 @@ public:
BSONObj toBSON() const;
// Appends the BSON representation of this object.
- void append(StringData fieldName, BSONObjBuilder* builder) const;
+ void appendToBuilder(StringData fieldName, BSONObjBuilder* builder) const;
// The 'commitQuorum' parameter to define the required quorum for the index builds to commit.
// The 'mode' represents the string format and takes precedence over the number format
diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp
index 4c43cfff485..60f5ae79b57 100644
--- a/src/mongo/db/catalog/drop_collection.cpp
+++ b/src/mongo/db/catalog/drop_collection.cpp
@@ -161,6 +161,7 @@ Status _abortIndexBuildsAndDropCollection(OperationContext* opCtx,
while (true) {
// Send the abort signal to any active index builds on the collection.
indexBuildsCoord->abortCollectionIndexBuildsNoWait(
+ opCtx,
collectionUUID,
str::stream() << "Collection " << coll->ns() << "(" << collectionUUID
<< ") is being dropped");
diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp
index 7d6f01eb9dd..64681932fd1 100644
--- a/src/mongo/db/catalog/drop_database.cpp
+++ b/src/mongo/db/catalog/drop_database.cpp
@@ -177,7 +177,8 @@ Status _dropDatabase(OperationContext* opCtx, const std::string& dbName, bool ab
// are none left when we retrieve the exclusive database lock again.
while (indexBuildsCoord->inProgForDb(dbName)) {
// Sends the abort signal to all the active index builders for this database.
- indexBuildsCoord->abortDatabaseIndexBuildsNoWait(dbName, "dropDatabase command");
+ indexBuildsCoord->abortDatabaseIndexBuildsNoWait(
+ opCtx, dbName, "dropDatabase command");
// Now that the abort signals were sent out to the active index builders for this
// database, we need to release the lock temporarily to allow those index builders
diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp
index dc568301a88..ddf2185ab42 100644
--- a/src/mongo/db/catalog/drop_indexes.cpp
+++ b/src/mongo/db/catalog/drop_indexes.cpp
@@ -159,7 +159,7 @@ std::vector<UUID> abortIndexBuildByIndexNamesNoWait(OperationContext* opCtx,
boost::optional<UUID> buildUUID =
IndexBuildsCoordinator::get(opCtx)->abortIndexBuildByIndexNamesNoWait(
- opCtx, collection->uuid(), indexNames, Timestamp(), "dropIndexes command");
+ opCtx, collection->uuid(), indexNames, std::string("dropIndexes command"));
if (buildUUID) {
return {*buildUUID};
}
@@ -220,7 +220,7 @@ std::vector<UUID> abortActiveIndexBuilders(OperationContext* opCtx,
if (indexNames.front() == "*") {
return IndexBuildsCoordinator::get(opCtx)->abortCollectionIndexBuildsNoWait(
- collection->uuid(), "dropIndexes command");
+ opCtx, collection->uuid(), "dropIndexes command");
}
return abortIndexBuildByIndexNamesNoWait(opCtx, collection, indexNames);
diff --git a/src/mongo/db/catalog/index_build_oplog_entry.cpp b/src/mongo/db/catalog/index_build_oplog_entry.cpp
index 4512f43af9b..a89cd0ce6ea 100644
--- a/src/mongo/db/catalog/index_build_oplog_entry.cpp
+++ b/src/mongo/db/catalog/index_build_oplog_entry.cpp
@@ -41,6 +41,7 @@ StatusWith<IndexBuildOplogEntry> IndexBuildOplogEntry::parse(const repl::OplogEn
// {
// < "startIndexBuild" | "commitIndexBuild" | "abortIndexBuild" > : "coll",
// "indexBuildUUID" : <UUID>,
+ // "commitQuorum" : [<int>|<std::string>] // only required for 'startIndexBuild'
// "indexes" : [
// {
// "key" : {
@@ -80,11 +81,30 @@ StatusWith<IndexBuildOplogEntry> IndexBuildOplogEntry::parse(const repl::OplogEn
if (buildUUIDElem.eoo()) {
return {ErrorCodes::BadValue, str::stream() << "Missing required field 'indexBuildUUID'"};
}
+
auto swBuildUUID = UUID::parse(buildUUIDElem);
if (!swBuildUUID.isOK()) {
return swBuildUUID.getStatus().withContext("Error parsing 'indexBuildUUID'");
}
+ boost::optional<CommitQuorumOptions> commitQuorum;
+ if (repl::OplogEntry::CommandType::kStartIndexBuild == commandType) {
+ auto commitQuorumElem = obj.getField(CommitQuorumOptions::kCommitQuorumField);
+ if (commitQuorumElem.eoo()) {
+ return {ErrorCodes::BadValue,
+ str::stream() << "Missing required field '"
+ << CommitQuorumOptions::kCommitQuorumField << "'"};
+ }
+
+ commitQuorum = CommitQuorumOptions();
+ auto status = commitQuorum->parse(commitQuorumElem);
+ if (!status.isOK()) {
+ return status.withContext(str::stream() << "Error parsing '"
+ << CommitQuorumOptions::kCommitQuorumField
+ << "': " << status.reason());
+ }
+ }
+
auto indexesElem = obj.getField("indexes");
if (indexesElem.eoo()) {
return {ErrorCodes::BadValue, str::stream() << "Missing required field 'indexes'"};
@@ -132,6 +152,7 @@ StatusWith<IndexBuildOplogEntry> IndexBuildOplogEntry::parse(const repl::OplogEn
commandType,
commandName.toString(),
swBuildUUID.getValue(),
+ commitQuorum,
indexNames,
indexSpecs,
cause};
diff --git a/src/mongo/db/catalog/index_build_oplog_entry.h b/src/mongo/db/catalog/index_build_oplog_entry.h
index 284afbc669e..1f0d0075965 100644
--- a/src/mongo/db/catalog/index_build_oplog_entry.h
+++ b/src/mongo/db/catalog/index_build_oplog_entry.h
@@ -32,6 +32,7 @@
#include <vector>
#include "mongo/base/status_with.h"
+#include "mongo/db/catalog/commit_quorum_options.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/util/uuid.h"
@@ -48,6 +49,7 @@ public:
repl::OplogEntry::CommandType commandType;
std::string commandName;
UUID buildUUID;
+ boost::optional<CommitQuorumOptions> commitQuorum;
std::vector<std::string> indexNames;
std::vector<BSONObj> indexSpecs;
boost::optional<Status> cause;
diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp
index 26ba9358a98..6aaa602a075 100644
--- a/src/mongo/db/catalog/rename_collection_test.cpp
+++ b/src/mongo/db/catalog/rename_collection_test.cpp
@@ -85,6 +85,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) override;
void onCommitIndexBuild(OperationContext* opCtx,
@@ -179,9 +180,11 @@ void OpObserverMock::onStartIndexBuild(OperationContext* opCtx,
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) {
_logOp(opCtx, nss, "startIndex");
- OpObserverNoop::onStartIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexes, fromMigrate);
+ OpObserverNoop::onStartIndexBuild(
+ opCtx, nss, collUUID, indexBuildUUID, indexes, commitQuorum, fromMigrate);
}
void OpObserverMock::onCommitIndexBuild(OperationContext* opCtx,
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp
index cfd795a3b52..4f444b9f755 100644
--- a/src/mongo/db/cloner.cpp
+++ b/src/mongo/db/cloner.cpp
@@ -404,8 +404,16 @@ void Cloner::copyIndexes(OperationContext* opCtx,
MultiIndexBlock::OnInitFn onInitFn;
if (opCtx->writesAreReplicated() && buildUUID) {
onInitFn = [&](std::vector<BSONObj>& specs) {
- opObserver->onStartIndexBuild(
- opCtx, to_collection, collection->uuid(), *buildUUID, specs, fromMigrate);
+ // Since, we don't use IndexBuildsCoordinatorMongod thread pool to build indexes,
+ // it's ok to set the commit quorum option as 1. Also, this is currently only get
+ // called in rollback via refetch. So, onStartIndexBuild() call will be a no-op.
+ opObserver->onStartIndexBuild(opCtx,
+ to_collection,
+ collection->uuid(),
+ *buildUUID,
+ specs,
+ CommitQuorumOptions(1),
+ fromMigrate);
return Status::OK();
};
} else {
diff --git a/src/mongo/db/collection_index_builds_tracker.cpp b/src/mongo/db/collection_index_builds_tracker.cpp
index 439a29b9be1..4fd216d0edd 100644
--- a/src/mongo/db/collection_index_builds_tracker.cpp
+++ b/src/mongo/db/collection_index_builds_tracker.cpp
@@ -95,14 +95,16 @@ std::vector<UUID> CollectionIndexBuildsTracker::getIndexBuildUUIDs(WithLock) con
void CollectionIndexBuildsTracker::runOperationOnAllBuilds(
WithLock lk,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::function<void(WithLock,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::shared_ptr<ReplIndexBuildState> replIndexBuildState,
const std::string& reason)> func,
const std::string& reason) noexcept {
for (auto it = _buildStateByBuildUUID.begin(); it != _buildStateByBuildUUID.end(); ++it) {
- func(lk, indexBuildsManager, it->second, reason);
+ func(lk, opCtx, indexBuildsManager, it->second, reason);
}
}
diff --git a/src/mongo/db/collection_index_builds_tracker.h b/src/mongo/db/collection_index_builds_tracker.h
index 234e6ccfcb5..2042be873d5 100644
--- a/src/mongo/db/collection_index_builds_tracker.h
+++ b/src/mongo/db/collection_index_builds_tracker.h
@@ -84,8 +84,10 @@ public:
*/
void runOperationOnAllBuilds(
WithLock,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::function<void(WithLock,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::shared_ptr<ReplIndexBuildState> replIndexBuildState,
const std::string& reason)> func,
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 5211d3854a2..6b8ecff4517 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -322,6 +322,7 @@ env.Library(
'rename_collection_idl',
'test_commands_enabled',
'write_commands_common',
+ "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl",
],
)
@@ -557,7 +558,10 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'map_reduce_agg',
- ]
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl',
+ ],
)
env.CppUnitTest(
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index eb2864eec05..70004a1c10e 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/storage/two_phase_index_build_knobs_gen.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/compiler.h"
@@ -205,7 +206,9 @@ void appendFinalIndexFieldsToResult(int numIndexesBefore,
result.append(kNoteFieldName, "index already exists");
}
- commitQuorum->append("commitQuorum", &result);
+ // commitQuorum will be populated only when two phase index build is enabled.
+ if (commitQuorum)
+ commitQuorum->appendToBuilder(kCommitQuorumFieldName, &result);
}
@@ -275,21 +278,32 @@ Status validateTTLOptions(OperationContext* opCtx, const BSONObj& cmdObj) {
boost::optional<CommitQuorumOptions> parseAndGetCommitQuorum(OperationContext* opCtx,
const BSONObj& cmdObj) {
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ auto twoPhaseindexBuildEnabled = IndexBuildsCoordinator::supportsTwoPhaseIndexBuild();
+ auto commitQuorumEnabled = (enableIndexBuildCommitQuorum) ? true : false;
if (cmdObj.hasField(kCommitQuorumFieldName)) {
uassert(ErrorCodes::BadValue,
str::stream() << "Standalones can't specify commitQuorum",
replCoord->isReplEnabled());
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "commitQuorum is supported only for two phase index builds with "
+ "majority commit quorum support enabled ",
+ (twoPhaseindexBuildEnabled && commitQuorumEnabled));
CommitQuorumOptions commitQuorum;
uassertStatusOK(commitQuorum.parse(cmdObj.getField(kCommitQuorumFieldName)));
return commitQuorum;
- } else {
+ }
+
+ if (twoPhaseindexBuildEnabled) {
// Retrieve the default commit quorum if one wasn't passed in, which consists of all
// data-bearing nodes.
- int numDataBearingMembers =
- replCoord->isReplEnabled() ? replCoord->getConfig().getNumDataBearingMembers() : 1;
+ int numDataBearingMembers = (replCoord->isReplEnabled() && commitQuorumEnabled)
+ ? replCoord->getConfig().getNumDataBearingMembers()
+ : 1;
return CommitQuorumOptions(numDataBearingMembers);
}
+
+ return boost::none;
}
/**
@@ -625,12 +639,7 @@ bool runCreateIndexesWithCoordinator(OperationContext* opCtx,
// this be a no-op.
// Use a null abort timestamp because the index build will generate its own timestamp
// on cleanup.
- indexBuildsCoord->abortIndexBuildByBuildUUIDNoWait(
- opCtx,
- buildUUID,
- Timestamp(),
- str::stream() << "Index build interrupted: " << buildUUID << ": "
- << interruptionEx.toString());
+ indexBuildsCoord->abortIndexBuildOnError(opCtx, buildUUID, interruptionEx.toStatus());
LOGV2(20443, "Index build aborted: {buildUUID}", "buildUUID"_attr = buildUUID);
throw;
@@ -650,14 +659,7 @@ bool runCreateIndexesWithCoordinator(OperationContext* opCtx,
throw;
}
- // Use a null abort timestamp because the index build will generate a ghost timestamp
- // for the single-phase build on cleanup.
- indexBuildsCoord->abortIndexBuildByBuildUUIDNoWait(
- opCtx,
- buildUUID,
- Timestamp(),
- str::stream() << "Index build interrupted due to change in replication state: "
- << buildUUID << ": " << ex.toString());
+ indexBuildsCoord->abortIndexBuildOnError(opCtx, buildUUID, ex.toStatus());
LOGV2(20446,
"Index build aborted due to NotMaster error: {buildUUID}",
"buildUUID"_attr = buildUUID);
diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp
index d648ee1bdc4..4385b99dad4 100644
--- a/src/mongo/db/commands/mr_test.cpp
+++ b/src/mongo/db/commands/mr_test.cpp
@@ -299,6 +299,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) override;
/**
@@ -354,6 +355,7 @@ void MapReduceOpObserver::onStartIndexBuild(OperationContext* opCtx,
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) {
for (auto&& obj : indexes) {
indexesCreated.push_back(obj.getOwned());
diff --git a/src/mongo/db/commands/vote_commit_index_build_command.cpp b/src/mongo/db/commands/vote_commit_index_build_command.cpp
index cd4ee962525..8c8baedd139 100644
--- a/src/mongo/db/commands/vote_commit_index_build_command.cpp
+++ b/src/mongo/db/commands/vote_commit_index_build_command.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/commands/vote_commit_index_build_gen.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/logv2/log.h"
namespace mongo {
namespace {
@@ -70,19 +71,35 @@ public:
using InvocationBase::InvocationBase;
void typedRun(OperationContext* opCtx) {
- uassertStatusOK(IndexBuildsCoordinator::get(opCtx)->voteCommitIndexBuild(
- request().getCommandParameter(), request().getHostAndPort()));
-
- // Must set the latest op time on the OperationContext in case no write was needed in
- // voteCommitIndexBuild above, i.e. if this command is called several times before it is
- // successful regarding write concern.
- repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
+ auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+
+ const auto& cmd = request();
+ LOGV2_DEBUG(3856208,
+ 1,
+ "Received voteCommitIndexBuild request for index build: {builduuid}, from "
+ "host: {host} ",
+ "builduuid"_attr = cmd.getCommandParameter(),
+ "host"_attr = cmd.getHostAndPort().toString());
+ auto voteStatus = IndexBuildsCoordinator::get(opCtx)->voteCommitIndexBuild(
+ opCtx, cmd.getCommandParameter(), cmd.getHostAndPort());
+
+ // No need to wait for majority write concern if we fail to persist the voter's info.
+ uassertStatusOK(voteStatus);
+
+ auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ // Update the client's last optime to last oplog entry's opTime.
+ // This case can hit only if the member has already voted for that index build. So, to
+ // make sure the voter's info won't be rolled back, we wait for the oplog's last entry's
+ // opTime to be majority replicated.
+ if (lastOpBeforeRun == lastOpAfterRun) {
+ repl::ReplClientInfo::forClient(opCtx->getClient())
+ .setLastOpToSystemLastOpTime(opCtx);
+ }
}
private:
NamespaceString ns() const override {
- MONGO_UNREACHABLE;
- return {};
+ return NamespaceString(request().getDbName(), "");
}
bool supportsWriteConcern() const override {
diff --git a/src/mongo/db/database_index_builds_tracker.cpp b/src/mongo/db/database_index_builds_tracker.cpp
index 313f5fa9a5b..de107e94ecc 100644
--- a/src/mongo/db/database_index_builds_tracker.cpp
+++ b/src/mongo/db/database_index_builds_tracker.cpp
@@ -59,14 +59,16 @@ void DatabaseIndexBuildsTracker::removeIndexBuild(WithLock, const UUID& buildUUI
void DatabaseIndexBuildsTracker::runOperationOnAllBuilds(
WithLock lk,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::function<void(WithLock,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::shared_ptr<ReplIndexBuildState> replIndexBuildState,
const std::string& reason)> func,
const std::string& reason) {
for (auto it = _allIndexBuilds.begin(); it != _allIndexBuilds.end(); ++it) {
- func(lk, indexBuildsManager, it->second, reason);
+ func(lk, opCtx, indexBuildsManager, it->second, reason);
}
}
diff --git a/src/mongo/db/database_index_builds_tracker.h b/src/mongo/db/database_index_builds_tracker.h
index 8b2eb4ea474..603ef1c70a0 100644
--- a/src/mongo/db/database_index_builds_tracker.h
+++ b/src/mongo/db/database_index_builds_tracker.h
@@ -71,8 +71,10 @@ public:
*/
void runOperationOnAllBuilds(
WithLock,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::function<void(WithLock,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::shared_ptr<ReplIndexBuildState> replIndexBuildState,
const std::string& reason)> func,
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 8ad65f8d2f3..47d13e8d46b 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -215,16 +215,24 @@ void Helpers::upsert(OperationContext* opCtx,
BSONElement e = o["_id"];
verify(e.type());
BSONObj id = e.wrap();
+ upsert(opCtx, ns, id, o, fromMigrate);
+}
+void Helpers::upsert(OperationContext* opCtx,
+ const string& ns,
+ const BSONObj& filter,
+ const BSONObj& updateMod,
+ bool fromMigrate) {
OldClientContext context(opCtx, ns);
const NamespaceString requestNs(ns);
UpdateRequest request(requestNs);
- request.setQuery(id);
- request.setUpdateModification(o);
+ request.setQuery(filter);
+ request.setUpdateModification(updateMod);
request.setUpsert();
request.setFromMigration(fromMigrate);
+ request.setYieldPolicy(PlanExecutor::NO_YIELD);
update(opCtx, context.db(), request);
}
diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h
index 246130853ff..7b606768d02 100644
--- a/src/mongo/db/dbhelpers.h
+++ b/src/mongo/db/dbhelpers.h
@@ -123,6 +123,18 @@ struct Helpers {
const BSONObj& o,
bool fromMigrate = false);
+ /**
+ * Performs an upsert of 'updateMod' if we don't match the given 'filter'.
+ * Callers are expected to hold the collection lock.
+ * Note: Query yielding is turned off, so both read and writes are performed
+ * on the same storage snapshot.
+ */
+ static void upsert(OperationContext* opCtx,
+ const std::string& ns,
+ const BSONObj& filter,
+ const BSONObj& updateMod,
+ bool fromMigrate = false);
+
// TODO: this should be somewhere else probably
/* Takes object o, and returns a new object with the
* same field elements but the names stripped out.
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 21478a822e5..eb1972bc49a 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -56,6 +56,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) final {}
void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {}
diff --git a/src/mongo/db/index_build_entry_helpers.cpp b/src/mongo/db/index_build_entry_helpers.cpp
index 1dce7f5b2c4..47666998c67 100644
--- a/src/mongo/db/index_build_entry_helpers.cpp
+++ b/src/mongo/db/index_build_entry_helpers.cpp
@@ -60,7 +60,8 @@ namespace mongo {
namespace {
-Status upsert(OperationContext* opCtx, IndexBuildEntry indexBuildEntry) {
+Status upsert(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry) {
+
return writeConflictRetry(opCtx,
"upsertIndexBuildEntry",
NamespaceString::kIndexBuildEntryNamespace.ns(),
@@ -85,6 +86,72 @@ Status upsert(OperationContext* opCtx, IndexBuildEntry indexBuildEntry) {
});
}
+std::pair<const BSONObj, const BSONObj> buildIndexBuildEntryFilterAndUpdate(
+ IndexBuildEntry& indexBuildEntry,
+ boost::optional<CommitQuorumOptions> currentCommitQuorum = boost::none) {
+ // Construct the filter.
+ const auto buildUUID =
+ BSON(IndexBuildEntry::kBuildUUIDFieldName << indexBuildEntry.getBuildUUID());
+ const auto collectionUUID =
+ BSON(IndexBuildEntry::kCollectionUUIDFieldName << indexBuildEntry.getCollectionUUID());
+ const auto indexNameList =
+ BSON(IndexBuildEntry::kIndexNamesFieldName << indexBuildEntry.getIndexNames());
+ BSONObjBuilder commitQuorumFilter;
+ if (!currentCommitQuorum) {
+ currentCommitQuorum = indexBuildEntry.getCommitQuorum();
+ }
+ currentCommitQuorum->appendToBuilder(IndexBuildEntry::kCommitQuorumFieldName,
+ &commitQuorumFilter);
+ const auto filter = BSON("$and" << BSON_ARRAY(buildUUID << collectionUUID << indexNameList
+ << commitQuorumFilter.obj()));
+ // Construct the update.
+ BSONObjBuilder updateMod;
+ BSONObjBuilder commitQuorumUpdate;
+ indexBuildEntry.getCommitQuorum().appendToBuilder(IndexBuildEntry::kCommitQuorumFieldName,
+ &commitQuorumUpdate);
+ // If the update commit quorum is same as the value on-disk, we don't update it.
+ updateMod.append("$set", commitQuorumUpdate.obj());
+
+ // '$addToSet' to prevent any duplicate entries written to "commitReadyMembers" field.
+ if (auto commitReadyMembers = indexBuildEntry.getCommitReadyMembers()) {
+ BSONArrayBuilder arrayBuilder;
+ for (const auto& item : commitReadyMembers.get()) {
+ arrayBuilder.append(item.toString());
+ }
+ const auto commitReadyMemberList = BSON(IndexBuildEntry::kCommitReadyMembersFieldName
+ << BSON("$each" << arrayBuilder.arr()));
+ updateMod.append("$addToSet", commitReadyMemberList);
+ }
+
+ return {filter, updateMod.obj()};
+}
+
+Status upsert(OperationContext* opCtx, const BSONObj& filter, const BSONObj& updateMod) {
+ return writeConflictRetry(opCtx,
+ "upsertIndexBuildEntry",
+ NamespaceString::kIndexBuildEntryNamespace.ns(),
+ [&]() -> Status {
+ AutoGetCollection autoCollection(
+ opCtx, NamespaceString::kIndexBuildEntryNamespace, MODE_IX);
+ Collection* collection = autoCollection.getCollection();
+ if (!collection) {
+ str::stream ss;
+ ss << "Collection not found: "
+ << NamespaceString::kIndexBuildEntryNamespace.ns();
+ return Status(ErrorCodes::NamespaceNotFound, ss);
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+ Helpers::upsert(opCtx,
+ NamespaceString::kIndexBuildEntryNamespace.ns(),
+ filter,
+ updateMod,
+ /*fromMigrate=*/false);
+ wuow.commit();
+ return Status::OK();
+ });
+}
+
} // namespace
namespace indexbuildentryhelpers {
@@ -118,7 +185,24 @@ void ensureIndexBuildEntriesNamespaceExists(OperationContext* opCtx) {
});
}
-Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry indexBuildEntry) {
+Status persistCommitReadyMemberInfo(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry) {
+ invariant(indexBuildEntry.getCommitReadyMembers());
+
+ auto [filter, updateMod] = buildIndexBuildEntryFilterAndUpdate(indexBuildEntry);
+ return upsert(opCtx, filter, updateMod);
+}
+
+Status setIndexCommitQuorum(OperationContext* opCtx,
+ IndexBuildEntry& indexBuildEntry,
+ CommitQuorumOptions currentCommitQuorum) {
+ invariant(!indexBuildEntry.getCommitReadyMembers());
+
+ auto [filter, updateMod] =
+ buildIndexBuildEntryFilterAndUpdate(indexBuildEntry, currentCommitQuorum);
+ return upsert(opCtx, filter, updateMod);
+}
+
+Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry) {
return writeConflictRetry(opCtx,
"addIndexBuildEntry",
NamespaceString::kIndexBuildEntryNamespace.ns(),
@@ -303,7 +387,6 @@ Status addCommitReadyMember(OperationContext* opCtx, UUID indexBuildUUID, HostAn
indexBuildEntry.setCommitReadyMembers(newCommitReadyMembers);
return upsert(opCtx, indexBuildEntry);
}
-
return Status::OK();
}
diff --git a/src/mongo/db/index_build_entry_helpers.h b/src/mongo/db/index_build_entry_helpers.h
index e0591077784..8d8e04a9be3 100644
--- a/src/mongo/db/index_build_entry_helpers.h
+++ b/src/mongo/db/index_build_entry_helpers.h
@@ -70,6 +70,26 @@ namespace indexbuildentryhelpers {
void ensureIndexBuildEntriesNamespaceExists(OperationContext* opCtx);
/**
+ * Persist the host and port information about the replica set members that have voted to commit an
+ * index build into config.system.indexBuilds collection. If the member info is already present in
+ * the collection for that index build, then we don't do any updates and don't generate any errors.
+ *
+ * Returns an error if collection is missing.
+ */
+Status persistCommitReadyMemberInfo(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry);
+
+/**
+ * Persist the new commit quorum value for an index build into config.system.indexBuilds collection.
+ * We will update the commit quorum value only if 'currentCommitQuorum' matches the value read from
+ * the "config.system.indexBuilds" collection for that index build.
+ *
+ * Returns an error if collection is missing.
+ */
+Status setIndexCommitQuorum(OperationContext* opCtx,
+ IndexBuildEntry& indexBuildEntry,
+ CommitQuorumOptions currentCommitQuorum);
+
+/**
* Writes the 'indexBuildEntry' to the disk.
*
* An IndexBuildEntry should be stored on the disk during the duration of the index build process
@@ -78,7 +98,7 @@ void ensureIndexBuildEntriesNamespaceExists(OperationContext* opCtx);
* Returns 'DuplicateKey' error code if a document already exists on the disk with the same
* 'indexBuildUUID'.
*/
-Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry indexBuildEntry);
+Status addIndexBuildEntry(OperationContext* opCtx, IndexBuildEntry& indexBuildEntry);
/**
* Removes the IndexBuildEntry from the disk.
diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp
index 64cd255f1df..989f453e21a 100644
--- a/src/mongo/db/index_builds_coordinator.cpp
+++ b/src/mongo/db/index_builds_coordinator.cpp
@@ -134,25 +134,19 @@ bool shouldBuildIndexesOnEmptyCollectionSinglePhased(OperationContext* opCtx,
return collection->isEmpty(opCtx);
}
-/**
- * Returns true if we should wait for a commitIndexBuild or abortIndexBuild oplog entry during oplog
- * application.
+/*
+ * Determines whether to skip the index build state transition check.
+ * Index builders that do not use ReplIndexBuildState::waitForNextAction to signal the primary and
+ * secondaries to commit or abort can violate the index build state transition rules; i.e., they
+ * can move from kPrepareAbort to kCommitted and from kCommitted to kPrepareAbort. So, we should
+ * skip state transition verification for them. Otherwise, we would trip an invariant.
+ */
-bool shouldWaitForCommitOrAbort(OperationContext* opCtx, const ReplIndexBuildState& replState) {
- if (IndexBuildProtocol::kTwoPhase != replState.protocol) {
- return false;
- }
-
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->getSettings().usingReplSets()) {
+bool shouldSkipIndexBuildStateTransitionCheck(OperationContext* opCtx,
+ IndexBuildProtocol protocol) {
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->getSettings().usingReplSets() && protocol == IndexBuildProtocol::kTwoPhase) {
return false;
}
-
- const NamespaceStringOrUUID dbAndUUID(replState.dbName, replState.collectionUUID);
- if (replCoord->canAcceptWritesFor(opCtx, dbAndUUID)) {
- return false;
- }
-
return true;
}
@@ -161,7 +155,7 @@ bool shouldWaitForCommitOrAbort(OperationContext* opCtx, const ReplIndexBuildSta
*/
void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
- const ReplIndexBuildState& replState,
+ ReplIndexBuildState& replState,
bool replSetAndNotPrimaryAtStart) {
const auto& buildUUID = replState.buildUUID;
@@ -174,6 +168,11 @@ void onCommitIndexBuild(OperationContext* opCtx,
const auto& collUUID = replState.collectionUUID;
const auto& indexSpecs = replState.indexSpecs;
auto fromMigrate = false;
+ auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState.protocol);
+ {
+ stdx::unique_lock<Latch> lk(replState.mutex);
+ replState.indexBuildState.setState(IndexBuildState::kCommitted, skipCheck);
+ }
// Since two phase index builds are allowed to survive replication state transitions, we should
// check if the node is currently a primary before attempting to write to the oplog.
@@ -196,7 +195,7 @@ void onCommitIndexBuild(OperationContext* opCtx,
*/
void onAbortIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
- const ReplIndexBuildState& replState,
+ ReplIndexBuildState& replState,
const Status& cause) {
if (!serverGlobalParams.featureCompatibility.isVersionInitialized()) {
return;
@@ -212,28 +211,52 @@ void onAbortIndexBuild(OperationContext* opCtx,
auto opObserver = opCtx->getServiceContext()->getOpObserver();
auto collUUID = replState.collectionUUID;
auto fromMigrate = false;
+ auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState.protocol);
+ {
+ stdx::unique_lock<Latch> lk(replState.mutex);
+ replState.indexBuildState.setState(IndexBuildState::kAborted, skipCheck);
+ }
opObserver->onAbortIndexBuild(
opCtx, nss, collUUID, replState.buildUUID, replState.indexSpecs, cause, fromMigrate);
}
/**
* Aborts the index build identified by the provided 'replIndexBuildState'.
- *
- * Sets a signal on the coordinator's repl index build state if the builder does not yet exist in
- * the manager.
+ * It gets called by drop database/collection/index command.
*/
void abortIndexBuild(WithLock lk,
+ OperationContext* opCtx,
IndexBuildsManager* indexBuildsManager,
std::shared_ptr<ReplIndexBuildState> replIndexBuildState,
const std::string& reason) {
- bool res = indexBuildsManager->abortIndexBuild(replIndexBuildState->buildUUID, reason);
- if (res) {
+ auto protocol = replIndexBuildState->protocol;
+ stdx::unique_lock<Latch> replStateLock(replIndexBuildState->mutex);
+ if (protocol == IndexBuildProtocol::kTwoPhase &&
+ replIndexBuildState->waitForNextAction->getFuture().isReady()) {
+ const auto nextAction = replIndexBuildState->waitForNextAction->getFuture().get();
+ invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied ||
+ nextAction == IndexBuildAction::kPrimaryAbort);
+ // Index build coordinator already received a signal to commit or abort. So, it's ok
+ // to return and wait for the index build to complete. The index build coordinator
+ // will not perform the signaled action (i.e, will not commit or abort the index build)
+ // only when the node steps down. When the node steps down, the caller of this function,
+ // drop commands (user operation) will also get interrupted. So, we no longer need to
+ // abort the index build on step down.
return;
}
- // The index builder was not found in the manager, so it only exists in the coordinator. In this
- // case, set the abort signal on the coordinator index build state.
- replIndexBuildState->aborted = true;
- replIndexBuildState->abortReason = reason;
+
+ auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replIndexBuildState->protocol);
+ // Set the state on replIndexBuildState and indexBuildsManager, and then signal the value. It's
+ // important that we do all three of these things in a critical section while holding the mutex
+ // lock.
+ replIndexBuildState->indexBuildState.setState(
+ IndexBuildState::kPrepareAbort, skipCheck, boost::none, reason);
+ indexBuildsManager->abortIndexBuild(replIndexBuildState->buildUUID, reason);
+
+ if (protocol == IndexBuildProtocol::kTwoPhase) {
+ // Only for 2 phase we need to use signaling logic.
+ // Promise can be set only once.
+ replIndexBuildState->waitForNextAction->emplaceValue(IndexBuildAction::kPrimaryAbort);
+ }
}
/**
@@ -334,8 +357,8 @@ IndexBuildsCoordinator* IndexBuildsCoordinator::get(ServiceContext* serviceConte
return indexBuildsCoordinator.get();
}
-IndexBuildsCoordinator* IndexBuildsCoordinator::get(OperationContext* operationContext) {
- return get(operationContext->getServiceContext());
+IndexBuildsCoordinator* IndexBuildsCoordinator::get(OperationContext* OperationContext) {
+ return get(OperationContext->getServiceContext());
}
IndexBuildsCoordinator::~IndexBuildsCoordinator() {
@@ -491,6 +514,23 @@ Status IndexBuildsCoordinator::_startIndexBuildForRecovery(OperationContext* opC
return Status::OK();
}
+std::string IndexBuildsCoordinator::_indexBuildActionToString(IndexBuildAction action) {
+ if (action == IndexBuildAction::kNoAction) {
+ return "No action";
+ } else if (action == IndexBuildAction::kOplogCommit) {
+ return "Oplog commit";
+ } else if (action == IndexBuildAction::kOplogAbort) {
+ return "Oplog abort";
+ } else if (action == IndexBuildAction::kRollbackAbort) {
+ return "Rollback abort";
+ } else if (action == IndexBuildAction::kPrimaryAbort) {
+ return "Primary abort";
+ } else if (action == IndexBuildAction::kCommitQuorumSatisfied) {
+ return "Commit quorum Satisfied";
+ }
+ MONGO_UNREACHABLE;
+}
+
void IndexBuildsCoordinator::waitForAllIndexBuildsToStopForShutdown() {
stdx::unique_lock<Latch> lk(_mutex);
@@ -506,6 +546,7 @@ void IndexBuildsCoordinator::waitForAllIndexBuildsToStopForShutdown() {
}
std::vector<UUID> IndexBuildsCoordinator::_abortCollectionIndexBuilds(stdx::unique_lock<Latch>& lk,
+ OperationContext* opCtx,
const UUID& collectionUUID,
const std::string& reason,
bool shouldWait) {
@@ -520,7 +561,7 @@ std::vector<UUID> IndexBuildsCoordinator::_abortCollectionIndexBuilds(stdx::uniq
std::vector<UUID> buildUUIDs = collIndexBuildsIt->second->getIndexBuildUUIDs(lk);
collIndexBuildsIt->second->runOperationOnAllBuilds(
- lk, &_indexBuildsManager, abortIndexBuild, reason);
+ lk, opCtx, &_indexBuildsManager, abortIndexBuild, reason);
if (!shouldWait) {
return buildUUIDs;
@@ -533,21 +574,23 @@ std::vector<UUID> IndexBuildsCoordinator::_abortCollectionIndexBuilds(stdx::uniq
return buildUUIDs;
}
-void IndexBuildsCoordinator::abortCollectionIndexBuilds(const UUID& collectionUUID,
+void IndexBuildsCoordinator::abortCollectionIndexBuilds(OperationContext* opCtx,
+ const UUID& collectionUUID,
const std::string& reason) {
stdx::unique_lock<Latch> lk(_mutex);
const bool shouldWait = true;
- _abortCollectionIndexBuilds(lk, collectionUUID, reason, shouldWait);
+ _abortCollectionIndexBuilds(lk, opCtx, collectionUUID, reason, shouldWait);
}
std::vector<UUID> IndexBuildsCoordinator::abortCollectionIndexBuildsNoWait(
- const UUID& collectionUUID, const std::string& reason) {
+ OperationContext* opCtx, const UUID& collectionUUID, const std::string& reason) {
stdx::unique_lock<Latch> lk(_mutex);
const bool shouldWait = false;
- return _abortCollectionIndexBuilds(lk, collectionUUID, reason, shouldWait);
+ return _abortCollectionIndexBuilds(lk, opCtx, collectionUUID, reason, shouldWait);
}
void IndexBuildsCoordinator::_abortDatabaseIndexBuilds(stdx::unique_lock<Latch>& lk,
+ OperationContext* opCtx,
const StringData& db,
const std::string& reason,
bool shouldWait) {
@@ -560,7 +603,8 @@ void IndexBuildsCoordinator::_abortDatabaseIndexBuilds(stdx::unique_lock<Latch>&
"About to abort all index builders running for collections in the given database",
"database"_attr = db);
- dbIndexBuilds->runOperationOnAllBuilds(lk, &_indexBuildsManager, abortIndexBuild, reason);
+ dbIndexBuilds->runOperationOnAllBuilds(
+ lk, opCtx, &_indexBuildsManager, abortIndexBuild, reason);
if (!shouldWait) {
return;
@@ -571,17 +615,20 @@ void IndexBuildsCoordinator::_abortDatabaseIndexBuilds(stdx::unique_lock<Latch>&
dbIndexBuilds->waitUntilNoIndexBuildsRemain(lk);
}
-void IndexBuildsCoordinator::abortDatabaseIndexBuilds(StringData db, const std::string& reason) {
+void IndexBuildsCoordinator::abortDatabaseIndexBuilds(OperationContext* opCtx,
+ StringData db,
+ const std::string& reason) {
stdx::unique_lock<Latch> lk(_mutex);
const bool shouldWait = true;
- _abortDatabaseIndexBuilds(lk, db, reason, shouldWait);
+ _abortDatabaseIndexBuilds(lk, opCtx, db, reason, shouldWait);
}
-void IndexBuildsCoordinator::abortDatabaseIndexBuildsNoWait(StringData db,
+void IndexBuildsCoordinator::abortDatabaseIndexBuildsNoWait(OperationContext* opCtx,
+ StringData db,
const std::string& reason) {
stdx::unique_lock<Latch> lk(_mutex);
const bool shouldWait = false;
- _abortDatabaseIndexBuilds(lk, db, reason, shouldWait);
+ _abortDatabaseIndexBuilds(lk, opCtx, db, reason, shouldWait);
}
namespace {
@@ -599,7 +646,8 @@ void IndexBuildsCoordinator::applyStartIndexBuild(OperationContext* opCtx,
const auto nss = getNsFromUUID(opCtx, collUUID);
IndexBuildsCoordinator::IndexBuildOptions indexBuildOptions;
- invariant(!indexBuildOptions.commitQuorum);
+ invariant(oplogEntry.commitQuorum);
+ indexBuildOptions.commitQuorum = oplogEntry.commitQuorum.get();
indexBuildOptions.replSetAndNotPrimaryAtStart = true;
auto indexBuildsCoord = IndexBuildsCoordinator::get(opCtx);
@@ -636,6 +684,9 @@ void IndexBuildsCoordinator::applyCommitIndexBuild(OperationContext* opCtx,
// If the index build was not found, we must restart the build. For some reason the index
// build has already been aborted on this node. This is possible in certain infrequent race
// conditions with stepdown, shutdown, and user interruption.
+ // This can also happen if this node was previously in initial sync while this index build
+ // was in progress on the sync source, since initial sync does not start the in-progress
+ // index builds.
LOGV2(20653,
"Could not find an active index build with UUID {buildUUID} while processing a "
"commitIndexBuild oplog entry. Restarting the index build on "
@@ -648,6 +699,10 @@ void IndexBuildsCoordinator::applyCommitIndexBuild(OperationContext* opCtx,
IndexBuildsCoordinator::IndexBuildOptions indexBuildOptions;
indexBuildOptions.replSetAndNotPrimaryAtStart = true;
+ // It's ok to set the commitQuorum value as 0, as we have already received the
+ // commitIndexBuild oplog entry. No way in future, this index build will be coordinated by
+ // this node.
+ indexBuildOptions.commitQuorum = CommitQuorumOptions(0);
// This spawns a new thread and returns immediately.
auto fut = uassertStatusOK(indexBuildsCoord->startIndexBuild(
@@ -671,12 +726,33 @@ void IndexBuildsCoordinator::applyCommitIndexBuild(OperationContext* opCtx,
}
auto replState = uassertStatusOK(indexBuildsCoord->_getIndexBuild(buildUUID));
- {
+
+ while (true) {
stdx::unique_lock<Latch> lk(replState->mutex);
- replState->isCommitReady = true;
- replState->commitTimestamp = opCtx->recoveryUnit()->getCommitTimestamp();
- replState->condVar.notify_all();
+ if (replState->waitForNextAction->getFuture().isReady()) {
+ // If the future wait were made uninterruptible, then shutdown could get stuck behind the
+ // oplog applier if the IndexBuildsCoordinator thread died after being interrupted on
+ // shutdown, and the commitIndexBuild oplog entry would be stuck waiting for the promise
+ // to be reset.
+ const auto nextAction = replState->waitForNextAction->getFuture().get(opCtx);
+ invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied);
+ // Retry until the current promise result is consumed by the index builder thread and
+ // a new empty promise is created by the IndexBuildsCoordinator thread.
+ // Don't hammer it.
+ sleepmillis(1);
+ continue;
+ }
+ auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState->protocol);
+ replState->indexBuildState.setState(IndexBuildState::kPrepareCommit,
+ skipCheck,
+ opCtx->recoveryUnit()->getCommitTimestamp());
+ // Promise can be set only once.
+ // We can't skip signaling here if a signal is already set, because the previous commit or
+ // abort signal might have been sent to handle the primary case.
+ replState->waitForNextAction->emplaceValue(IndexBuildAction::kOplogCommit);
+ break;
}
+
auto fut = replState->sharedPromise.getFuture();
LOGV2(20654,
"Index build joined after commit: {buildUUID}: {fut_waitNoThrow_opCtx}",
@@ -702,24 +778,40 @@ void IndexBuildsCoordinator::applyAbortIndexBuild(OperationContext* opCtx,
<< buildUUID,
!opCtx->recoveryUnit()->getCommitTimestamp().isNull());
+ std::string abortReason(str::stream()
+ << "abortIndexBuild oplog entry encountered: " << *oplogEntry.cause);
auto indexBuildsCoord = IndexBuildsCoordinator::get(opCtx);
- indexBuildsCoord->abortIndexBuildByBuildUUID(
- opCtx,
- buildUUID,
- opCtx->recoveryUnit()->getCommitTimestamp(),
- str::stream() << "abortIndexBuild oplog entry encountered: " << *oplogEntry.cause);
+ indexBuildsCoord->abortIndexBuildByBuildUUID(opCtx,
+ buildUUID,
+ IndexBuildAction::kOplogAbort,
+ opCtx->recoveryUnit()->getCommitTimestamp(),
+ abortReason);
+}
+
+void IndexBuildsCoordinator::abortIndexBuildOnError(OperationContext* opCtx,
+ const UUID& buildUUID,
+ Status abortStatus) {
+ // Use a null abort timestamp because the index build will generate a ghost timestamp
+ // for the single-phase build on cleanup.
+ std::string abortReason(str::stream() << "Index build interrupted: " << buildUUID << ": "
+ << abortStatus.toString());
+ abortIndexBuildByBuildUUIDNoWait(
+ opCtx, buildUUID, IndexBuildAction::kPrimaryAbort, boost::none, abortReason);
}
void IndexBuildsCoordinator::abortIndexBuildByBuildUUID(OperationContext* opCtx,
const UUID& buildUUID,
- Timestamp abortTimestamp,
- const std::string& reason) {
- if (!abortIndexBuildByBuildUUIDNoWait(opCtx, buildUUID, abortTimestamp, reason)) {
+ IndexBuildAction signalAction,
+ boost::optional<Timestamp> abortTimestamp,
+ boost::optional<std::string> reason) {
+ if (!abortIndexBuildByBuildUUIDNoWait(opCtx, buildUUID, signalAction, abortTimestamp, reason)) {
return;
}
- auto replState = invariant(_getIndexBuild(buildUUID),
- str::stream() << "Abort timestamp: " << abortTimestamp.toString());
+ auto replState =
+ invariant(_getIndexBuild(buildUUID),
+ str::stream() << "Abort timestamp: "
+ << abortTimestamp.get_value_or(Timestamp()).toString());
auto fut = replState->sharedPromise.getFuture();
LOGV2(20655,
@@ -732,8 +824,7 @@ boost::optional<UUID> IndexBuildsCoordinator::abortIndexBuildByIndexNamesNoWait(
OperationContext* opCtx,
const UUID& collectionUUID,
const std::vector<std::string>& indexNames,
- Timestamp abortTimestamp,
- const std::string& reason) {
+ boost::optional<std::string> reason) {
boost::optional<UUID> buildUUID;
auto indexBuilds = _getIndexBuilds();
auto onIndexBuild = [&](std::shared_ptr<ReplIndexBuildState> replState) {
@@ -756,8 +847,11 @@ boost::optional<UUID> IndexBuildsCoordinator::abortIndexBuildByIndexNamesNoWait(
"collectionUUID"_attr = collectionUUID,
"replState_indexNames_front"_attr = replState->indexNames.front());
- if (this->abortIndexBuildByBuildUUIDNoWait(
- opCtx, replState->buildUUID, abortTimestamp, reason)) {
+ if (this->abortIndexBuildByBuildUUIDNoWait(opCtx,
+ replState->buildUUID,
+ IndexBuildAction::kPrimaryAbort,
+ boost::none,
+ reason)) {
buildUUID = replState->buildUUID;
}
};
@@ -792,31 +886,74 @@ bool IndexBuildsCoordinator::hasIndexBuilder(OperationContext* opCtx,
return foundIndexBuilder;
}
-bool IndexBuildsCoordinator::abortIndexBuildByBuildUUIDNoWait(OperationContext* opCtx,
- const UUID& buildUUID,
- Timestamp abortTimestamp,
- const std::string& reason) {
- _indexBuildsManager.abortIndexBuild(buildUUID, reason);
+bool IndexBuildsCoordinator::abortIndexBuildByBuildUUIDNoWait(
+ OperationContext* opCtx,
+ const UUID& buildUUID,
+ IndexBuildAction signalAction,
+ boost::optional<Timestamp> abortTimestamp,
+ boost::optional<std::string> reason) {
+ // We need to avoid race between commit and abort index build.
+ while (true) {
+ // It is possible to receive an abort for a non-existent index build. Abort should always
+ // succeed, so suppress the error.
+ auto replStateResult = _getIndexBuild(buildUUID);
+ if (!replStateResult.isOK()) {
+ LOGV2(20656,
+ "ignoring error while aborting index build {buildUUID}: "
+ "{replStateResult_getStatus}",
+ "buildUUID"_attr = buildUUID,
+ "replStateResult_getStatus"_attr = replStateResult.getStatus());
+ return false;
+ }
- // It is possible to receive an abort for a non-existent index build. Abort should always
- // succeed, so suppress the error.
- auto replStateResult = _getIndexBuild(buildUUID);
- if (!replStateResult.isOK()) {
- LOGV2(20656,
- "ignoring error while aborting index build {buildUUID}: {replStateResult_getStatus}",
- "buildUUID"_attr = buildUUID,
- "replStateResult_getStatus"_attr = replStateResult.getStatus());
- return false;
- }
+ auto replState = replStateResult.getValue();
+ auto protocol = replState->protocol;
- auto replState = replStateResult.getValue();
- {
stdx::unique_lock<Latch> lk(replState->mutex);
- replState->aborted = true;
- replState->abortTimestamp = abortTimestamp;
- replState->abortReason = reason;
- replState->condVar.notify_all();
+ if (protocol == IndexBuildProtocol::kTwoPhase &&
+ replState->waitForNextAction->getFuture().isReady()) {
+ const auto nextAction = replState->waitForNextAction->getFuture().get(opCtx);
+ invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied ||
+ nextAction == IndexBuildAction::kPrimaryAbort);
+
+ // Index build coordinator already received a signal to commit or abort. So, it's ok
+ // to return and wait for the index build to complete if we are trying to signal
+ // 'kPrimaryAbort'. The index build coordinator will not perform the signaled action
+ // (i.e, will not commit or abort the index build) only when the node steps down. When
+ // the node steps down, the caller of this function, dropIndexes/createIndexes command
+ // (user operation) will also get interrupted. So, we no longer need to abort the index
+ // build on step down.
+ //
+ // Currently, the dropIndexes command calls this function with the collection lock
+ // held in IX mode, so it is possible that we block the index build from completing,
+ // leading to a 3-way deadlock involving step down, the dropIndexes command, and the
+ // IndexBuildsCoordinator thread.
+ if (signalAction == IndexBuildAction::kPrimaryAbort)
+ return true;
+
+ // Retry until the current promise result is consumed by the index builder thread and
+ // a new empty promise is created by the IndexBuildsCoordinator thread, or until the
+ // index build is torn down after the index build commit.
+ // Don't hammer it.
+ sleepmillis(1);
+ continue;
+ }
+
+ auto skipCheck = shouldSkipIndexBuildStateTransitionCheck(opCtx, replState->protocol);
+ // Set the state on replState and _indexBuildsManager, and then signal the value. It's
+ // important that we do all three of these things in a critical section while holding the
+ // mutex lock.
+ replState->indexBuildState.setState(
+ IndexBuildState::kPrepareAbort, skipCheck, abortTimestamp, reason);
+ _indexBuildsManager.abortIndexBuild(buildUUID, reason.get_value_or(""));
+
+ if (protocol == IndexBuildProtocol::kTwoPhase) {
+ // Only for 2 phase we need to use signaling logic.
+ // Promise can be set only once.
+ replState->waitForNextAction->emplaceValue(signalAction);
+ }
+ break;
}
+
return true;
}
@@ -850,8 +987,16 @@ std::size_t IndexBuildsCoordinator::getActiveIndexBuildCount(OperationContext* o
void IndexBuildsCoordinator::onStepUp(OperationContext* opCtx) {
LOGV2(20657, "IndexBuildsCoordinator::onStepUp - this node is stepping up to primary");
+ // This would create an empty table even for FCV 4.2 to handle case where a primary node started
+ // with FCV 4.2, and then upgraded FCV 4.4.
+ ensureIndexBuildEntriesNamespaceExists(opCtx);
+
auto indexBuilds = _getIndexBuilds();
auto onIndexBuild = [this, opCtx](std::shared_ptr<ReplIndexBuildState> replState) {
+ if (IndexBuildProtocol::kTwoPhase != replState->protocol) {
+ return;
+ }
+
// TODO(SERVER-44654): re-enable failover support for unique indexes.
if (containsUniqueIndexes(replState->indexSpecs)) {
// We abort unique index builds on step-up on the new primary, as opposed to on
@@ -863,21 +1008,25 @@ void IndexBuildsCoordinator::onStepUp(OperationContext* opCtx) {
// oplog entry.
// Do not wait for the index build to exit, because it may reacquire locks that are not
// available until stepUp completes.
- abortIndexBuildByBuildUUIDNoWait(
- opCtx, replState->buildUUID, Timestamp(), "unique indexes do not support failover");
+ std::string abortReason("unique indexes do not support failover");
+ abortIndexBuildByBuildUUIDNoWait(opCtx,
+ replState->buildUUID,
+ IndexBuildAction::kPrimaryAbort,
+ boost::none,
+ abortReason);
return;
}
- stdx::unique_lock<Latch> lk(replState->mutex);
- if (!replState->aborted) {
- // Leave commit timestamp as null. We will be writing a commitIndexBuild oplog entry now
- // that we are primary and using the timestamp from the oplog entry to update the mdb
- // catalog.
- invariant(replState->commitTimestamp.isNull(), replState->buildUUID.toString());
- invariant(!replState->isCommitReady, replState->buildUUID.toString());
- replState->isCommitReady = true;
- replState->condVar.notify_all();
+ {
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ // After sending the abort, this node might have stepped down and stepped back up.
+ if (replState->indexBuildState.isAbortPrepared() &&
+ !replState->waitForNextAction->getFuture().isReady()) {
+ replState->waitForNextAction->emplaceValue(IndexBuildAction::kPrimaryAbort);
+ return;
+ }
}
+ _signalIfCommitQuorumIsSatisfied(opCtx, replState);
};
forEachIndexBuild(indexBuilds, "IndexBuildsCoordinator::onStepUp - "_sd, onIndexBuild);
}
@@ -888,33 +1037,49 @@ IndexBuilds IndexBuildsCoordinator::onRollback(OperationContext* opCtx) {
IndexBuilds buildsAborted;
auto indexBuilds = _getIndexBuilds();
- auto onIndexBuild =
- [this, opCtx, &buildsAborted](std::shared_ptr<ReplIndexBuildState> replState) {
- if (IndexBuildProtocol::kSinglePhase == replState->protocol) {
- LOGV2(20659,
- "IndexBuildsCoordinator::onRollback - not aborting single phase index build: "
- "{replState_buildUUID}",
- "replState_buildUUID"_attr = replState->buildUUID);
- return;
- }
- const std::string reason = "rollback";
-
- IndexBuildDetails aborted{replState->collectionUUID};
- // Record the index builds aborted due to rollback. This allows any rollback algorithm
- // to efficiently restart all unfinished index builds without having to scan all indexes
- // in all collections.
- for (auto spec : replState->indexSpecs) {
- aborted.indexSpecs.emplace_back(spec.getOwned());
- }
- buildsAborted.insert({replState->buildUUID, aborted});
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ auto onIndexBuild = [&](std::shared_ptr<ReplIndexBuildState> replState) {
+ if (IndexBuildProtocol::kSinglePhase == replState->protocol) {
+ LOGV2(3856209,
+ "IndexBuildsCoordinator::onRollback - not aborting single phase index build: "
+ "{builduuid} ",
+ "builduuid"_attr = replState->buildUUID);
+ return;
+ }
+ if (IndexBuildProtocol::kSinglePhase == replState->protocol) {
+ LOGV2(20659,
+ "IndexBuildsCoordinator::onRollback - not aborting single phase index build: "
+ "{replState_buildUUID}",
+ "replState_buildUUID"_attr = replState->buildUUID);
+ return;
+ }
+ const std::string reason = "rollback";
+
+ IndexBuildDetails aborted{replState->collectionUUID};
+ // Record the index builds aborted due to rollback. This allows any rollback algorithm
+ // to efficiently restart all unfinished index builds without having to scan all indexes
+ // in all collections.
+ for (auto spec : replState->indexSpecs) {
+ aborted.indexSpecs.emplace_back(spec.getOwned());
+ }
+ buildsAborted.insert({replState->buildUUID, aborted});
- // Leave abort timestamp as null. This will unblock the index build and allow it to
- // complete without cleaning up. Subsequently, the rollback algorithm can decide how to
- // undo the index build depending on the state of the oplog. Waits for index build
- // thread to exit.
- abortIndexBuildByBuildUUID(opCtx, replState->buildUUID, Timestamp(), reason);
- };
+ // Leave abort timestamp as null. This will unblock the index build and allow it to
+ // complete without cleaning up. Subsequently, the rollback algorithm can decide how to
+ // undo the index build depending on the state of the oplog.
+ // Signals the kRollbackAbort and then waits for the thread to join.
+ abortIndexBuildByBuildUUID(
+ opCtx, replState->buildUUID, IndexBuildAction::kRollbackAbort, boost::none, reason);
+
+ // Now, cancel if there are any active callback handle waiting for the remote
+ // "voteCommitIndexBuild" command's response.
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ if (replState->voteCmdCbkHandle.isValid()) {
+ replCoord->cancelCbkHandle(replState->voteCmdCbkHandle);
+ }
+ };
forEachIndexBuild(indexBuilds, "IndexBuildsCoordinator::onRollback - "_sd, onIndexBuild);
+
return buildsAborted;
}
@@ -1108,7 +1273,12 @@ void IndexBuildsCoordinator::createIndexes(OperationContext* opCtx,
if (!supportsTwoPhaseIndexBuild()) {
return;
}
- opObserver->onStartIndexBuild(opCtx, nss, collectionUUID, buildUUID, specs, fromMigrate);
+ // Since we don't use the IndexBuildsCoordinatorMongod thread pool to build system
+ // indexes, it's ok to set the commit quorum option to 1. Also, this currently only gets
+ // called during system index creation on startup, so the onStartIndexBuild() call will
+ // be a no-op.
+ opObserver->onStartIndexBuild(
+ opCtx, nss, collectionUUID, buildUUID, specs, CommitQuorumOptions(1), fromMigrate);
opObserver->onCommitIndexBuild(opCtx, nss, collectionUUID, buildUUID, specs, fromMigrate);
};
uassertStatusOK(_indexBuildsManager.commitIndexBuild(
@@ -1201,16 +1371,17 @@ Status IndexBuildsCoordinator::_registerIndexBuild(
{
// We have to lock the mutex in order to read the committed/aborted state.
stdx::unique_lock<Latch> lk(existingIndexBuild->mutex);
- if (existingIndexBuild->isCommitReady) {
- ss << " (ready to commit with timestamp: "
- << existingIndexBuild->commitTimestamp.toString() << ")";
- } else if (existingIndexBuild->aborted) {
- ss << " (aborted with reason: " << existingIndexBuild->abortReason
- << " and timestamp: " << existingIndexBuild->abortTimestamp.toString()
- << ")";
+ ss << " index build state: " << existingIndexBuild->indexBuildState.toString();
+ if (auto ts = existingIndexBuild->indexBuildState.getTimestamp()) {
+ ss << ", timestamp: " << ts->toString();
+ }
+ if (existingIndexBuild->indexBuildState.isSet(IndexBuildState::kPrepareAbort |
+ IndexBuildState::kAborted)) {
+ if (auto abortReason =
+ existingIndexBuild->indexBuildState.getAbortReason()) {
+ ss << ", abort reason: " << abortReason.get();
+ }
aborted = true;
- } else {
- ss << " (in-progress)";
}
}
std::string msg = ss;
@@ -1394,15 +1565,28 @@ IndexBuildsCoordinator::PostSetupAction IndexBuildsCoordinator::_setUpIndexBuild
MultiIndexBlock::OnInitFn onInitFn;
if (IndexBuildProtocol::kTwoPhase == replState->protocol) {
+ // Change the startIndexBuild Oplog entry.
// Two-phase index builds write a different oplog entry than the default behavior which
// writes a no-op just to generate an optime.
onInitFn = [&](std::vector<BSONObj>& specs) {
+ if (!replCoord->canAcceptWritesFor(opCtx, nss)) {
+ // Not primary.
+ return Status::OK();
+ }
+
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ // We need to read the commitQuorum value and generate the startIndexBuild oplog entry
+ // while holding the mutex lock, to avoid a race like the following: the
+ // setIndexCommitQuorum command changes the commit quorum from 3 to 5, and then the
+ // startIndexBuild oplog entry resets the commit quorum value back to 3 on secondaries.
+ invariant(replState->commitQuorum, "Commit quorum required for two phase index build");
opCtx->getServiceContext()->getOpObserver()->onStartIndexBuild(
opCtx,
nss,
replState->collectionUUID,
replState->buildUUID,
replState->indexSpecs,
+ replState->commitQuorum.get(),
false /* fromMigrate */);
return Status::OK();
@@ -1602,6 +1786,14 @@ void IndexBuildsCoordinator::_cleanUpTwoPhaseAfterFailure(
const Status& status) {
if (status == ErrorCodes::InterruptedAtShutdown) {
+ // The promise must be set at least once before it is destroyed; otherwise its
+ // destructor would trip an invariant.
+ {
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ if (!replState->waitForNextAction->getFuture().isReady()) {
+ replState->waitForNextAction->emplaceValue(IndexBuildAction::kNoAction);
+ }
+ }
// Leave it as-if kill -9 happened. Startup recovery will restart the index build.
_indexBuildsManager.abortIndexBuildWithoutCleanup(
opCtx, collection, replState->buildUUID, "shutting down");
@@ -1628,14 +1820,15 @@ void IndexBuildsCoordinator::_cleanUpTwoPhaseAfterFailure(
// build from the oplog entry unless the index build did not fail due to processing an
// abortIndexBuild oplog entry. This is the case if we were aborted due to rollback.
stdx::unique_lock<Latch> lk(replState->mutex);
- invariant(replState->aborted, replState->buildUUID.toString());
- Timestamp abortIndexBuildTimestamp = replState->abortTimestamp;
+ invariant(replState->indexBuildState.isAbortPrepared(),
+ replState->buildUUID.toString());
+ auto abortIndexBuildTimestamp = replState->indexBuildState.getTimestamp();
// If we were aborted and no abort timestamp is set, then we should leave the index
// build unfinished. This can happen during rollback because we are not primary and
// cannot generate an optime to timestamp the index build abort. We rely on the
// rollback process to correct this state.
- if (abortIndexBuildTimestamp.isNull()) {
+ if (!abortIndexBuildTimestamp) {
_indexBuildsManager.abortIndexBuildWithoutCleanup(
opCtx, collection, replState->buildUUID, "no longer primary");
_indexBuildsManager.tearDownIndexBuild(
@@ -1647,7 +1840,7 @@ void IndexBuildsCoordinator::_cleanUpTwoPhaseAfterFailure(
unlockRSTLForIndexCleanup(opCtx);
Lock::CollectionLock collLock(opCtx, nss, MODE_X);
- TimestampBlock tsBlock(opCtx, abortIndexBuildTimestamp);
+ TimestampBlock tsBlock(opCtx, abortIndexBuildTimestamp.get());
_indexBuildsManager.tearDownIndexBuild(
opCtx, collection, replState->buildUUID, MultiIndexBlock::kNoopOnCleanUpFn);
return;
@@ -1802,7 +1995,9 @@ void IndexBuildsCoordinator::_buildIndexTwoPhase(
_scanCollectionAndInsertKeysIntoSorter(opCtx, replState, exclusiveCollectionLock);
_insertKeysFromSideTablesWithoutBlockingWrites(opCtx, replState);
- auto commitIndexBuildTimestamp = _waitForCommitOrAbort(opCtx, replState);
+ _signalPrimaryForCommitReadiness(opCtx, replState);
+ auto commitIndexBuildTimestamp = _waitForNextIndexBuildAction(opCtx, replState);
+
_insertKeysFromSideTablesAndCommit(
opCtx, replState, indexBuildOptions, exclusiveCollectionLock, commitIndexBuildTimestamp);
}
@@ -1896,51 +2091,6 @@ void IndexBuildsCoordinator::_insertKeysFromSideTablesWithoutBlockingWrites(
}
/**
- * Waits for commit or abort signal from primary.
- */
-Timestamp IndexBuildsCoordinator::_waitForCommitOrAbort(
- OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) {
- Timestamp commitIndexBuildTimestamp;
- if (shouldWaitForCommitOrAbort(opCtx, *replState)) {
- LOGV2(20668,
- "Index build waiting for commit or abort before completing final phase: "
- "{replState_buildUUID}",
- "replState_buildUUID"_attr = replState->buildUUID);
-
- // Yield locks and storage engine resources before blocking.
- opCtx->recoveryUnit()->abandonSnapshot();
- Lock::TempRelease release(opCtx->lockState());
- invariant(!opCtx->lockState()->isLocked(),
- str::stream()
- << "failed to yield locks for index build while waiting for commit or abort: "
- << replState->buildUUID);
-
- stdx::unique_lock<Latch> lk(replState->mutex);
- auto isReadyToCommitOrAbort = [rs = replState] { return rs->isCommitReady || rs->aborted; };
- opCtx->waitForConditionOrInterrupt(replState->condVar, lk, isReadyToCommitOrAbort);
-
- if (replState->isCommitReady) {
- LOGV2(20669,
- "Committing index build",
- "buildUUID"_attr = replState->buildUUID,
- "commitTimestamp"_attr = replState->commitTimestamp,
- "collectionUUID"_attr = replState->collectionUUID);
- commitIndexBuildTimestamp = replState->commitTimestamp;
- invariant(!replState->aborted, replState->buildUUID.toString());
- } else if (replState->aborted) {
- LOGV2(20670,
- "Aborting index build",
- "buildUUID"_attr = replState->buildUUID,
- "abortTimestamp"_attr = replState->abortTimestamp,
- "abortReason"_attr = replState->abortReason,
- "collectionUUID"_attr = replState->collectionUUID);
- invariant(!replState->isCommitReady, replState->buildUUID.toString());
- }
- }
- return commitIndexBuildTimestamp;
-}
-
-/**
* Third phase is catching up on all the writes that occurred during the first two phases.
* Accepts a commit timestamp for the index (null if not available).
*/
diff --git a/src/mongo/db/index_builds_coordinator.h b/src/mongo/db/index_builds_coordinator.h
index 18baf0d8fcf..801952ac5bb 100644
--- a/src/mongo/db/index_builds_coordinator.h
+++ b/src/mongo/db/index_builds_coordinator.h
@@ -48,6 +48,8 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl_index_build_state.h"
#include "mongo/db/storage/durable_catalog.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -198,7 +200,9 @@ public:
* provided 'reason' will be used in the error message that the index builders return to their
* callers.
*/
- void abortCollectionIndexBuilds(const UUID& collectionUUID, const std::string& reason);
+ void abortCollectionIndexBuilds(OperationContext* opCtx,
+ const UUID& collectionUUID,
+ const std::string& reason);
/**
* Signals all of the index builds on the specified collection to abort and returns the build
@@ -206,7 +210,8 @@ public:
* provided 'reason' will be used in the error message that the index builders return to their
* callers.
*/
- std::vector<UUID> abortCollectionIndexBuildsNoWait(const UUID& collectionUUID,
+ std::vector<UUID> abortCollectionIndexBuildsNoWait(OperationContext* opCtx,
+ const UUID& collectionUUID,
const std::string& reason);
/**
@@ -214,21 +219,33 @@ public:
* builds are no longer running. The provided 'reason' will be used in the error message that
* the index builders return to their callers.
*/
- void abortDatabaseIndexBuilds(StringData db, const std::string& reason);
+ void abortDatabaseIndexBuilds(OperationContext* opCtx,
+ StringData db,
+ const std::string& reason);
/**
* Signals all of the index builds on the specified database to abort. The provided 'reason'
* will be used in the error message that the index builders return to their callers.
*/
- void abortDatabaseIndexBuildsNoWait(StringData db, const std::string& reason);
+ void abortDatabaseIndexBuildsNoWait(OperationContext* opCtx,
+ StringData db,
+ const std::string& reason);
+
+ /**
+ * Aborts an index build by index build UUID. This gets called when the index build on primary
+ * failed due to interruption or replica set state change.
+ * It's a wrapper function to abortIndexBuildByBuildUUIDNoWait().
+ */
+ void abortIndexBuildOnError(OperationContext* opCtx, const UUID& buildUUID, Status abortStatus);
/**
* Aborts an index build by index build UUID. Returns when the index build thread exits.
*/
void abortIndexBuildByBuildUUID(OperationContext* opCtx,
const UUID& buildUUID,
- Timestamp abortTimestamp,
- const std::string& reason);
+ IndexBuildAction signalAction,
+ boost::optional<Timestamp> abortTimestamp = boost::none,
+ boost::optional<std::string> reason = boost::none);
/**
* Aborts an index build by index build UUID. Does not wait for the index build thread to
@@ -236,9 +253,9 @@ public:
*/
bool abortIndexBuildByBuildUUIDNoWait(OperationContext* opCtx,
const UUID& buildUUID,
- Timestamp abortTimestamp,
- const std::string& reason);
-
+ IndexBuildAction signalAction,
+ boost::optional<Timestamp> abortTimestamp = boost::none,
+ boost::optional<std::string> reason = boost::none);
/**
* Aborts an index build by its index name(s). This will only abort in-progress index builds if
* all of the indexes are specified that a single builder is building together. When an
@@ -249,8 +266,7 @@ public:
OperationContext* opCtx,
const UUID& collectionUUID,
const std::vector<std::string>& indexNames,
- Timestamp abortTimestamp,
- const std::string& reason);
+ boost::optional<std::string> reason = boost::none);
/**
* Returns true if there is an index builder building the given index names on a collection.
@@ -281,9 +297,13 @@ public:
IndexBuilds onRollback(OperationContext* opCtx);
/**
- * TODO: This is not yet implemented.
+ * Handles the 'VoteCommitIndexBuild' command request.
+ * Writes the host and port information of the replica set member that has voted to commit an
+ * index build into config.system.indexBuilds collection.
*/
- virtual Status voteCommitIndexBuild(const UUID& buildUUID, const HostAndPort& hostAndPort) = 0;
+ virtual Status voteCommitIndexBuild(OperationContext* opCtx,
+ const UUID& buildUUID,
+ const HostAndPort& hostAndPort) = 0;
/**
* Sets a new commit quorum on an index build that manages 'indexNames' on collection 'nss'.
@@ -574,7 +594,34 @@ protected:
OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState);
/**
- * Waits for commit or abort signal from primary.
+     * Reads the commit-ready members list for the index build UUID in 'replState' from the
+     * "config.system.indexBuilds" collection, and signals the index builder thread on the primary
+     * to commit the index build if the number of voters has satisfied the commit quorum for that
+     * index build. Sets the ReplIndexBuildState::waitForNextAction promise value to
+     * IndexBuildAction::kCommitQuorumSatisfied.
+ */
+ virtual void _signalIfCommitQuorumIsSatisfied(
+ OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) = 0;
+
+ /**
+     * Signals the primary to commit the index build by sending it a "voteCommitIndexBuild" command
+     * request with write concern 'majority', then waits for that command's response. The command
+     * gets retried on error. This function gets called after the second draining phase of the
+     * index build.
+ */
+ virtual void _signalPrimaryForCommitReadiness(
+ OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) = 0;
+
+ /**
+ * Both primary and secondaries will wait on 'ReplIndexBuildState::waitForNextAction' future for
+ * commit or abort index build signal.
+ * On primary:
+ * - Commit signal can be sent either by voteCommitIndexBuild command or stepup.
+ * - Abort signal can be sent either by createIndexes command thread on user interruption or
+ * drop indexes/databases/collection commands.
+ * On secondaries:
+ * - Commit signal can be sent only by oplog applier.
+ * - Abort signal on secondaries can be sent by oplog applier, bgSync on rollback.
*
* On completion, this function returns a timestamp, which may be null, that may be used to
* update the mdb catalog as we commit the index build. The commit index build timestamp is
@@ -583,14 +630,17 @@ protected:
* are currently a primary, in which case we do not need to wait any external signal to commit
* the index build.
*/
- Timestamp _waitForCommitOrAbort(OperationContext* opCtx,
- std::shared_ptr<ReplIndexBuildState> replState);
+ virtual Timestamp _waitForNextIndexBuildAction(
+ OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) = 0;
+
+ std::string _indexBuildActionToString(IndexBuildAction action);
/**
* Third phase is catching up on all the writes that occurred during the first two phases.
- * Accepts a commit timestamp for the index, which could be null. See _waitForCommitOrAbort()
- * comments. This timestamp is used only for committing the index, which sets the ready flag to
- * true, to the catalog; it is not used for the catch-up writes during the final drain phase.
+ * Accepts a commit timestamp for the index, which could be null. See
+ * _waitForNextIndexBuildAction() comments. This timestamp is used only for committing the
+ * index, which sets the ready flag to true, to the catalog; it is not used for the catch-up
+ * writes during the final drain phase.
*/
void _insertKeysFromSideTablesAndCommit(
OperationContext* opCtx,
@@ -628,6 +678,7 @@ protected:
* UUIDs of the aborted index builders
*/
std::vector<UUID> _abortCollectionIndexBuilds(stdx::unique_lock<Latch>& lk,
+ OperationContext* opCtx,
const UUID& collectionUUID,
const std::string& reason,
bool shouldWait);
@@ -636,6 +687,7 @@ protected:
* Helper for 'abortDatabaseIndexBuilds' and 'abortDatabaseIndexBuildsNoWait'.
*/
void _abortDatabaseIndexBuilds(stdx::unique_lock<Latch>& lk,
+ OperationContext* opCtx,
const StringData& db,
const std::string& reason,
bool shouldWait);
diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp
index bf06151e5f7..f59f8da6082 100644
--- a/src/mongo/db/index_builds_coordinator_mongod.cpp
+++ b/src/mongo/db/index_builds_coordinator_mongod.cpp
@@ -34,12 +34,19 @@
#include "mongo/db/index_builds_coordinator_mongod.h"
#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/catalog/index_build_entry_gen.h"
+#include "mongo/db/concurrency/locker.h"
+#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index_build_entry_helpers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/two_phase_index_build_knobs_gen.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/str.h"
@@ -166,6 +173,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
// written, or an error has been encountered otherwise.
auto [startPromise, startFuture] = makePromiseFuture<void>();
+
auto replState = invariant(_getIndexBuild(buildUUID));
_threadPool.schedule([
this,
@@ -245,10 +253,371 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
return replState->sharedPromise.getFuture();
}
-Status IndexBuildsCoordinatorMongod::voteCommitIndexBuild(const UUID& buildUUID,
+Status IndexBuildsCoordinatorMongod::voteCommitIndexBuild(OperationContext* opCtx,
+ const UUID& buildUUID,
const HostAndPort& hostAndPort) {
- // TODO: not yet implemented.
- return Status::OK();
+
+ auto swReplState = _getIndexBuild(buildUUID);
+ if (!swReplState.isOK()) {
+ // Index build might have got torn down.
+ return swReplState.getStatus();
+ }
+
+ Status upsertStatus = Status::OK();
+ std::vector<HostAndPort> members;
+ members.push_back(hostAndPort);
+ auto replState = swReplState.getValue();
+
+ {
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ // This indicates the index build was successfully able to commit or abort, and about to
+ // write 'commitIndexBuild' or 'abortIndexBuild' oplog entry. In such case, we should throw
+ // a retryable error code to secondary and not try to persist the votes. Otherwise a
+ // deadlock can happen if a commit/abortIndexBuild oplog entry is followed by write to
+ // "config.system.indexBuilds" collection. In that case, voteCommitIndexBuild cmd on primary
+ // will be waiting for the system.indexBuilds write to be majority replicated. But, then,
+ // secondary oplog will be stuck waiting on commit/abortIndexBuild oplog entry. And,
+ // commit/abortIndexBuild oplog entry will be waiting on the secondary indexBuildCoordinator
+ // thread to join. But, the indexBuildCoordinator thread will be waiting for the
+ // voteCommitIndexBuild response.
+ if (replState->indexBuildState.isSet(IndexBuildState::kCommitted |
+ IndexBuildState::kAborted)) {
+ return Status{ErrorCodes::CommandFailed,
+ str::stream()
+ << "Index build state : " << replState->indexBuildState.toString()};
+ }
+
+ invariant(replState->commitQuorum);
+ IndexBuildEntry indexbuildEntry(buildUUID,
+ replState->collectionUUID,
+ replState->commitQuorum.get(),
+ replState->indexNames);
+ indexbuildEntry.setCommitReadyMembers(members);
+
+ // Persist the vote with replState mutex lock held to make sure that node will not write the
+ // commit/abortIndexBuild oplog entry.
+ upsertStatus = persistCommitReadyMemberInfo(opCtx, indexbuildEntry);
+        // A 'DuplicateKey' error indicates that the commit quorum value read from replState does
+        // not match the on-disk commit quorum value. Since we persist the vote with the replState
+        // mutex lock held, there is no way this can happen. This guards against a race such as:
+        // the setIndexCommitQuorum command changes the commit quorum from 3 to 5, and
+        // voteCommitIndexBuild resets the commit quorum value back to 3 while updating the voter's
+        // info.
+ invariant(upsertStatus.code() != ErrorCodes::DuplicateKey);
+ }
+
+ if (upsertStatus.isOK()) {
+ _signalIfCommitQuorumIsSatisfied(opCtx, replState);
+ }
+ return upsertStatus;
+}
+
+void IndexBuildsCoordinatorMongod::_signalIfCommitQuorumIsSatisfied(
+ OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) {
+ auto sendCommitQuorumSatisfiedSignal = [&]() {
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ if (!replState->waitForNextAction->getFuture().isReady()) {
+ replState->waitForNextAction->emplaceValue(IndexBuildAction::kCommitQuorumSatisfied);
+ } else {
+            // This implies we already got a commit or abort signal by other means. It might have
+            // been signaled earlier with kPrimaryAbort or kCommitQuorumSatisfied. It is also
+            // possible the node stepped down and received kOplogCommit/kOplogAbort, or got
+            // kRollbackAbort. So it is ok to skip signaling.
+ auto action = replState->waitForNextAction->getFuture().get(opCtx);
+
+ LOGV2(3856200,
+ "Not signaling \"{signalAction}\" as it was previously signaled with "
+ "\"{signalActionSet}\" for index build: {buildUUID}",
+ "signalAction"_attr =
+ _indexBuildActionToString(IndexBuildAction::kCommitQuorumSatisfied),
+ "signalActionSet"_attr = _indexBuildActionToString(action),
+ "buildUUID"_attr = replState->buildUUID);
+ }
+ };
+
+ if (!enableIndexBuildCommitQuorum) {
+ // When enableIndexBuildCommitQuorum is turned off, we should not use
+ // system.indexBuilds collection to decide whether the commit quorum is satisfied or not.
+ sendCommitQuorumSatisfiedSignal();
+ return;
+ }
+
+ // Read the index builds entry from config.system.indexBuilds collection.
+ auto swIndexBuildEntry = getIndexBuildEntry(opCtx, replState->buildUUID);
+ auto status = swIndexBuildEntry.getStatus();
+    // This can occur when no vote has been received yet and step-up tries to check whether the
+    // commit quorum is satisfied.
+ if (status == ErrorCodes::NoMatchingDocument)
+ return;
+ invariant(status.isOK());
+
+ auto indexBuildEntry = swIndexBuildEntry.getValue();
+
+ auto voteMemberList = indexBuildEntry.getCommitReadyMembers();
+ invariant(voteMemberList);
+ int voteCount = voteMemberList->size();
+
+ auto commitQuorum = indexBuildEntry.getCommitQuorum();
+ int requiredQuorumCount = commitQuorum.numNodes;
+ if (commitQuorum.mode == CommitQuorumOptions::kMajority) {
+ requiredQuorumCount =
+ repl::ReplicationCoordinator::get(opCtx)->getConfig().getWriteMajority();
+ }
+
+ if (voteCount >= requiredQuorumCount) {
+ LOGV2(3856201,
+ "Index build Commit Quorum Satisfied: {indexBuildEntry}",
+ "indexBuildEntry"_attr = indexBuildEntry.toBSON());
+ sendCommitQuorumSatisfiedSignal();
+ }
+}
+
+bool IndexBuildsCoordinatorMongod::_checkVoteCommitIndexCmdSucceeded(const BSONObj& response) {
+ auto commandStatus = getStatusFromCommandResult(response);
+ auto wcStatus = getWriteConcernStatusFromCommandResult(response);
+ if (commandStatus.isOK() && wcStatus.isOK()) {
+ return true;
+ }
+ LOGV2(3856202,
+ "'voteCommitIndexBuild' command failed with response : {response}",
+ "response"_attr = response);
+ return false;
+}
+
+void IndexBuildsCoordinatorMongod::_signalPrimaryForCommitReadiness(
+ OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->getSettings().usingReplSets()) {
+        // Standalones do not support commit quorum.
+ return;
+ }
+
+ if (!enableIndexBuildCommitQuorum) {
+        // No need to vote; instead, directly signal the primary to commit the index build.
+ invariant(opCtx->lockState()->isRSTLLocked());
+ const NamespaceStringOrUUID dbAndUUID(replState->dbName, replState->collectionUUID);
+ if (replCoord->canAcceptWritesFor(opCtx, dbAndUUID)) {
+ _signalIfCommitQuorumIsSatisfied(opCtx, replState);
+ }
+ return;
+ }
+
+ // Yield locks and storage engine resources before blocking.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ Lock::TempRelease release(opCtx->lockState());
+ invariant(!opCtx->lockState()->isRSTLLocked());
+
+ Backoff exponentialBackoff(Seconds(1), Seconds(2));
+
+ auto onRemoteCmdScheduled = [&](executor::TaskExecutor::CallbackHandle handle) {
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ auto future = replState->waitForNextAction->getFuture();
+ // Don't set the callback handle if we have been signaled with kRollbackAbort.
+ // Otherwise, it can violate liveness property. Consider a case, where the bgsync
+ // thread has signaled aborted and waits for the secondary indexBuildCoordinator
+ // thread to join. But, the indexBuildCoordinator thread will be waiting for the
+ // remote "voteCommitIndexBuild" command's response. And, the primary will be
+ // waiting for 'voteCommitIndexBuild' command's write to be majority replicated. But,
+ // gets stuck waiting for the rollback node to transition out to secondary.
+ if (future.isReady() && future.get(opCtx) == IndexBuildAction::kRollbackAbort) {
+ replCoord->cancelCbkHandle(handle);
+ } else {
+ invariant(!replState->voteCmdCbkHandle.isValid());
+ replState->voteCmdCbkHandle = handle;
+ }
+ };
+
+ auto onRemoteCmdComplete = [&](executor::TaskExecutor::CallbackHandle) {
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ replState->voteCmdCbkHandle = executor::TaskExecutor::CallbackHandle();
+ };
+
+ auto needToVote = [&]() -> bool {
+ stdx::unique_lock<Latch> lk(replState->mutex);
+        // Keep voting only while no commit/abort signal has been received for this build.
+ return !replState->waitForNextAction->getFuture().isReady() ? true : false;
+ };
+
+ auto convertToNonFatalStatus = [&](Status origStatus) -> Status {
+ auto errCode = ErrorCodes::InterruptedAtShutdown;
+
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ if (replState->indexBuildState.isAbortPrepared()) {
+ errCode = ErrorCodes::IndexBuildAborted;
+ }
+
+ return Status{errCode, origStatus.reason()};
+ };
+
+
+    // Retry the 'voteCommitIndexBuild' command on error until we have been signaled with either
+    // commit or abort. This way we make sure that a majority of nodes never stop voting before
+    // they have received the commit or abort signal.
+ while (needToVote()) {
+ // Don't hammer the network.
+ sleepFor(exponentialBackoff.nextSleep());
+        // An index build started during startup recovery can try to get its address while the
+        // rsConfig is still uninitialized, so retry until it gets initialized. It is also
+        // important that on each retry we check whether we have received a commit or abort signal,
+        // to ensure liveness. For example, consider a case where an index build gets restarted on
+        // startup recovery and the indexBuildsCoordinator thread waits for a valid address without
+        // checking for a commit or abort signal. Things can then go wrong if we try to replay the
+        // commitIndexBuild oplog entry for that index build during startup recovery: the oplog
+        // applier would get stuck waiting on the indexBuildsCoordinator thread, and as a result we
+        // would never transition to secondary state and remain stuck in startup state.
+ auto myAddress = replCoord->getMyHostAndPort();
+ if (myAddress.empty()) {
+ continue;
+ }
+ auto const voteCmdRequest =
+ BSON("voteCommitIndexBuild" << replState->buildUUID << "hostAndPort"
+ << myAddress.toString() << "writeConcern"
+ << BSON("w"
+ << "majority"));
+
+ BSONObj voteCmdResponse;
+ try {
+ voteCmdResponse = replCoord->runCmdOnPrimaryAndAwaitResponse(
+ opCtx, "admin", voteCmdRequest, onRemoteCmdScheduled, onRemoteCmdComplete);
+ } catch (DBException& ex) {
+ if (ex.isA<ErrorCategory::ShutdownError>() ||
+ ex.isA<ErrorCategory::CancelationError>()) {
+                // This includes errors like ErrorCodes::CallbackCanceled and
+                // ErrorCodes::ShutdownInProgress. We might have received
+                // ErrorCodes::CallbackCanceled due to rollback or shutdown, so convert the status
+                // to a non-fatal one.
+ uassertStatusOK(convertToNonFatalStatus(ex.toStatus()));
+ }
+ // All other error including network errors should be retried.
+ continue;
+ }
+
+ // Command error and write concern error have to be retried.
+ if (_checkVoteCommitIndexCmdSucceeded(voteCmdResponse)) {
+ break;
+ }
+ }
+ return;
+}
+
+Timestamp IndexBuildsCoordinatorMongod::_waitForNextIndexBuildAction(
+ OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) {
+ Timestamp commitIndexBuildTimestamp;
+
+ invariant(replState->protocol == IndexBuildProtocol::kTwoPhase);
+
+    // Standalones don't need to wait for a commit or abort index build oplog entry.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->getSettings().usingReplSets()) {
+ return commitIndexBuildTimestamp;
+ };
+
+ // Yield locks and storage engine resources before blocking.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ Lock::TempRelease release(opCtx->lockState());
+
+ LOGV2(3856203,
+ "Index build waiting for next action before completing final phase: {buildUUID}",
+ "buildUUID"_attr = replState->buildUUID);
+
+ while (true) {
+ // Future wait should ignore state transition.
+ invariant(!opCtx->lockState()->isRSTLLocked(),
+ str::stream()
+ << "failed to yield locks for index build while waiting for commit or abort: "
+ << replState->buildUUID);
+
+        // The future wait should get interrupted if the node shuts down.
+ const auto nextAction = replState->waitForNextAction->getFuture().get(opCtx);
+ LOGV2(3856204,
+ "Index build received signal for build uuid: {buildUUID} , action: {action}",
+ "buildUUID"_attr = replState->buildUUID,
+ "action"_attr = _indexBuildActionToString(nextAction));
+
+ bool needsToRetryWait = false;
+
+ // Reacquire RSTL lock
+ repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX);
+ const NamespaceStringOrUUID dbAndUUID(replState->dbName, replState->collectionUUID);
+ auto isMaster = replCoord->canAcceptWritesFor(opCtx, dbAndUUID);
+
+ stdx::unique_lock<Latch> lk(replState->mutex);
+ switch (nextAction) {
+ case IndexBuildAction::kOplogCommit:
+                // Sanity check.
+                // This signal can be received during primary (drain phase), secondary,
+                // startup (startup recovery) and startup2 (initial sync).
+ invariant(!isMaster && replState->indexBuildState.isCommitPrepared(),
+ str::stream()
+ << "Index build: " << replState->buildUUID
+ << ", index build state: " << replState->indexBuildState.toString());
+ invariant(replState->indexBuildState.getTimestamp(),
+ replState->buildUUID.toString());
+ // set the commit timestamp
+ commitIndexBuildTimestamp = replState->indexBuildState.getTimestamp().get();
+ LOGV2(3856205,
+ "Committing index build",
+ "buildUUID"_attr = replState->buildUUID,
+ "commitTimestamp"_attr = replState->indexBuildState.getTimestamp().get(),
+ "collectionUUID"_attr = replState->collectionUUID);
+ break;
+ case IndexBuildAction::kOplogAbort:
+                // Sanity check.
+                // This signal can be received during primary (drain phase), secondary,
+                // startup (startup recovery) and startup2 (initial sync).
+ invariant(!isMaster && replState->indexBuildState.isAbortPrepared(),
+ str::stream()
+ << "Index build: " << replState->buildUUID
+ << ", index build state: " << replState->indexBuildState.toString());
+ invariant(replState->indexBuildState.getTimestamp() &&
+ replState->indexBuildState.getAbortReason(),
+ replState->buildUUID.toString());
+ LOGV2(3856206,
+ "Aborting index build",
+ "buildUUID"_attr = replState->buildUUID,
+ "abortTimestamp"_attr = replState->indexBuildState.getTimestamp().get(),
+ "abortReason"_attr = replState->indexBuildState.getAbortReason().get(),
+ "collectionUUID"_attr = replState->collectionUUID);
+ case IndexBuildAction::kRollbackAbort:
+                // Currently, we abort the index build before transitioning the state to rollback,
+                // so we can check whether the node state is rollback.
+ break;
+ case IndexBuildAction::kPrimaryAbort:
+                // It is possible that when the index build was aborted it existed only in the
+                // coordinator, so we missed marking the index build aborted in the manager. It is
+                // therefore important that we exit from here if we are still primary; otherwise
+                // the index build would get committed even though it was marked aborted.
+ if (isMaster) {
+ uassertStatusOK(Status(
+ ErrorCodes::IndexBuildAborted,
+ str::stream()
+ << "Index build aborted for index build: " << replState->buildUUID
+ << " , abort reason:"
+ << replState->indexBuildState.getAbortReason().get_value_or("")));
+ }
+ case IndexBuildAction::kCommitQuorumSatisfied:
+ if (!isMaster) {
+ // Reset the promise as the node has stepped down,
+ // wait for the new primary to coordinate the index build and send the new
+ // signal/action.
+ LOGV2(3856207,
+ "No longer primary, so will be waiting again for next action before "
+ "completing final phase: {buildUUID}",
+ "buildUUID"_attr = replState->buildUUID);
+ replState->waitForNextAction =
+ std::make_unique<SharedPromise<IndexBuildAction>>();
+ needsToRetryWait = true;
+ }
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+
+ if (!needsToRetryWait) {
+ break;
+ }
+ }
+ return commitIndexBuildTimestamp;
}
Status IndexBuildsCoordinatorMongod::setCommitQuorum(OperationContext* opCtx,
diff --git a/src/mongo/db/index_builds_coordinator_mongod.h b/src/mongo/db/index_builds_coordinator_mongod.h
index 20691de7751..46ce97a3c8c 100644
--- a/src/mongo/db/index_builds_coordinator_mongod.h
+++ b/src/mongo/db/index_builds_coordinator_mongod.h
@@ -78,7 +78,9 @@ public:
IndexBuildProtocol protocol,
IndexBuildOptions indexBuildOptions) override;
- Status voteCommitIndexBuild(const UUID& buildUUID, const HostAndPort& hostAndPort) override;
+ Status voteCommitIndexBuild(OperationContext* opCtx,
+ const UUID& buildUUID,
+ const HostAndPort& hostAndPort) override;
Status setCommitQuorum(OperationContext* opCtx,
const NamespaceString& nss,
@@ -123,6 +125,20 @@ private:
*/
void _refreshReplStateFromPersisted(OperationContext* opCtx, const UUID& buildUUID);
+ /**
+ * Process voteCommitIndexBuild command's response.
+ */
+ bool _checkVoteCommitIndexCmdSucceeded(const BSONObj& response);
+
+ void _signalIfCommitQuorumIsSatisfied(OperationContext* opCtx,
+ std::shared_ptr<ReplIndexBuildState> replState);
+
+ void _signalPrimaryForCommitReadiness(OperationContext* opCtx,
+ std::shared_ptr<ReplIndexBuildState> replState) override;
+
+ Timestamp _waitForNextIndexBuildAction(OperationContext* opCtx,
+ std::shared_ptr<ReplIndexBuildState> replState) override;
+
// Thread pool on which index builds are run.
ThreadPool _threadPool;
};
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index e75669a01cb..377ef343e8a 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -34,6 +34,7 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/catalog/commit_quorum_options.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/rollback.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -99,6 +100,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) = 0;
/**
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 30f460db2a7..f458b0f6477 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -273,12 +273,15 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx,
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) {
BSONObjBuilder oplogEntryBuilder;
oplogEntryBuilder.append("startIndexBuild", nss.coll());
indexBuildUUID.appendToBuilder(&oplogEntryBuilder, "indexBuildUUID");
+ commitQuorum.appendToBuilder(CommitQuorumOptions::kCommitQuorumField, &oplogEntryBuilder);
+
BSONArrayBuilder indexesArr(oplogEntryBuilder.subarrayStart("indexes"));
for (auto indexDoc : indexes) {
indexesArr.append(indexDoc);
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 51d6ebd6c99..9019a253acd 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -52,6 +52,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) final;
void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final;
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index e197b34b70c..b66303045a6 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -144,8 +144,9 @@ TEST_F(OpObserverTest, StartIndexBuildExpectedOplogEntry) {
{
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
+ CommitQuorumOptions commitQuorum(1);
opObserver.onStartIndexBuild(
- opCtx.get(), nss, uuid, indexBuildUUID, specs, false /*fromMigrate*/);
+ opCtx.get(), nss, uuid, indexBuildUUID, specs, commitQuorum, false /*fromMigrate*/);
wunit.commit();
}
@@ -153,6 +154,7 @@ TEST_F(OpObserverTest, StartIndexBuildExpectedOplogEntry) {
BSONObjBuilder startIndexBuildBuilder;
startIndexBuildBuilder.append("startIndexBuild", nss.coll());
indexBuildUUID.appendToBuilder(&startIndexBuildBuilder, "indexBuildUUID");
+ startIndexBuildBuilder.append("commitQuorum", 1);
BSONArrayBuilder indexesArr(startIndexBuildBuilder.subarrayStart("indexes"));
indexesArr.append(specX);
indexesArr.append(specA);
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 5c5816e7c0b..0a1542ea50a 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -46,6 +46,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) override {}
void onStartIndexBuildSinglePhase(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 2470baa74cc..ec7fe1908c1 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -73,10 +73,12 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) override {
ReservedTimes times{opCtx};
for (auto& o : _observers) {
- o->onStartIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexes, fromMigrate);
+ o->onStartIndexBuild(
+ opCtx, nss, collUUID, indexBuildUUID, indexes, commitQuorum, fromMigrate);
}
}
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index e10b9cf4d33..11db337f35c 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -41,6 +41,7 @@
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/split_horizon.h"
#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/rpc/topology_version_gen.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -1007,6 +1008,28 @@ public:
*/
virtual HostAndPort getCurrentPrimaryHostAndPort() const = 0;
+ /*
+ * Cancels the callback referenced in the given callback handle.
+ * This function expects the activeHandle to be valid.
+ */
+ virtual void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) = 0;
+
+ using OnRemoteCmdScheduledFn = std::function<void(executor::TaskExecutor::CallbackHandle)>;
+ using OnRemoteCmdCompleteFn = std::function<void(executor::TaskExecutor::CallbackHandle)>;
+ /**
+ * Runs the given command 'cmdObj' on the primary and waits until the response for that command
+ * is received. If the node is primary, then the command will be executed using DBDirectClient to
+ * avoid tcp network calls. Otherwise, the node will execute the remote command using the repl
+ * task executor (AsyncDBClient).
+ * - 'OnRemoteCmdScheduled' will be called once the remote command is scheduled.
+ * - 'OnRemoteCmdComplete' will be called once the response for the remote command is received.
+ */
+ virtual BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ OnRemoteCmdScheduledFn onRemoteCmdScheduled,
+ OnRemoteCmdCompleteFn onRemoteCmdComplete) = 0;
+
protected:
ReplicationCoordinator();
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 1f460a3fdbc..235128a2656 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
#include "mongo/db/curop_failpoint_helpers.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/kill_sessions_local.h"
@@ -2105,6 +2106,90 @@ HostAndPort ReplicationCoordinatorImpl::getCurrentPrimaryHostAndPort() const {
return primary ? primary->getHostAndPort() : HostAndPort();
}
+void ReplicationCoordinatorImpl::cancelCbkHandle(CallbackHandle activeHandle) {
+ _replExecutor->cancel(activeHandle);
+}
+
+BSONObj ReplicationCoordinatorImpl::_runCmdOnSelfOnAlternativeClient(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+
+ auto client = opCtx->getServiceContext()->makeClient("DBDirectClientCmd");
+ // We want the command's opCtx that gets executed via DBDirectClient to be interruptible
+ // so that we don't block state transitions. Callers of this function might run the opCtx
+ // in an uninterruptible mode. To be on the safer side, run the command in an
+ // AlternativeClientRegion to make sure that the command's opCtx is interruptible.
+ AlternativeClientRegion acr(client);
+ auto uniqueNewOpCtx = cc().makeOperationContext();
+ {
+ stdx::lock_guard<Client> lk(cc());
+ cc().setSystemOperationKillable(lk);
+ }
+
+ DBDirectClient dbClient(uniqueNewOpCtx.get());
+ const auto commandResponse = dbClient.runCommand(OpMsgRequest::fromDBAndBody(dbName, cmdObj));
+
+ return commandResponse->getCommandReply();
+}
+
+BSONObj ReplicationCoordinatorImpl::runCmdOnPrimaryAndAwaitResponse(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ OnRemoteCmdScheduledFn onRemoteCmdScheduled,
+ OnRemoteCmdCompleteFn onRemoteCmdComplete) {
+ // Sanity check
+ invariant(!opCtx->lockState()->isRSTLLocked());
+
+ repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX);
+
+ if (getMemberState().primary()) {
+ if (canAcceptWritesForDatabase(opCtx, dbName)) {
+ // Run command using DBDirectClient to avoid tcp connection.
+ return _runCmdOnSelfOnAlternativeClient(opCtx, dbName, cmdObj);
+ }
+ // Node is primary but it's not in a state to accept non-local writes because it might be in
+ // the catchup or draining phase. So, try releasing and reacquiring the RSTL lock to give the
+ // node a chance to finish executing signalDrainComplete() and become master.
+ uassertStatusOK(
+ Status{ErrorCodes::NotMaster, "Node is in primary state but can't accept writes."});
+ }
+
+ // Node is not primary, so we will run the remote command via AsyncDBClient. To use
+ // AsyncDBClient, we will be using repl task executor.
+ const auto primary = getCurrentPrimaryHostAndPort();
+ if (primary.empty()) {
+ uassertStatusOK(Status{ErrorCodes::CommandFailed, "Primary is unknown/down."});
+ }
+
+ executor::RemoteCommandRequest request(primary, dbName, cmdObj, nullptr);
+ executor::RemoteCommandResponse cbkResponse(
+ Status{ErrorCodes::InternalError, "Uninitialized value"});
+
+ // Schedule the remote command.
+ auto&& scheduleResult = _replExecutor->scheduleRemoteCommand(
+ request, [&cbkResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbk) {
+ cbkResponse = cbk.response;
+ });
+
+ uassertStatusOK(scheduleResult.getStatus());
+ CallbackHandle cbkHandle = scheduleResult.getValue();
+
+ onRemoteCmdScheduled(cbkHandle);
+ // Before we wait for the remote command response, it's important to release the RSTL lock so
+ // that state transitions can happen during the wait period; otherwise, this can lead to
+ // deadlock. Consider a case where the remote command waits for majority write concern, but we
+ // are not able to transition our state to secondary (steady state replication).
+ rstl.release();
+
+ // Wait for the response in an interruptible mode.
+ _replExecutor->wait(cbkHandle, opCtx);
+
+ onRemoteCmdComplete(cbkHandle);
+ uassertStatusOK(cbkResponse.status);
+ return cbkResponse.data;
+}
+
void ReplicationCoordinatorImpl::_killConflictingOpsOnStepUpAndStepDown(
AutoGetRstlForStepUpStepDown* arsc, ErrorCodes::Error reason) {
const OperationContext* rstlOpCtx = arsc->getOpCtx();
@@ -2644,7 +2729,11 @@ int ReplicationCoordinatorImpl::getMyId() const {
}
HostAndPort ReplicationCoordinatorImpl::getMyHostAndPort() const {
- stdx::lock_guard<Latch> lock(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
+
+ if (_selfIndex == -1) {
+ return HostAndPort();
+ }
return _rsConfig.getMemberAt(_selfIndex).getHostAndPort();
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index a45bc225ff0..b7b474fc786 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -364,6 +364,14 @@ public:
virtual HostAndPort getCurrentPrimaryHostAndPort() const override;
+ void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) override;
+
+ BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ OnRemoteCmdScheduledFn onRemoteCmdScheduled,
+ OnRemoteCmdCompleteFn onRemoteCmdComplete) override;
+
// ================== Test support API ===================
/**
@@ -1385,6 +1393,13 @@ private:
*/
int64_t _nextRandomInt64_inlock(int64_t limit);
+ /**
+ * Runs the command using DBDirectClient and returns the response received for that command.
+ */
+ BSONObj _runCmdOnSelfOnAlternativeClient(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
+
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index f6983bcfe68..9a9fe7dc7f3 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -603,5 +603,18 @@ HostAndPort ReplicationCoordinatorMock::getCurrentPrimaryHostAndPort() const {
return HostAndPort();
}
+void ReplicationCoordinatorMock::cancelCbkHandle(
+ executor::TaskExecutor::CallbackHandle activeHandle) {
+ MONGO_UNREACHABLE;
+}
+
+BSONObj ReplicationCoordinatorMock::runCmdOnPrimaryAndAwaitResponse(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ OnRemoteCmdScheduledFn onRemoteCmdScheduled,
+ OnRemoteCmdCompleteFn onRemoteCmdComplete) {
+ return BSON("ok" << 1);
+}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index ddecc044a53..ad82eb065ce 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -343,6 +343,13 @@ public:
virtual HostAndPort getCurrentPrimaryHostAndPort() const override;
+ void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) override;
+ BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ OnRemoteCmdScheduledFn onRemoteCmdScheduled,
+ OnRemoteCmdCompleteFn onRemoteCmdComplete) override;
+
private:
ServiceContext* const _service;
ReplSettings _settings;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index 0cdebd3464e..480a233c240 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -503,5 +503,19 @@ HostAndPort ReplicationCoordinatorNoOp::getCurrentPrimaryHostAndPort() const {
MONGO_UNREACHABLE;
}
+void ReplicationCoordinatorNoOp::cancelCbkHandle(
+ executor::TaskExecutor::CallbackHandle activeHandle) {
+ MONGO_UNREACHABLE;
+}
+
+BSONObj ReplicationCoordinatorNoOp::runCmdOnPrimaryAndAwaitResponse(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ OnRemoteCmdScheduledFn onRemoteCmdScheduled,
+ OnRemoteCmdCompleteFn onRemoteCmdComplete) {
+ MONGO_UNREACHABLE;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index 329d5b5fe14..452a96161c7 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -197,7 +197,6 @@ public:
Status checkReplEnabledForCommand(BSONObjBuilder*) final;
-
HostAndPort chooseNewSyncSource(const OpTime&) final;
void blacklistSyncSource(const HostAndPort&, Date_t) final;
@@ -279,6 +278,14 @@ public:
HostAndPort getCurrentPrimaryHostAndPort() const override;
+ void cancelCbkHandle(executor::TaskExecutor::CallbackHandle activeHandle) override;
+
+ BSONObj runCmdOnPrimaryAndAwaitResponse(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ OnRemoteCmdScheduledFn onRemoteCmdScheduled,
+ OnRemoteCmdCompleteFn onRemoteCmdComplete) override;
+
private:
ServiceContext* const _service;
};
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 0cf86695206..b24e22bb12b 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -101,7 +101,7 @@ OplogInterfaceMock::Operation makeStartIndexBuildOplogEntry(Collection* collecti
BSONObj spec,
int time) {
auto entry = BSON("startIndexBuild" << collection->ns().coll() << "indexBuildUUID" << buildUUID
- << "indexes" << BSON_ARRAY(spec));
+ << "indexes" << BSON_ARRAY(spec) << "commitQuorum" << 1);
return std::make_pair(BSON("ts" << Timestamp(Seconds(time), 0) << "op"
<< "c"
@@ -1061,7 +1061,7 @@ TEST_F(RSRollbackTest, RollbackCommitIndexBuild) {
// Kill the index build we just restarted so the fixture can shut down.
IndexBuildsCoordinator::get(_opCtx.get())
- ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, Timestamp(), "");
+ ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, IndexBuildAction::kRollbackAbort);
}
TEST_F(RSRollbackTest, RollbackAbortIndexBuild) {
@@ -1105,7 +1105,7 @@ TEST_F(RSRollbackTest, RollbackAbortIndexBuild) {
// Kill the index build we just restarted so the fixture can shut down.
IndexBuildsCoordinator::get(_opCtx.get())
- ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, Timestamp(), "");
+ ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, IndexBuildAction::kRollbackAbort);
}
TEST_F(RSRollbackTest, AbortedIndexBuildsAreRestarted) {
@@ -1154,7 +1154,7 @@ TEST_F(RSRollbackTest, AbortedIndexBuildsAreRestarted) {
// Kill the index build we just restarted so the fixture can shut down.
IndexBuildsCoordinator::get(_opCtx.get())
- ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, Timestamp(), "");
+ ->abortIndexBuildByBuildUUID(_opCtx.get(), buildUUID, IndexBuildAction::kRollbackAbort);
}
TEST_F(RSRollbackTest, AbortedIndexBuildsAreNotRestartedWhenStartIsRolledBack) {
diff --git a/src/mongo/db/repl_index_build_state.h b/src/mongo/db/repl_index_build_state.h
index e75ca9ee41c..f5f7023b660 100644
--- a/src/mongo/db/repl_index_build_state.h
+++ b/src/mongo/db/repl_index_build_state.h
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/commit_quorum_options.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/future.h"
#include "mongo/util/net/hostandport.h"
@@ -60,6 +61,152 @@ enum class IndexBuildProtocol {
kTwoPhase
};
+// Indicates the type of abort or commit signal that will be received by primary and secondaries.
+enum class IndexBuildAction {
+ /**
+ * Does nothing; set on shutdown.
+ */
+ kNoAction,
+ /**
+ * Commit signal set by oplog applier.
+ */
+ kOplogCommit,
+ /**
+ * Abort signal set by oplog applier.
+ */
+ kOplogAbort,
+ /**
+ * Abort signal set on rollback.
+ */
+ kRollbackAbort,
+ /**
+ * Abort signal set by createIndexes cmd or by drop databases/collections/indexes cmds
+ */
+ kPrimaryAbort,
+ /**
+ * Commit signal set by "voteCommitIndexBuild" cmd and step up.
+ */
+ kCommitQuorumSatisfied
+};
+
+/**
+ * Represents the index build state.
+ * Valid State transition for primary:
+ * ===================================
+ * kNone ---> kAborted.
+ * kNone ---> kPrepareAbort ---> kAborted.
+ *
+ * Valid State transition for secondaries:
+ * =======================================
+ * kNone ---> kPrepareCommit ---> kCommitted.
+ * kNone ---> kPrepareAbort ---> kAborted.
+ */
+class IndexBuildState {
+public:
+ enum StateFlag {
+ kNone = 1 << 0,
+ /**
+ * The state below indicates that the IndexBuildsCoordinator thread was externally asked
+ * either to commit or abort. The oplog applier, rollback, the createIndexes command, and
+ * the drop databases/collections/indexes commands can change this state to kPrepareCommit
+ * or kPrepareAbort.
+ */
+ kPrepareCommit = 1 << 1,
+ kPrepareAbort = 1 << 2,
+ /**
+ * The states below indicate that the index build has successfully committed or aborted, but
+ * has yet to generate the commitIndexBuild or abortIndexBuild oplog entry, respectively.
+ */
+ kCommitted = 1 << 3,
+ kAborted = 1 << 4
+ };
+
+ using StateSet = int;
+ bool isSet(StateSet stateSet) const {
+ return _state & stateSet;
+ }
+
+ bool checkIfValidTransition(StateFlag newState) {
+ if (_state == kNone || (_state == kPrepareCommit && newState == kCommitted) ||
+ (_state == kPrepareAbort && newState == kAborted)) {
+ return true;
+ }
+ return false;
+ }
+
+ void setState(StateFlag state,
+ bool skipCheck,
+ boost::optional<Timestamp> timestamp = boost::none,
+ boost::optional<std::string> abortReason = boost::none) {
+ if (!skipCheck) {
+ invariant(checkIfValidTransition(state),
+ str::stream() << "current state :" << toString(_state)
+ << ", new state: " << toString(state));
+ }
+ _state = state;
+ if (timestamp)
+ _timestamp = timestamp;
+ if (abortReason) {
+ invariant(_state == kPrepareAbort);
+ _abortReason = abortReason;
+ }
+ }
+
+ bool isCommitPrepared() const {
+ return _state == kPrepareCommit;
+ }
+
+ bool isCommitted() const {
+ return _state == kCommitted;
+ }
+
+ bool isAbortPrepared() const {
+ return _state == kPrepareAbort;
+ }
+
+ bool isAborted() const {
+ return _state == kAborted;
+ }
+
+ boost::optional<Timestamp> getTimestamp() const {
+ return _timestamp;
+ }
+
+ boost::optional<std::string> getAbortReason() const {
+ return _abortReason;
+ }
+
+ std::string toString() const {
+ return toString(_state);
+ }
+
+ static std::string toString(StateFlag state) {
+ switch (state) {
+ case kNone:
+ return "in-progress";
+ case kPrepareCommit:
+ return "Prepare commit";
+ case kCommitted:
+ return "Committed";
+ case kPrepareAbort:
+ return "Prepare abort";
+ case kAborted:
+ return "Aborted";
+ }
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ // Represents the index build state.
+ StateFlag _state = kNone;
+ // Timestamp will be populated only if the node is a secondary.
+ // It represents the commit or abort timestamp communicated via the
+ // commitIndexBuild or abortIndexBuild oplog entry.
+ boost::optional<Timestamp> _timestamp;
+ // Reason for the abort.
+ boost::optional<std::string> _abortReason;
+};
+
/**
* Tracks the cross replica set progress of a particular index build identified by a build UUID.
*
@@ -80,7 +227,11 @@ struct ReplIndexBuildState {
indexNames(extractIndexNames(specs)),
indexSpecs(specs),
protocol(protocol),
- commitQuorum(commitQuorum) {}
+ commitQuorum(commitQuorum) {
+ if (IndexBuildProtocol::kTwoPhase == protocol) {
+ waitForNextAction = std::make_unique<SharedPromise<IndexBuildAction>>();
+ }
+ }
// Uniquely identifies this index build across replica set members.
const UUID buildUUID;
@@ -127,20 +278,14 @@ struct ReplIndexBuildState {
// SharedSemiFuture(s).
SharedPromise<IndexCatalogStats> sharedPromise;
- // Set to true on a secondary on receipt of a commitIndexBuild oplog entry.
- bool isCommitReady = false;
- Timestamp commitTimestamp;
+ // The primary and secondaries get their commit or abort signal via this promise-future pair.
+ std::unique_ptr<SharedPromise<IndexBuildAction>> waitForNextAction;
- // There is a period of time where the index build is registered on the coordinator, but an
- // index builder does not yet exist. Since a signal cannot be set on the index builder at that
- // time, it must be saved here.
- bool aborted = false;
- Timestamp abortTimestamp;
- std::string abortReason = "";
+ // Maintains the state of the index build.
+ IndexBuildState indexBuildState;
- // The coordinator for the index build will wait upon this when awaiting an external signal,
- // such as commit or commit readiness signals.
- stdx::condition_variable condVar;
+ // Represents the callback handle for scheduled remote command "voteCommitIndexBuild".
+ executor::TaskExecutor::CallbackHandle voteCmdCbkHandle;
private:
std::vector<std::string> extractIndexNames(const std::vector<BSONObj>& specs) {
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index d274dfef21a..40b24d10bd6 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -56,6 +56,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) override {}
void onStartIndexBuildSinglePhase(OperationContext* opCtx,
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 939125ba75f..476098424d0 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -471,6 +471,7 @@ void ShardServerOpObserver::onStartIndexBuild(OperationContext* opCtx,
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) {
abortOngoingMigrationIfNeeded(opCtx, nss);
};
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 2b16ac88150..f3e659b437d 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -57,6 +57,7 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
+ const CommitQuorumOptions& commitQuorum,
bool fromMigrate) override;
void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) override;
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 485a1ae4f4b..520a2229470 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -430,13 +430,21 @@ env.Library(
'$BUILD_DIR/mongo/db/server_options_core',
],
)
+env.Library(
+ target='two_phase_index_build_knobs_idl',
+ source=[
+ env.Idlc('two_phase_index_build_knobs.idl')[0],
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/idl/server_parameter',
+ ],
+)
env.Library(
target='storage_engine_impl',
source=[
'storage_engine_impl.cpp',
'kv/temporary_kv_record_store.cpp', # TODO: SERVER-41892 Avoid source under kv sub-directory
- env.Idlc('two_phase_index_build_knobs.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -451,6 +459,7 @@ env.Library(
'$BUILD_DIR/mongo/db/storage/storage_repair_observer',
'$BUILD_DIR/mongo/db/catalog/collection_catalog_helper',
'$BUILD_DIR/mongo/idl/server_parameter',
+ 'two_phase_index_build_knobs_idl',
],
)
diff --git a/src/mongo/db/storage/two_phase_index_build_knobs.idl b/src/mongo/db/storage/two_phase_index_build_knobs.idl
index 1b29ea40481..d23c7670603 100644
--- a/src/mongo/db/storage/two_phase_index_build_knobs.idl
+++ b/src/mongo/db/storage/two_phase_index_build_knobs.idl
@@ -40,9 +40,9 @@ server_parameters:
default: true
# Commit quorum option is supported only for two phase index builds.
- enableIndexBuildMajorityCommitQuorum:
- description: "Support for using majority commit quorum for two phase index builds."
+ enableIndexBuildCommitQuorum:
+ description: "Support for using commit quorum for two phase index builds."
set_at: startup
cpp_vartype: bool
- cpp_varname: "enableIndexBuildMajorityCommitQuorum"
+ cpp_varname: "enableIndexBuildCommitQuorum"
default: false