diff options
author | Ian Boros <ian.boros@mongodb.com> | 2022-01-19 21:21:53 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-19 23:03:19 +0000 |
commit | d8afa17c615c274390899917d19180e374040f3e (patch) | |
tree | e9efc40d03b37026e1bad40752666fbd6cad5b33 /src/mongo | |
parent | 6a08c23ebeab0e0434226cbc6c690b458e61d745 (diff) | |
download | mongo-d8afa17c615c274390899917d19180e374040f3e.tar.gz |
SERVER-60742 Maintain RecoveryUnit and storage resources across getMores for non-exchange aggregation operations
Diffstat (limited to 'src/mongo')
23 files changed, 239 insertions, 71 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index bb205d784f2..80fae774c96 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -98,6 +98,7 @@ ClientCursor::ClientCursor(ClientCursorParams params, _originatingPrivileges(std::move(params.originatingPrivileges)), _tailableMode(params.tailableMode), _isNoTimeout(params.isNoTimeout), + _stashedRecoveryUnit(std::move(params.recoveryUnit)), _exec(std::move(params.exec)), _operationUsingCursor(operationUsingCursor), _lastUseDate(now), @@ -124,12 +125,6 @@ ClientCursor::~ClientCursor() { invariant(!_operationUsingCursor); invariant(_disposed); - if (_stashedRecoveryUnit) { - // Now that the associated PlanExecutor is being destroyed, the recovery unit no longer - // needs to keep data pinned. - _stashedRecoveryUnit->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort); - } - cursorStatsOpen.decrement(); if (isNoTimeout()) { cursorStatsOpenNoTimeout.decrement(); @@ -184,7 +179,13 @@ ClientCursorPin::ClientCursorPin(OperationContext* opCtx, invariant(_cursor); invariant(_cursor->_operationUsingCursor); invariant(!_cursor->_disposed); - _shouldSaveRecoveryUnit = _cursor->getExecutor()->isSaveRecoveryUnitAcrossCommandsEnabled(); + + // If the feature is enabled, we want to ensure that the RecoveryUnit is stashed on the + // ClientCursor when the ClientCursorPin is destroyed. If there is already a stashed recovery + // unit on the cursor at the time this pin is constructed, the caller may unstash it and + // re-stash it if they want to use the associated PlanExecutor. + _shouldSaveRecoveryUnit = !_cursor->_stashedRecoveryUnit && + _cursor->getExecutor()->isSaveRecoveryUnitAcrossCommandsEnabled(); // We keep track of the number of cursors currently pinned. The cursor can become unpinned // either by being released back to the cursor manager or by being deleted. A cursor may be @@ -254,7 +255,6 @@ void ClientCursorPin::release() { if (_shouldSaveRecoveryUnit) { stashResourcesFromOperationContext(); - _shouldSaveRecoveryUnit = false; } // Unpin the cursor. This must be done by calling into the cursor manager, since the cursor @@ -300,7 +300,11 @@ void ClientCursorPin::unstashResourcesOntoOperationContext() { invariant(_opCtx == _cursor->_operationUsingCursor); if (auto& ru = _cursor->_stashedRecoveryUnit) { + // If unstashResourcesOntoOperationContext() is called, the pin is responsible for + // re-stashing the resources onto the cursor, unless the caller decides to call + // stashResourcesFromOperationContext() directly. _shouldSaveRecoveryUnit = true; + invariant(!_opCtx->recoveryUnit()->isActive()); _opCtx->setRecoveryUnit(std::move(ru), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); @@ -311,6 +315,7 @@ void ClientCursorPin::stashResourcesFromOperationContext() { // Move the recovery unit from the operation context onto the cursor and create a new RU for // the current OperationContext. _cursor->stashRecoveryUnit(_opCtx->releaseAndReplaceRecoveryUnit()); + _shouldSaveRecoveryUnit = false; } namespace { diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 0724c49d4dd..4be531d406d 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -57,6 +57,7 @@ class RecoveryUnit; */ struct ClientCursorParams { ClientCursorParams(std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor, + std::unique_ptr<RecoveryUnit> recoveryUnit, NamespaceString nss, UserNameIterator authenticatedUsersIter, APIParameters apiParameters, @@ -65,7 +66,8 @@ struct ClientCursorParams { ReadPreferenceSetting readPreferenceSetting, BSONObj originatingCommandObj, PrivilegeVector originatingPrivileges) - : exec(std::move(planExecutor)), + : recoveryUnit(std::move(recoveryUnit)), + exec(std::move(planExecutor)), nss(std::move(nss)), apiParameters(std::move(apiParameters)), writeConcernOptions(std::move(writeConcernOptions)), @@ -91,6 +93,7 @@ struct ClientCursorParams { tailableMode = newMode; } + std::unique_ptr<RecoveryUnit> recoveryUnit; std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; const NamespaceString nss; std::vector<UserName> authenticatedUsers; @@ -294,11 +297,8 @@ public: return _opKey; } - std::unique_ptr<RecoveryUnit> releaseStashedRecoveryUnit() { - return std::move(_stashedRecoveryUnit); - } - void stashRecoveryUnit(std::unique_ptr<RecoveryUnit> ru) { + invariant(!_stashedRecoveryUnit); _stashedRecoveryUnit = std::move(ru); } @@ -533,6 +533,10 @@ public: /** * Inverse of above: Transfers resources which need the same lifetime as the cursor from the * operation context to the cursor itself. + + * If this method is not called after unstashing the resources onto the OperationContext, the + * pin will re-stash the resources on destruction, unless the pin was moved from or + * deleteUnderlying() was called. */ void stashResourcesFromOperationContext(); @@ -547,6 +551,26 @@ private: bool _shouldSaveRecoveryUnit = false; }; +/** + * RAII type for unstashing resources from a pinned cursor onto the operation context. Generally a + * ClientCursorPin will handle the logic of re-stashing resources from the operation context on + * destruction. If there are multiple pinned cursors, however, only one cursor may have its + * resources unstashed at a time. In such cases, this class should be used. + */ +class MoveResourcesFromPinToOpCtxBlock { +public: + MoveResourcesFromPinToOpCtxBlock(ClientCursorPin* pin) : _pin(pin) { + _pin->unstashResourcesOntoOperationContext(); + } + + ~MoveResourcesFromPinToOpCtxBlock() { + _pin->stashResourcesFromOperationContext(); + } + +private: + ClientCursorPin* _pin; +}; + void startClientCursorMonitor(); } // namespace mongo diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 665a58c0850..d8878f71558 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -358,11 +358,13 @@ public: // Get the execution plan for the query. bool permitYield = true; + const bool allowMaintainValidCursorsAcrossCommands = true; auto exec = uassertStatusOK(getExecutorFind(opCtx, &collection, std::move(cq), nullptr /* extractAndAttachPipelineStages */, + allowMaintainValidCursorsAcrossCommands, permitYield)); auto bodyBuilder = result->getBodyBuilder(); @@ -558,19 +560,9 @@ public: &collection, std::move(cq), nullptr /* extractAndAttachPipelineStages */, + true, /* allow saving cursors across commands */ permitYield)); - // If the executor supports it, find operations will maintain the storage engine state - // across commands. - if (serverGlobalParams.featureCompatibility.isVersionInitialized() && - feature_flags::gYieldingSupportForSBE.isEnabled( - serverGlobalParams.featureCompatibility) && - !opCtx->inMultiDocumentTransaction() && - repl::ReadConcernArgs::get(opCtx).getLevel() != - repl::ReadConcernLevel::kSnapshotReadConcern) { - exec->enableSaveRecoveryUnitAcrossCommandsIfSupported(); - } - { stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanExplainer().getPlanSummary()); @@ -657,6 +649,9 @@ public: ClientCursorPin pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, {std::move(exec), + nullptr, // nullptr RecoveryUnit. The opCtx's RecoveryUnit will be stashed on + // Pin destruct if saving cursors across find/getMore commands is + // supported. nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), APIParameters::get(opCtx), diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index 5bdb655fcd4..694dfe04d4c 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -541,6 +541,7 @@ public: auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, {std::move(exec), + nullptr, /* recoveryUnit */ cursorNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), APIParameters::get(opCtx), diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 89fde689852..3a569a7cc7f 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -312,6 +312,7 @@ public: auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, {std::move(exec), + nullptr, /* recoveryUnit */ nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), APIParameters::get(opCtx), diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index ba8214138b8..ed18c05fd2f 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -118,40 +118,53 @@ bool canOptimizeAwayPipeline(const Pipeline* pipeline, * and thus will be different from that in 'request'. */ bool handleCursorCommand(OperationContext* opCtx, + bool stashResourceForGetMore, boost::intrusive_ptr<ExpressionContext> expCtx, const NamespaceString& nsForCursor, - std::vector<ClientCursor*> cursors, + std::vector<ClientCursorPin>& pins, const AggregateCommandRequest& request, const BSONObj& cmdObj, rpc::ReplyBuilderInterface* result) { - invariant(!cursors.empty()); + invariant(!pins.empty()); long long batchSize = request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize); - if (cursors.size() > 1) { + if (pins.size() > 1) { uassert( ErrorCodes::BadValue, "the exchange initial batch size must be zero", batchSize == 0); BSONArrayBuilder cursorsBuilder; - for (size_t idx = 0; idx < cursors.size(); ++idx) { - invariant(cursors[idx]); + for (size_t idx = 0; idx < pins.size(); ++idx) { + auto* cursor = pins[idx].getCursor(); + + // Each ClientCursorPin has its own stashed operation state associated with it, in the + // form of a RecoveryUnit. Since we may have many pins at once here, we cannot unstash + // that state on pin creation. Therefore, when we use a ClientCursorPin, we must fetch + // the Pin's resources onto the OperationContext and stash them away again on + // completion; then go onto the next pin, etc. + boost::optional<MoveResourcesFromPinToOpCtxBlock> unstashedResourceBlock; + if (stashResourceForGetMore) { + unstashedResourceBlock.emplace(&pins[idx]); + } + + invariant(cursor); BSONObjBuilder cursorResult; appendCursorResponseObject( - cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult); + cursor->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult); cursorResult.appendBool("ok", 1); cursorsBuilder.append(cursorResult.obj()); // If a time limit was set on the pipeline, remaining time is "rolled over" to the // cursor (for use by future getmore ops). - cursors[idx]->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); + cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); // Cursor needs to be in a saved state while we yield locks for getmore. State // will be restored in getMore(). - cursors[idx]->getExecutor()->saveState(); - cursors[idx]->getExecutor()->detachFromOperationContext(); + cursor->getExecutor()->saveState(); + cursor->getExecutor()->detachFromOperationContext(); } auto bodyBuilder = result->getBodyBuilder(); @@ -168,7 +181,12 @@ bool handleCursorCommand(OperationContext* opCtx, CursorResponseBuilder responseBuilder(result, options); auto curOp = CurOp::get(opCtx); - auto cursor = cursors[0]; + auto cursor = pins[0].getCursor(); + boost::optional<MoveResourcesFromPinToOpCtxBlock> unstashedResourceBlock; + if (stashResourceForGetMore) { + unstashedResourceBlock.emplace(&pins[0]); + } + invariant(cursor); auto exec = cursor->getExecutor(); invariant(exec); @@ -573,8 +591,10 @@ Status runAggregate(OperationContext* opCtx, // re-running the expanded aggregation. boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> ctx; - std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; + std::vector<std::unique_ptr<RecoveryUnit>> recoveryUnits; + std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; boost::intrusive_ptr<ExpressionContext> expCtx; + bool stashResourcesForGetMore = false; auto curOp = CurOp::get(opCtx); { // If we are in a transaction, check whether the parsed pipeline supports being in @@ -790,6 +810,11 @@ Status runAggregate(OperationContext* opCtx, auto attachExecutorCallback = PipelineD::buildInnerQueryExecutor(collection, nss, &request, pipeline.get()); + if (attachExecutorCallback.second) { + stashResourcesForGetMore = + attachExecutorCallback.second->isSaveRecoveryUnitAcrossCommandsEnabled(); + } + if (canOptimizeAwayPipeline(pipeline.get(), attachExecutorCallback.second.get(), request, @@ -801,6 +826,13 @@ Status runAggregate(OperationContext* opCtx, // PlanExecutor by itself. The resulting cursor will look like what the client would // have gotten from find command. execs.emplace_back(std::move(attachExecutorCallback.second)); + + if (stashResourcesForGetMore) { + // When the PlanExecutor is created, the cursors it sets up are tied to the + // RecoveryUnit. For that reason, we must save the recovery unit made when the + // executor was set up, and not re-use it for other executors. + recoveryUnits.push_back(opCtx->releaseAndReplaceRecoveryUnit()); + } } else { // Complete creation of the initial $cursor stage, if needed. PipelineD::attachInnerQueryExecutorToPipeline(collection, @@ -822,6 +854,19 @@ Status runAggregate(OperationContext* opCtx, request, liteParsedPipeline.hasChangeStream()))); } + if (request.getExchange()) { + // Exchange pipelines should never use the behavior of stashing the recovery unit + // across getMores. + invariant(!stashResourcesForGetMore); + } else if (stashResourcesForGetMore) { + // For non-exchange queries, there should only be one pipeline. + invariant(pipelines.size() == 1); + // When the PlanExecutor is created, the cursors it sets up are tied to the + // RecoveryUnit. For that reason, we must save the recovery unit made when the + // executor was set up, and not re-use it for other executors. + recoveryUnits.push_back(opCtx->releaseAndReplaceRecoveryUnit()); + } + // With the pipelines created, we can relinquish locks as they will manage the locks // internally further on. We still need to keep the lock for an optimized away pipeline // though, as we will be changing its lock policy to 'kLockExternally' (see details @@ -844,16 +889,24 @@ Status runAggregate(OperationContext* opCtx, // invalidations and kill notifications themselves, not the cursor we create here. std::vector<ClientCursorPin> pins; - std::vector<ClientCursor*> cursors; ScopeGuard cursorFreer([&] { for (auto& p : pins) { p.deleteUnderlying(); } }); + + size_t i = 0; + invariant(!stashResourcesForGetMore || recoveryUnits.size() == execs.size()); for (auto&& exec : execs) { + std::unique_ptr<RecoveryUnit> recoveryUnit; + if (stashResourcesForGetMore) { + recoveryUnit = std::move(recoveryUnits[i]); + } + ClientCursorParams cursorParams( std::move(exec), + std::move(recoveryUnit), origNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), APIParameters::get(opCtx), @@ -867,8 +920,8 @@ Status runAggregate(OperationContext* opCtx, auto pin = CursorManager::get(opCtx)->registerCursor(opCtx, std::move(cursorParams)); pin->incNBatches(); - cursors.emplace_back(pin.getCursor()); pins.emplace_back(std::move(pin)); + ++i; } // Report usage statistics for each stage in the pipeline. @@ -877,6 +930,12 @@ Status runAggregate(OperationContext* opCtx, // If both explain and cursor are specified, explain wins. if (expCtx->explain) { auto explainExecutor = pins[0]->getExecutor(); + + boost::optional<MoveResourcesFromPinToOpCtxBlock> unstashedResourceBlock; + if (stashResourcesForGetMore) { + unstashedResourceBlock.emplace(&pins[0]); + } + auto bodyBuilder = result->getBodyBuilder(); if (auto pipelineExec = dynamic_cast<PlanExecutorPipeline*>(explainExecutor)) { Explain::explainPipeline( @@ -898,7 +957,7 @@ Status runAggregate(OperationContext* opCtx, } else { // Cursor must be specified, if explain is not. const bool keepCursor = handleCursorCommand( - opCtx, expCtx, origNss, std::move(cursors), request, cmdObj, result); + opCtx, stashResourcesForGetMore, expCtx, origNss, pins, request, cmdObj, result); if (keepCursor) { cursorFreer.dismiss(); } diff --git a/src/mongo/db/db_raii_test.cpp b/src/mongo/db/db_raii_test.cpp index 076d8cbbe13..715381a7c35 100644 --- a/src/mongo/db/db_raii_test.cpp +++ b/src/mongo/db/db_raii_test.cpp @@ -95,6 +95,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makeTailableQueryPlan( &collection, std::move(cq), nullptr /* extractAndAttachPipelineStages */, + true, /* allow saving cursors across commands */ permitYield); ASSERT_OK(swExec.getStatus()); return std::move(swExec.getValue()); diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp index 5468c9a54e7..16224f39ea7 100644 --- a/src/mongo/db/exec/sbe_cmd.cpp +++ b/src/mongo/db/exec/sbe_cmd.cpp @@ -155,6 +155,7 @@ public: const auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, {std::move(exec), + nullptr, /* recoveryUnit */ nss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), APIParameters::get(opCtx), diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 3067074f008..8aed6178e78 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -259,6 +259,9 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe } auto permitYield = true; + // Maintaining valid cursors across commands is not supported for exchange aggregations. + const bool allowMaintainValidCursorsAcrossCommands = + !static_cast<bool>(aggRequest && aggRequest->getExchange()); return getExecutorFind(expCtx->opCtx, &collection, std::move(cq.getValue()), @@ -266,6 +269,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown( expCtx, collection, canonicalQuery, pipeline)); }, + allowMaintainValidCursorsAcrossCommands, permitYield, plannerOpts); } diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 6898ff287a5..7bce39af9f3 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -1270,6 +1270,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind const CollectionPtr* collection, std::unique_ptr<CanonicalQuery> canonicalQuery, std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, + bool allowMaintainValidCursorsAcrossCommands, bool permitYield, size_t plannerOptions) { auto yieldPolicy = (permitYield && !opCtx->inMultiDocumentTransaction()) @@ -1279,12 +1280,25 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind if (OperationShardingState::isOperationVersioned(opCtx)) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } - return getExecutor(opCtx, - collection, - std::move(canonicalQuery), - extractAndAttachPipelineStages, - yieldPolicy, - plannerOptions); + auto executor = getExecutor(opCtx, + collection, + std::move(canonicalQuery), + extractAndAttachPipelineStages, + yieldPolicy, + plannerOptions); + + // If the executor supports it and the operation is eligible, we will maintain the storage + // engine state across commands. + if (allowMaintainValidCursorsAcrossCommands && executor.isOK() && + serverGlobalParams.featureCompatibility.isVersionInitialized() && + feature_flags::gYieldingSupportForSBE.isEnabled(serverGlobalParams.featureCompatibility) && + !opCtx->inMultiDocumentTransaction() && + repl::ReadConcernArgs::get(opCtx).getLevel() != + repl::ReadConcernLevel::kSnapshotReadConcern) { + executor.getValue()->enableSaveRecoveryUnitAcrossCommandsIfSupported(); + } + + return executor; } namespace { diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 84d1ca7f82a..0f8c9bd4bb7 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -154,6 +154,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind const CollectionPtr* collection, std::unique_ptr<CanonicalQuery> canonicalQuery, std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages, + bool allowMaintainValidCursorsAcrossCommands = true, bool permitYield = false, size_t plannerOptions = QueryPlannerParams::DEFAULT); diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index 3b7a2db857f..3646464dde9 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -124,11 +124,12 @@ void PlanExecutorSBE::saveState() { if (_isSaveRecoveryUnitAcrossCommandsEnabled) { _root->saveState(false /* NOT relinquishing cursor */); - // Put the RU into 'kCommit' mode so that subsequent calls to abandonSnapshot() keep + // Ensure the RU is in 'kCommit' mode so that the following call to abandonSnapshot() keeps // cursors positioned. This ensures that no pointers into memory owned by the storage // engine held by the SBE PlanStage tree become invalid while the executor is in a saved // state. - _opCtx->recoveryUnit()->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kCommit); + invariant(_opCtx->recoveryUnit()->abandonSnapshotMode() == + RecoveryUnit::AbandonSnapshotMode::kCommit); _opCtx->recoveryUnit()->abandonSnapshot(); } else { _root->saveState(true /* relinquish cursor */); @@ -142,12 +143,9 @@ void PlanExecutorSBE::restoreState(const RestoreContext& context) { _yieldPolicy->setYieldable(context.collection()); if (_isSaveRecoveryUnitAcrossCommandsEnabled) { + invariant(_opCtx->recoveryUnit()->abandonSnapshotMode() == + RecoveryUnit::AbandonSnapshotMode::kCommit); _root->restoreState(false /* NOT relinquishing cursor */); - - // Put the RU back into 'kAbort' mode. Since the executor is now in a restored state, calls - // to doAbandonSnapshot() only happen if the query has failed and the executor will not be - // used again. In this case, we do not rely on the guarantees provided by 'kCommit' mode. - _opCtx->recoveryUnit()->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort); } else { _root->restoreState(true /* relinquish cursor */); } @@ -395,4 +393,17 @@ sbe::PlanState fetchNext(sbe::PlanStage* root, return state; } +void PlanExecutorSBE::enableSaveRecoveryUnitAcrossCommandsIfSupported() { + // If we enable saving the recovery unit across commands, we must put the RecoveryUnit into a + // state where calls to abandonSnapshot() result in a transaction commit, instead of abort. The + // transaction commit guarantees that all open cursors will remain positioned and valid. + // + // To put the RecoveryUnit in this state, we bump a reference counter on it which tracks how + // many callers require that it be in commit mode. Note that under one RecoveryUnit there may + // be multiple PlanExecutors which require cursors to remain valid across abandonSnapshot() + // calls (for example, a pipeline containing $lookup). + _recoveryUnitCommitModeBlock.emplace(_opCtx->recoveryUnit()); + _isSaveRecoveryUnitAcrossCommandsEnabled = true; +} + } // namespace mongo diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h index a0f7a62f57b..4fadc99ebe1 100644 --- a/src/mongo/db/query/plan_executor_sbe.h +++ b/src/mongo/db/query/plan_executor_sbe.h @@ -50,6 +50,7 @@ public: NamespaceString nss, bool isOpen, std::unique_ptr<PlanYieldPolicySBE> yieldPolicy); + ~PlanExecutorSBE() {} CanonicalQuery* getCanonicalQuery() const override { return _cq.get(); @@ -131,9 +132,7 @@ public: return *_planExplainer; } - void enableSaveRecoveryUnitAcrossCommandsIfSupported() override { - _isSaveRecoveryUnitAcrossCommandsEnabled = true; - } + void enableSaveRecoveryUnitAcrossCommandsIfSupported() override; bool isSaveRecoveryUnitAcrossCommandsEnabled() const override { return _isSaveRecoveryUnitAcrossCommandsEnabled; } @@ -179,6 +178,10 @@ private: bool _isDisposed{false}; bool _isSaveRecoveryUnitAcrossCommandsEnabled = false; + // If engaged, forces the recovery unit to commit instead of abort on calls to + // abandonSnapshot(). This field is only used when '_isSaveRecoveryUnitAcrossCommandsEnabled' + // is true. + boost::optional<AbandonSnapshotCommitModeBlock> _recoveryUnitCommitModeBlock; }; /** diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index 58064f76d6e..3121e2a07e7 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -105,13 +105,11 @@ Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx, // flag for the duration of yield will force any calls to abandonSnapshot() to // commit the transaction, rather than abort it, in order to leave the cursors // valid. - opCtx->recoveryUnit()->setAbandonSnapshotMode( - RecoveryUnit::AbandonSnapshotMode::kCommit); + opCtx->recoveryUnit()->incAbandonSnapshotCommitModeCount(); exitGuard.emplace([&] { invariant(opCtx->recoveryUnit()->abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kCommit); - opCtx->recoveryUnit()->setAbandonSnapshotMode( - RecoveryUnit::AbandonSnapshotMode::kAbort); + opCtx->recoveryUnit()->decAbandonSnapshotCommitModeCount(); }); } diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp index 307c6e79078..21658dc9627 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp @@ -113,7 +113,7 @@ void RecoveryUnit::prepareUnitOfWork() { void RecoveryUnit::doAbandonSnapshot() { invariant(!_inUnitOfWork(), toString(_getState())); - if (_abandonSnapshotMode == RecoveryUnit::AbandonSnapshotMode::kCommit) { + if (abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kCommit) { invariant(!_dirty); // Cannot commit written data outside WUOW. } diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp index 93781b84ed7..b43dfa6a3b1 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp @@ -87,7 +87,7 @@ public: TEST_F(EphemeralForTestRecoveryUnitTestHarness, AbandonSnapshotAbortMode) { Lock::GlobalLock globalLk(opCtx.get(), MODE_IX); - ru->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort); + ASSERT(ru->abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kAbort); const auto rs = harnessHelper->createRecordStore(opCtx.get(), "table1"); opCtx->lockState()->beginWriteUnitOfWork(); diff --git a/src/mongo/db/storage/recovery_unit.cpp b/src/mongo/db/storage/recovery_unit.cpp index a913d99cc56..fbdcdaa920f 100644 --- a/src/mongo/db/storage/recovery_unit.cpp +++ b/src/mongo/db/storage/recovery_unit.cpp @@ -50,6 +50,15 @@ RecoveryUnit::RecoveryUnit() { assignNextSnapshotId(); } +void RecoveryUnit::incAbandonSnapshotCommitModeCount() { + ++_abandonSnapshotCommitModeCounter; +} + +void RecoveryUnit::decAbandonSnapshotCommitModeCount() { + invariant(_abandonSnapshotCommitModeCounter > 0); + --_abandonSnapshotCommitModeCounter; +} + void RecoveryUnit::assignNextSnapshotId() { _mySnapshotId = nextSnapshotId.fetchAndAdd(1); } diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index e53429cb490..2da225bb107 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -233,14 +233,25 @@ public: assignNextSnapshotId(); } - void setAbandonSnapshotMode(AbandonSnapshotMode mode) { - _abandonSnapshotMode = mode; - } AbandonSnapshotMode abandonSnapshotMode() const { - return _abandonSnapshotMode; + return (_abandonSnapshotCommitModeCounter > 0) ? AbandonSnapshotMode::kCommit + : AbandonSnapshotMode::kAbort; } /** + * Methods for modifying the 'abandonSnapshotModeCount.' When the count is greater than 0, the + * RecoveryUnit is in 'commit' mode, and calls to abandonSnapshot() will result in a transaction + * commit, preserving cursor positions. + * + * If the count is 0, the RecoveryUnit is in 'abort' mode and calls to abandonSnapshot() will + * result in a transaction abort. + * + * The count may not go below zero. + */ + void incAbandonSnapshotCommitModeCount(); + void decAbandonSnapshotCommitModeCount(); + + /** * Informs the RecoveryUnit that a snapshot will be needed soon, if one was not already * established. This specifically allows the storage engine to preallocate any required * transaction resources while minimizing the critical section between generating a new @@ -800,8 +811,6 @@ protected: bool _noEvictionAfterRollback = false; - AbandonSnapshotMode _abandonSnapshotMode = AbandonSnapshotMode::kAbort; - private: // Sets the snapshot associated with this RecoveryUnit to a new globally unique id number. void assignNextSnapshotId(); @@ -819,6 +828,10 @@ private: std::unique_ptr<Change> _changeForCatalogVisibility; State _state = State::kInactive; uint64_t _mySnapshotId; + + // Causes transactions to be committed instead of aborted by abandonSnapshot() if set higher + // than 0. + int _abandonSnapshotCommitModeCounter = 0; }; /** @@ -841,4 +854,19 @@ private: RecoveryUnit* const _recoveryUnit; }; +class AbandonSnapshotCommitModeBlock final { + AbandonSnapshotCommitModeBlock(const AbandonSnapshotCommitModeBlock&) = delete; + AbandonSnapshotCommitModeBlock& operator=(const AbandonSnapshotCommitModeBlock&) = delete; + +public: + AbandonSnapshotCommitModeBlock(RecoveryUnit* ru) : _recoveryUnit(ru) { + ru->incAbandonSnapshotCommitModeCount(); + } + ~AbandonSnapshotCommitModeBlock() { + _recoveryUnit->decAbandonSnapshotCommitModeCount(); + } + +private: + RecoveryUnit* _recoveryUnit; +}; } // namespace mongo diff --git a/src/mongo/db/storage/recovery_unit_test_harness.cpp b/src/mongo/db/storage/recovery_unit_test_harness.cpp index 0c4cd3b1739..478e74ec710 100644 --- a/src/mongo/db/storage/recovery_unit_test_harness.cpp +++ b/src/mongo/db/storage/recovery_unit_test_harness.cpp @@ -191,7 +191,8 @@ TEST_F(RecoveryUnitTestHarness, AbortUnitOfWorkIncrementsSnapshotId) { TEST_F(RecoveryUnitTestHarness, AbandonSnapshotCommitMode) { Lock::GlobalLock globalLk(opCtx.get(), MODE_IX); - ru->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kCommit); + ru->incAbandonSnapshotCommitModeCount(); + ASSERT(ru->abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kCommit); const auto rs = harnessHelper->createRecordStore(opCtx.get(), "table1"); opCtx->lockState()->beginWriteUnitOfWork(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 176ad6694af..19b40e4403a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -352,7 +352,7 @@ void WiredTigerRecoveryUnit::doAbandonSnapshot() { if (_isActive()) { // Can't be in a WriteUnitOfWork, so safe to rollback if the AbandonSnapshotMode is // kAbort. If kCommit, however, then any active cursors will remain positioned and valid. - _txnClose(_abandonSnapshotMode == AbandonSnapshotMode::kCommit /* commit */); + _txnClose(abandonSnapshotMode() == AbandonSnapshotMode::kCommit /* commit */); } _setState(State::kInactive); } @@ -379,7 +379,7 @@ void WiredTigerRecoveryUnit::refreshSnapshot() { invariant(_isActive()); invariant(!_inUnitOfWork()); invariant(!_noEvictionAfterRollback); - invariant(_abandonSnapshotMode == AbandonSnapshotMode::kAbort); + invariant(abandonSnapshotMode() == AbandonSnapshotMode::kAbort); auto newSession = _sessionCache->getSession(); WiredTigerBeginTxnBlock txnOpen(newSession->getSession(), @@ -493,7 +493,6 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { LOGV2_DEBUG( 22412, 3, "WT commit_transaction", "snapshotId"_attr = getSnapshotId().toNumber()); } else { - invariant(_abandonSnapshotMode == AbandonSnapshotMode::kAbort); StringBuilder config; if (_noEvictionAfterRollback) { // The only point at which rollback_transaction() can time out is in the bonus-eviction diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp index 07480a1ce6a..db277ca22b2 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp @@ -816,7 +816,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, MultiTimestampConstraintsInternalState } TEST_F(WiredTigerRecoveryUnitTestFixture, AbandonSnapshotAbortMode) { - ru1->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort); + ASSERT(ru1->abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kAbort); OperationContext* opCtx = clientAndCtx1.second.get(); const char* const key = "key"; diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp index b01259efd5e..be5acc542cd 100644 --- a/src/mongo/db/transaction_history_iterator.cpp +++ b/src/mongo/db/transaction_history_iterator.cpp @@ -90,6 +90,7 @@ BSONObj findOneOplogEntry(OperationContext* opCtx, &oplogRead.getCollection(), std::move(cq), nullptr /*extractAndAttachPipelineStages */, + true, /* allow saving cursors across commands */ permitYield)); PlanExecutor::ExecState getNextResult; diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp index 1cc8ee1f535..142642b88d9 100644 --- a/src/mongo/dbtests/cursor_manager_test.cpp +++ b/src/mongo/dbtests/cursor_manager_test.cpp @@ -100,6 +100,7 @@ public: ClientCursorParams makeParams(OperationContext* opCtx) { return { makeFakePlanExecutor(opCtx), + nullptr, /* recoveryUnit */ kTestNss, {}, APIParameters(), @@ -152,6 +153,7 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursor) { auto cursorPin = cursorManager->registerCursor( pinningOpCtx, {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ kTestNss, {}, APIParameters(), @@ -179,6 +181,7 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursorMultiClient) { auto cursorPin = cursorManager->registerCursor( pinningOpCtx, {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ kTestNss, {}, APIParameters(), @@ -217,6 +220,7 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { cursorManager->registerCursor(_opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ NamespaceString{"test.collection"}, {}, APIParameters(), @@ -234,6 +238,7 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { cursorManager->registerCursor(_opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ NamespaceString{"test.collection"}, {}, APIParameters(), @@ -256,6 +261,7 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ NamespaceString{"test.collection"}, {}, APIParameters(), @@ -282,6 +288,7 @@ TEST_F(CursorManagerTest, MarkedAsKilledCursorsShouldBeDeletedOnCursorPin) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ NamespaceString{"test.collection"}, {}, APIParameters(), @@ -317,6 +324,7 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ NamespaceString{"test.collection"}, {}, APIParameters(), @@ -351,6 +359,7 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ kTestNss, {}, APIParameters(), @@ -366,6 +375,7 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) { // schedule. cursorManager->registerCursor(_opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ kTestNss, {}, APIParameters(), @@ -405,6 +415,7 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing auto cursorPin = cursorManager->registerCursor( _opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ kTestNss, {}, APIParameters(), @@ -448,6 +459,7 @@ TEST_F(CursorManagerTest, CursorStoresAPIParameters) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), {makeFakePlanExecutor(), + nullptr, /* recoveryUnit */ kTestNss, {}, apiParams, |