diff options
Diffstat (limited to 'src/mongo/db/repl_index_build_state.cpp')
-rw-r--r-- | src/mongo/db/repl_index_build_state.cpp | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/src/mongo/db/repl_index_build_state.cpp b/src/mongo/db/repl_index_build_state.cpp index a7617172312..c1fda23d4dc 100644 --- a/src/mongo/db/repl_index_build_state.cpp +++ b/src/mongo/db/repl_index_build_state.cpp @@ -27,10 +27,15 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage + #include "mongo/platform/basic.h" #include "mongo/db/repl_index_build_state.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/logv2/log.h" + namespace mongo { namespace { @@ -68,6 +73,27 @@ bool checkIfValidTransition(IndexBuildState::StateFlag currentState, } // namespace +std::string indexBuildActionToString(IndexBuildAction action) { + if (action == IndexBuildAction::kNoAction) { + return "No action"; + } else if (action == IndexBuildAction::kOplogCommit) { + return "Oplog commit"; + } else if (action == IndexBuildAction::kOplogAbort) { + return "Oplog abort"; + } else if (action == IndexBuildAction::kInitialSyncAbort) { + return "Initial sync abort"; + } else if (action == IndexBuildAction::kRollbackAbort) { + return "Rollback abort"; + } else if (action == IndexBuildAction::kPrimaryAbort) { + return "Primary abort"; + } else if (action == IndexBuildAction::kSinglePhaseCommit) { + return "Single-phase commit"; + } else if (action == IndexBuildAction::kCommitQuorumSatisfied) { + return "Commit quorum Satisfied"; + } + MONGO_UNREACHABLE; +} + void IndexBuildState::setState(StateFlag state, bool skipCheck, boost::optional<Timestamp> timestamp, @@ -102,6 +128,148 @@ ReplIndexBuildState::ReplIndexBuildState(const UUID& indexBuildUUID, commitQuorumLock.emplace(indexBuildUUID.toString()); } +void ReplIndexBuildState::start(OperationContext* opCtx) { + stdx::unique_lock<Latch> lk(mutex); + _opId = opCtx->getOpID(); + indexBuildState.setState(IndexBuildState::kInProgress, false /* skipCheck */); +} + +bool ReplIndexBuildState::isAborted() const { + stdx::unique_lock<Latch> lk(mutex); + return 
indexBuildState.isAborted(); +} + +std::string ReplIndexBuildState::getAbortReason() const { + stdx::unique_lock<Latch> lk(mutex); + invariant(indexBuildState.isAborted(), + str::stream() << "Index build: " << buildUUID + << ", index build state: " << indexBuildState.toString()); + auto reason = indexBuildState.getAbortReason(); + invariant(reason, str::stream() << buildUUID); + return *reason; +} + +void ReplIndexBuildState::setCommitQuorumSatisfied(OperationContext* opCtx) { + stdx::unique_lock<Latch> lk(mutex); + if (!waitForNextAction->getFuture().isReady()) { + _setSignalAndCancelVoteRequestCbkIfActive( + lk, opCtx, IndexBuildAction::kCommitQuorumSatisfied); + } else { + // This implies we already got a commit or abort signal by other ways. This might have + // been signaled earlier with kPrimaryAbort or kCommitQuorumSatisfied. Or, it's also + // possible the node got stepped down and received kOplogCommit/kOplogAbort or got + // kRollbackAbort. So, it's ok to skip signaling. + auto action = waitForNextAction->getFuture().get(opCtx); + + LOGV2(3856200, + "Not signaling \"{skippedAction}\" as it was previously signaled with " + "\"{previousAction}\" for index build: {buildUUID}", + "Skipping signaling as it was previously signaled for index build", + "skippedAction"_attr = + indexBuildActionToString(IndexBuildAction::kCommitQuorumSatisfied), + "previousAction"_attr = indexBuildActionToString(action), + "buildUUID"_attr = buildUUID); + } +} + +bool ReplIndexBuildState::tryCommit(OperationContext* opCtx) { + stdx::unique_lock<Latch> lk(mutex); + if (indexBuildState.isSettingUp()) { + // It's possible that the index build thread has not reached the point where it can be + // committed yet. + return false; + } + if (waitForNextAction->getFuture().isReady()) { + // If the future wait were uninterruptible, then shutdown could hang. 
If the + // IndexBuildsCoordinator thread gets interrupted on shutdown, the oplog applier will hang + // waiting for the promise applying the commitIndexBuild oplog entry. + const auto nextAction = waitForNextAction->getFuture().get(opCtx); + invariant(nextAction == IndexBuildAction::kCommitQuorumSatisfied); + // Retry until the current promise result is consumed by the index builder thread and + // a new empty promise got created by the IndexBuildsCoordinator thread. + return false; + } + auto skipCheck = _shouldSkipIndexBuildStateTransitionCheck(opCtx); + indexBuildState.setState( + IndexBuildState::kPrepareCommit, skipCheck, opCtx->recoveryUnit()->getCommitTimestamp()); + // Promise can be set only once. + // We can't skip signaling here if a signal is already set because the previous commit or + // abort signal might have been sent to handle the primary case. + _setSignalAndCancelVoteRequestCbkIfActive(lk, opCtx, IndexBuildAction::kOplogCommit); + return true; +} + +ReplIndexBuildState::TryAbortResult ReplIndexBuildState::tryAbort(OperationContext* opCtx, + IndexBuildAction signalAction, + std::string reason) { + stdx::unique_lock<Latch> lk(mutex); + // Wait until the build is done setting up. This indicates that all required state is + // initialized to attempt an abort. + if (indexBuildState.isSettingUp()) { + LOGV2_DEBUG(465605, + 2, + "waiting until index build is done setting up before attempting to abort", + "buildUUID"_attr = buildUUID); + return TryAbortResult::kRetry; + } + if (waitForNextAction->getFuture().isReady()) { + const auto nextAction = waitForNextAction->getFuture().get(opCtx); + invariant(nextAction == IndexBuildAction::kSinglePhaseCommit || + nextAction == IndexBuildAction::kCommitQuorumSatisfied || + nextAction == IndexBuildAction::kPrimaryAbort); + + // Index build coordinator already received a signal to commit or abort. So, it's ok + // to return and wait for the index build to complete if we are trying to signal + // 'kPrimaryAbort'. 
The index build coordinator will not perform the signaled action + // (i.e., will not commit or abort the index build) only when the node steps down. + // When the node steps down, the caller of this function, dropIndexes/createIndexes + // command (user operation) will also get interrupted. So, we no longer need to + // abort the index build on step down. + if (signalAction == IndexBuildAction::kPrimaryAbort) { + // Indicate if the index build is already being committed or aborted. + if (nextAction == IndexBuildAction::kPrimaryAbort) { + return TryAbortResult::kAlreadyAborted; + } else { + return TryAbortResult::kNotAborted; + } + } + + // Retry until the current promise result is consumed by the index builder thread + // and a new empty promise got created by the IndexBuildsCoordinator thread. Or, + // until the index build got torn down after index build commit. + return TryAbortResult::kRetry; + } + + LOGV2(4656003, "Aborting index build", "buildUUID"_attr = buildUUID, "error"_attr = reason); + + // Set the state on replState. Once set, the calling thread must complete the abort process. + auto abortTimestamp = + boost::make_optional<Timestamp>(!opCtx->recoveryUnit()->getCommitTimestamp().isNull(), + opCtx->recoveryUnit()->getCommitTimestamp()); + auto skipCheck = _shouldSkipIndexBuildStateTransitionCheck(opCtx); + indexBuildState.setState(IndexBuildState::kAborted, skipCheck, abortTimestamp, reason); + + // Interrupt the builder thread so that it can no longer acquire locks or make progress. + // It is possible that the index build thread may have completed its operation and removed + // itself from the ServiceContext. This may happen in the case of an explicit db.killOp() + // operation or during shutdown. 
+ // During normal operation, the abort logic, initiated through external means such as + // dropIndexes or internally through an indexing error, should have set the state in + // ReplIndexBuildState so that this code would not be reachable as it is no longer necessary + // to interrupt the builder thread here. + auto serviceContext = opCtx->getServiceContext(); + if (auto target = serviceContext->getLockedClient(*_opId)) { + auto targetOpCtx = target->getOperationContext(); + serviceContext->killOperation(target, targetOpCtx, ErrorCodes::IndexBuildAborted); + } + + // Set the signal. Because we have already interrupted the index build, it will not observe + // this signal. We do this so that other observers do not also try to abort the index build. + _setSignalAndCancelVoteRequestCbkIfActive(lk, opCtx, signalAction); + + return TryAbortResult::kContinueAbort; +} + bool ReplIndexBuildState::isResumable() const { stdx::unique_lock<Latch> lk(mutex); return !_lastOpTimeBeforeInterceptors.isNull(); @@ -122,4 +290,24 @@ void ReplIndexBuildState::clearLastOpTimeBeforeInterceptors() { _lastOpTimeBeforeInterceptors = {}; } +bool ReplIndexBuildState::_shouldSkipIndexBuildStateTransitionCheck(OperationContext* opCtx) const { + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->getSettings().usingReplSets() && protocol == IndexBuildProtocol::kTwoPhase) { + return false; + } + return true; +} + +void ReplIndexBuildState::_setSignalAndCancelVoteRequestCbkIfActive(WithLock lk, + OperationContext* opCtx, + IndexBuildAction signal) { + // Set the signal. + waitForNextAction->emplaceValue(signal); + // Cancel the callback. + if (voteCmdCbkHandle.isValid()) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + replCoord->cancelCbkHandle(voteCmdCbkHandle); + } +} + } // namespace mongo |