summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl_index_build_state.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl_index_build_state.cpp')
-rw-r--r--src/mongo/db/repl_index_build_state.cpp188
1 files changed, 188 insertions, 0 deletions
diff --git a/src/mongo/db/repl_index_build_state.cpp b/src/mongo/db/repl_index_build_state.cpp
index a7617172312..c1fda23d4dc 100644
--- a/src/mongo/db/repl_index_build_state.cpp
+++ b/src/mongo/db/repl_index_build_state.cpp
@@ -27,10 +27,15 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
+
#include "mongo/platform/basic.h"
#include "mongo/db/repl_index_build_state.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/logv2/log.h"
+
namespace mongo {
namespace {
@@ -68,6 +73,27 @@ bool checkIfValidTransition(IndexBuildState::StateFlag currentState,
} // namespace
+std::string indexBuildActionToString(IndexBuildAction action) {
+ if (action == IndexBuildAction::kNoAction) {
+ return "No action";
+ } else if (action == IndexBuildAction::kOplogCommit) {
+ return "Oplog commit";
+ } else if (action == IndexBuildAction::kOplogAbort) {
+ return "Oplog abort";
+ } else if (action == IndexBuildAction::kInitialSyncAbort) {
+ return "Initial sync abort";
+ } else if (action == IndexBuildAction::kRollbackAbort) {
+ return "Rollback abort";
+ } else if (action == IndexBuildAction::kPrimaryAbort) {
+ return "Primary abort";
+ } else if (action == IndexBuildAction::kSinglePhaseCommit) {
+ return "Single-phase commit";
+ } else if (action == IndexBuildAction::kCommitQuorumSatisfied) {
+ return "Commit quorum Satisfied";
+ }
+ MONGO_UNREACHABLE;
+}
+
void IndexBuildState::setState(StateFlag state,
bool skipCheck,
boost::optional<Timestamp> timestamp,
@@ -102,6 +128,148 @@ ReplIndexBuildState::ReplIndexBuildState(const UUID& indexBuildUUID,
commitQuorumLock.emplace(indexBuildUUID.toString());
}
+void ReplIndexBuildState::start(OperationContext* opCtx) {
+ stdx::unique_lock<Latch> lk(mutex);
+ _opId = opCtx->getOpID();
+ indexBuildState.setState(IndexBuildState::kInProgress, false /* skipCheck */);
+}
+
+bool ReplIndexBuildState::isAborted() const {
+ stdx::unique_lock<Latch> lk(mutex);
+ return indexBuildState.isAborted();
+}
+
+std::string ReplIndexBuildState::getAbortReason() const {
+ stdx::unique_lock<Latch> lk(mutex);
+ invariant(indexBuildState.isAborted(),
+ str::stream() << "Index build: " << buildUUID
+ << ", index build state: " << indexBuildState.toString());
+ auto reason = indexBuildState.getAbortReason();
+ invariant(reason, str::stream() << buildUUID);
+ return *reason;
+}
+
+void ReplIndexBuildState::setCommitQuorumSatisfied(OperationContext* opCtx) {
+ stdx::unique_lock<Latch> lk(mutex);
+ if (!waitForNextAction->getFuture().isReady()) {
+ _setSignalAndCancelVoteRequestCbkIfActive(
+ lk, opCtx, IndexBuildAction::kCommitQuorumSatisfied);
+ } else {
+ // This implies we already got a commit or abort signal by other ways. This might have
+ // been signaled earlier with kPrimaryAbort or kCommitQuorumSatisfied. Or, it's also
+ // possible the node got stepped down and received kOplogCommit/koplogAbort or got
+ // kRollbackAbort. So, it's ok to skip signaling.
+ auto action = waitForNextAction->getFuture().get(opCtx);
+
+ LOGV2(3856200,
+ "Not signaling \"{skippedAction}\" as it was previously signaled with "
+ "\"{previousAction}\" for index build: {buildUUID}",
+ "Skipping signaling as it was previously signaled for index build",
+ "skippedAction"_attr =
+ indexBuildActionToString(IndexBuildAction::kCommitQuorumSatisfied),
+ "previousAction"_attr = indexBuildActionToString(action),
+ "buildUUID"_attr = buildUUID);
+ }
+}
+
+bool ReplIndexBuildState::tryCommit(OperationContext* opCtx) {
+ stdx::unique_lock<Latch> lk(mutex);
+ if (indexBuildState.isSettingUp()) {
+ // It's possible that the index build thread has not reached the point where it can be
+ // committed yet.
+ return false;
+ }
+ if (waitForNextAction->getFuture().isReady()) {
+ // If the future wait were uninterruptible, then shutdown could hang. If the
+ // IndexBuildsCoordinator thread gets interrupted on shutdown, the oplog applier will hang
+ // waiting for the promise applying the commitIndexBuild oplog entry.
+ const auto nextAction = waitForNextAction->getFuture().get(opCtx);
+ invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied);
+ // Retry until the current promise result is consumed by the index builder thread and
+ // a new empty promise got created by the indexBuildscoordinator thread.
+ return false;
+ }
+ auto skipCheck = _shouldSkipIndexBuildStateTransitionCheck(opCtx);
+ indexBuildState.setState(
+ IndexBuildState::kPrepareCommit, skipCheck, opCtx->recoveryUnit()->getCommitTimestamp());
+ // Promise can be set only once.
+ // We can't skip signaling here if a signal is already set because the previous commit or
+ // abort signal might have been sent to handle for primary case.
+ _setSignalAndCancelVoteRequestCbkIfActive(lk, opCtx, IndexBuildAction::kOplogCommit);
+ return true;
+}
+
+ReplIndexBuildState::TryAbortResult ReplIndexBuildState::tryAbort(OperationContext* opCtx,
+ IndexBuildAction signalAction,
+ std::string reason) {
+ stdx::unique_lock<Latch> lk(mutex);
+ // Wait until the build is done setting up. This indicates that all required state is
+ // initialized to attempt an abort.
+ if (indexBuildState.isSettingUp()) {
+ LOGV2_DEBUG(465605,
+ 2,
+ "waiting until index build is done setting up before attempting to abort",
+ "buildUUID"_attr = buildUUID);
+ return TryAbortResult::kRetry;
+ }
+ if (waitForNextAction->getFuture().isReady()) {
+ const auto nextAction = waitForNextAction->getFuture().get(opCtx);
+ invariant(nextAction == IndexBuildAction::kSinglePhaseCommit ||
+ nextAction == IndexBuildAction::kCommitQuorumSatisfied ||
+ nextAction == IndexBuildAction::kPrimaryAbort);
+
+ // Index build coordinator already received a signal to commit or abort. So, it's ok
+ // to return and wait for the index build to complete if we are trying to signal
+ // 'kPrimaryAbort'. The index build coordinator will not perform the signaled action
+ // (i.e, will not commit or abort the index build) only when the node steps down.
+ // When the node steps down, the caller of this function, dropIndexes/createIndexes
+ // command (user operation) will also get interrupted. So, we no longer need to
+ // abort the index build on step down.
+ if (signalAction == IndexBuildAction::kPrimaryAbort) {
+ // Indicate if the index build is already being committed or aborted.
+ if (nextAction == IndexBuildAction::kPrimaryAbort) {
+ return TryAbortResult::kAlreadyAborted;
+ } else {
+ return TryAbortResult::kNotAborted;
+ }
+ }
+
+ // Retry until the current promise result is consumed by the index builder thread
+ // and a new empty promise got created by the indexBuildscoordinator thread. Or,
+ // until the index build got torn down after index build commit.
+ return TryAbortResult::kRetry;
+ }
+
+ LOGV2(4656003, "Aborting index build", "buildUUID"_attr = buildUUID, "error"_attr = reason);
+
+ // Set the state on replState. Once set, the calling thread must complete the abort process.
+ auto abortTimestamp =
+ boost::make_optional<Timestamp>(!opCtx->recoveryUnit()->getCommitTimestamp().isNull(),
+ opCtx->recoveryUnit()->getCommitTimestamp());
+ auto skipCheck = _shouldSkipIndexBuildStateTransitionCheck(opCtx);
+ indexBuildState.setState(IndexBuildState::kAborted, skipCheck, abortTimestamp, reason);
+
+ // Interrupt the builder thread so that it can no longer acquire locks or make progress.
+ // It is possible that the index build thread may have completed its operation and removed
+ // itself from the ServiceContext. This may happen in the case of an explicit db.killOp()
+ // operation or during shutdown.
+ // During normal operation, the abort logic, initiated through external means such as
+ // dropIndexes or internally through an indexing error, should have set the state in
+ // ReplIndexBuildState so that this code would not be reachable as it is no longer necessary
+ // to interrupt the builder thread here.
+ auto serviceContext = opCtx->getServiceContext();
+ if (auto target = serviceContext->getLockedClient(*_opId)) {
+ auto targetOpCtx = target->getOperationContext();
+ serviceContext->killOperation(target, targetOpCtx, ErrorCodes::IndexBuildAborted);
+ }
+
+ // Set the signal. Because we have already interrupted the index build, it will not observe
+ // this signal. We do this so that other observers do not also try to abort the index build.
+ _setSignalAndCancelVoteRequestCbkIfActive(lk, opCtx, signalAction);
+
+ return TryAbortResult::kContinueAbort;
+}
+
bool ReplIndexBuildState::isResumable() const {
stdx::unique_lock<Latch> lk(mutex);
return !_lastOpTimeBeforeInterceptors.isNull();
@@ -122,4 +290,24 @@ void ReplIndexBuildState::clearLastOpTimeBeforeInterceptors() {
_lastOpTimeBeforeInterceptors = {};
}
+bool ReplIndexBuildState::_shouldSkipIndexBuildStateTransitionCheck(OperationContext* opCtx) const {
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->getSettings().usingReplSets() && protocol == IndexBuildProtocol::kTwoPhase) {
+ return false;
+ }
+ return true;
+}
+
+void ReplIndexBuildState::_setSignalAndCancelVoteRequestCbkIfActive(WithLock lk,
+ OperationContext* opCtx,
+ IndexBuildAction signal) {
+ // set the signal
+ waitForNextAction->emplaceValue(signal);
+ // Cancel the callback.
+ if (voteCmdCbkHandle.isValid()) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ replCoord->cancelCbkHandle(voteCmdCbkHandle);
+ }
+}
+
} // namespace mongo