summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorIan Boros <ian.boros@mongodb.com>2022-01-19 21:21:53 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-19 23:03:19 +0000
commitd8afa17c615c274390899917d19180e374040f3e (patch)
treee9efc40d03b37026e1bad40752666fbd6cad5b33 /src/mongo
parent6a08c23ebeab0e0434226cbc6c690b458e61d745 (diff)
downloadmongo-d8afa17c615c274390899917d19180e374040f3e.tar.gz
SERVER-60742 Maintain RecoveryUnit and storage resources across getMores for non-exchange aggregation operations
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/clientcursor.cpp21
-rw-r--r--src/mongo/db/clientcursor.h34
-rw-r--r--src/mongo/db/commands/find_cmd.cpp17
-rw-r--r--src/mongo/db/commands/list_collections.cpp1
-rw-r--r--src/mongo/db/commands/list_indexes.cpp1
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp87
-rw-r--r--src/mongo/db/db_raii_test.cpp1
-rw-r--r--src/mongo/db/exec/sbe_cmd.cpp1
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp4
-rw-r--r--src/mongo/db/query/get_executor.cpp26
-rw-r--r--src/mongo/db/query/get_executor.h1
-rw-r--r--src/mongo/db/query/plan_executor_sbe.cpp25
-rw-r--r--src/mongo/db/query/plan_executor_sbe.h9
-rw-r--r--src/mongo/db/query/plan_yield_policy.cpp6
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp2
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp2
-rw-r--r--src/mongo/db/storage/recovery_unit.cpp9
-rw-r--r--src/mongo/db/storage/recovery_unit.h40
-rw-r--r--src/mongo/db/storage/recovery_unit_test_harness.cpp3
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp2
-rw-r--r--src/mongo/db/transaction_history_iterator.cpp1
-rw-r--r--src/mongo/dbtests/cursor_manager_test.cpp12
23 files changed, 239 insertions, 71 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index bb205d784f2..80fae774c96 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -98,6 +98,7 @@ ClientCursor::ClientCursor(ClientCursorParams params,
_originatingPrivileges(std::move(params.originatingPrivileges)),
_tailableMode(params.tailableMode),
_isNoTimeout(params.isNoTimeout),
+ _stashedRecoveryUnit(std::move(params.recoveryUnit)),
_exec(std::move(params.exec)),
_operationUsingCursor(operationUsingCursor),
_lastUseDate(now),
@@ -124,12 +125,6 @@ ClientCursor::~ClientCursor() {
invariant(!_operationUsingCursor);
invariant(_disposed);
- if (_stashedRecoveryUnit) {
- // Now that the associated PlanExecutor is being destroyed, the recovery unit no longer
- // needs to keep data pinned.
- _stashedRecoveryUnit->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort);
- }
-
cursorStatsOpen.decrement();
if (isNoTimeout()) {
cursorStatsOpenNoTimeout.decrement();
@@ -184,7 +179,13 @@ ClientCursorPin::ClientCursorPin(OperationContext* opCtx,
invariant(_cursor);
invariant(_cursor->_operationUsingCursor);
invariant(!_cursor->_disposed);
- _shouldSaveRecoveryUnit = _cursor->getExecutor()->isSaveRecoveryUnitAcrossCommandsEnabled();
+
+ // If the feature is enabled, we want to ensure that the RecoveryUnit is stashed on the
+ // ClientCursor when the ClientCursorPin is destroyed. If there is already a stashed recovery
+ // unit on the cursor at the time this pin is constructed, the caller may unstash it and
+ // re-stash it if they want to use the associated PlanExecutor.
+ _shouldSaveRecoveryUnit = !_cursor->_stashedRecoveryUnit &&
+ _cursor->getExecutor()->isSaveRecoveryUnitAcrossCommandsEnabled();
// We keep track of the number of cursors currently pinned. The cursor can become unpinned
// either by being released back to the cursor manager or by being deleted. A cursor may be
@@ -254,7 +255,6 @@ void ClientCursorPin::release() {
if (_shouldSaveRecoveryUnit) {
stashResourcesFromOperationContext();
- _shouldSaveRecoveryUnit = false;
}
// Unpin the cursor. This must be done by calling into the cursor manager, since the cursor
@@ -300,7 +300,11 @@ void ClientCursorPin::unstashResourcesOntoOperationContext() {
invariant(_opCtx == _cursor->_operationUsingCursor);
if (auto& ru = _cursor->_stashedRecoveryUnit) {
+ // If unstashResourcesOntoOperationContext() is called, the pin is responsible for
+ // re-stashing the resources onto the cursor, unless the caller decides to call
+ // stashResourcesFromOperationContext() directly.
_shouldSaveRecoveryUnit = true;
+
invariant(!_opCtx->recoveryUnit()->isActive());
_opCtx->setRecoveryUnit(std::move(ru),
WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
@@ -311,6 +315,7 @@ void ClientCursorPin::stashResourcesFromOperationContext() {
// Move the recovery unit from the operation context onto the cursor and create a new RU for
// the current OperationContext.
_cursor->stashRecoveryUnit(_opCtx->releaseAndReplaceRecoveryUnit());
+ _shouldSaveRecoveryUnit = false;
}
namespace {
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 0724c49d4dd..4be531d406d 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -57,6 +57,7 @@ class RecoveryUnit;
*/
struct ClientCursorParams {
ClientCursorParams(std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor,
+ std::unique_ptr<RecoveryUnit> recoveryUnit,
NamespaceString nss,
UserNameIterator authenticatedUsersIter,
APIParameters apiParameters,
@@ -65,7 +66,8 @@ struct ClientCursorParams {
ReadPreferenceSetting readPreferenceSetting,
BSONObj originatingCommandObj,
PrivilegeVector originatingPrivileges)
- : exec(std::move(planExecutor)),
+ : recoveryUnit(std::move(recoveryUnit)),
+ exec(std::move(planExecutor)),
nss(std::move(nss)),
apiParameters(std::move(apiParameters)),
writeConcernOptions(std::move(writeConcernOptions)),
@@ -91,6 +93,7 @@ struct ClientCursorParams {
tailableMode = newMode;
}
+ std::unique_ptr<RecoveryUnit> recoveryUnit;
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
const NamespaceString nss;
std::vector<UserName> authenticatedUsers;
@@ -294,11 +297,8 @@ public:
return _opKey;
}
- std::unique_ptr<RecoveryUnit> releaseStashedRecoveryUnit() {
- return std::move(_stashedRecoveryUnit);
- }
-
void stashRecoveryUnit(std::unique_ptr<RecoveryUnit> ru) {
+ invariant(!_stashedRecoveryUnit);
_stashedRecoveryUnit = std::move(ru);
}
@@ -533,6 +533,10 @@ public:
/**
* Inverse of above: Transfers resources which need the same lifetime as the cursor from the
* operation context to the cursor itself.
+
+ * If this method is not called after unstashing the resources onto the OperationContext, the
+ * pin will re-stash the resources on destruction, unless the pin was moved from or
+ * deleteUnderlying() was called.
*/
void stashResourcesFromOperationContext();
@@ -547,6 +551,26 @@ private:
bool _shouldSaveRecoveryUnit = false;
};
+/**
+ * RAII type for unstashing resources from a pinned cursor onto the operation context. Generally a
+ * ClientCursorPin will handle the logic of re-stashing resources from the operation context on
+ * destruction. If there are multiple pinned cursors, however, only one cursor may have its
+ * resources unstashed at a time. In such cases, this class should be used.
+ */
+class MoveResourcesFromPinToOpCtxBlock {
+public:
+ MoveResourcesFromPinToOpCtxBlock(ClientCursorPin* pin) : _pin(pin) {
+ _pin->unstashResourcesOntoOperationContext();
+ }
+
+ ~MoveResourcesFromPinToOpCtxBlock() {
+ _pin->stashResourcesFromOperationContext();
+ }
+
+private:
+ ClientCursorPin* _pin;
+};
+
void startClientCursorMonitor();
} // namespace mongo
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 665a58c0850..d8878f71558 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -358,11 +358,13 @@ public:
// Get the execution plan for the query.
bool permitYield = true;
+ const bool allowMaintainValidCursorsAcrossCommands = true;
auto exec =
uassertStatusOK(getExecutorFind(opCtx,
&collection,
std::move(cq),
nullptr /* extractAndAttachPipelineStages */,
+ allowMaintainValidCursorsAcrossCommands,
permitYield));
auto bodyBuilder = result->getBodyBuilder();
@@ -558,19 +560,9 @@ public:
&collection,
std::move(cq),
nullptr /* extractAndAttachPipelineStages */,
+ true, /* allow saving cursors across commands */
permitYield));
- // If the executor supports it, find operations will maintain the storage engine state
- // across commands.
- if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- feature_flags::gYieldingSupportForSBE.isEnabled(
- serverGlobalParams.featureCompatibility) &&
- !opCtx->inMultiDocumentTransaction() &&
- repl::ReadConcernArgs::get(opCtx).getLevel() !=
- repl::ReadConcernLevel::kSnapshotReadConcern) {
- exec->enableSaveRecoveryUnitAcrossCommandsIfSupported();
- }
-
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanExplainer().getPlanSummary());
@@ -657,6 +649,9 @@ public:
ClientCursorPin pinnedCursor = CursorManager::get(opCtx)->registerCursor(
opCtx,
{std::move(exec),
+ nullptr, // nullptr RecoveryUnit. The opCtx's RecoveryUnit will be stashed on
+ // Pin destruct if saving cursors across find/getMore commands is
+ // supported.
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
APIParameters::get(opCtx),
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index 5bdb655fcd4..694dfe04d4c 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -541,6 +541,7 @@ public:
auto pinnedCursor = CursorManager::get(opCtx)->registerCursor(
opCtx,
{std::move(exec),
+ nullptr, /* recoveryUnit */
cursorNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
APIParameters::get(opCtx),
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 89fde689852..3a569a7cc7f 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -312,6 +312,7 @@ public:
auto pinnedCursor = CursorManager::get(opCtx)->registerCursor(
opCtx,
{std::move(exec),
+ nullptr, /* recoveryUnit */
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
APIParameters::get(opCtx),
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index ba8214138b8..ed18c05fd2f 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -118,40 +118,53 @@ bool canOptimizeAwayPipeline(const Pipeline* pipeline,
* and thus will be different from that in 'request'.
*/
bool handleCursorCommand(OperationContext* opCtx,
+ bool stashResourceForGetMore,
boost::intrusive_ptr<ExpressionContext> expCtx,
const NamespaceString& nsForCursor,
- std::vector<ClientCursor*> cursors,
+ std::vector<ClientCursorPin>& pins,
const AggregateCommandRequest& request,
const BSONObj& cmdObj,
rpc::ReplyBuilderInterface* result) {
- invariant(!cursors.empty());
+ invariant(!pins.empty());
long long batchSize =
request.getCursor().getBatchSize().value_or(aggregation_request_helper::kDefaultBatchSize);
- if (cursors.size() > 1) {
+ if (pins.size() > 1) {
uassert(
ErrorCodes::BadValue, "the exchange initial batch size must be zero", batchSize == 0);
BSONArrayBuilder cursorsBuilder;
- for (size_t idx = 0; idx < cursors.size(); ++idx) {
- invariant(cursors[idx]);
+ for (size_t idx = 0; idx < pins.size(); ++idx) {
+ auto* cursor = pins[idx].getCursor();
+
+ // Each ClientCursorPin has its own stashed operation state associated with it, in the
+ // form of a RecoveryUnit. Since we may have many pins at once here, we cannot unstash
+ // that state on pin creation. Therefore, when we use a ClientCursorPin, we must fetch
+ // the Pin's resources onto the OperationContext and stash them away again on
+ // completion; then go onto the next pin, etc.
+ boost::optional<MoveResourcesFromPinToOpCtxBlock> unstashedResourceBlock;
+ if (stashResourceForGetMore) {
+ unstashedResourceBlock.emplace(&pins[idx]);
+ }
+
+ invariant(cursor);
BSONObjBuilder cursorResult;
appendCursorResponseObject(
- cursors[idx]->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult);
+ cursor->cursorid(), nsForCursor.ns(), BSONArray(), &cursorResult);
cursorResult.appendBool("ok", 1);
cursorsBuilder.append(cursorResult.obj());
// If a time limit was set on the pipeline, remaining time is "rolled over" to the
// cursor (for use by future getmore ops).
- cursors[idx]->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
+ cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
// Cursor needs to be in a saved state while we yield locks for getmore. State
// will be restored in getMore().
- cursors[idx]->getExecutor()->saveState();
- cursors[idx]->getExecutor()->detachFromOperationContext();
+ cursor->getExecutor()->saveState();
+ cursor->getExecutor()->detachFromOperationContext();
}
auto bodyBuilder = result->getBodyBuilder();
@@ -168,7 +181,12 @@ bool handleCursorCommand(OperationContext* opCtx,
CursorResponseBuilder responseBuilder(result, options);
auto curOp = CurOp::get(opCtx);
- auto cursor = cursors[0];
+ auto cursor = pins[0].getCursor();
+ boost::optional<MoveResourcesFromPinToOpCtxBlock> unstashedResourceBlock;
+ if (stashResourceForGetMore) {
+ unstashedResourceBlock.emplace(&pins[0]);
+ }
+
invariant(cursor);
auto exec = cursor->getExecutor();
invariant(exec);
@@ -573,8 +591,10 @@ Status runAggregate(OperationContext* opCtx,
// re-running the expanded aggregation.
boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> ctx;
- std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs;
+ std::vector<std::unique_ptr<RecoveryUnit>> recoveryUnits;
+ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs;
boost::intrusive_ptr<ExpressionContext> expCtx;
+ bool stashResourcesForGetMore = false;
auto curOp = CurOp::get(opCtx);
{
// If we are in a transaction, check whether the parsed pipeline supports being in
@@ -790,6 +810,11 @@ Status runAggregate(OperationContext* opCtx,
auto attachExecutorCallback =
PipelineD::buildInnerQueryExecutor(collection, nss, &request, pipeline.get());
+ if (attachExecutorCallback.second) {
+ stashResourcesForGetMore =
+ attachExecutorCallback.second->isSaveRecoveryUnitAcrossCommandsEnabled();
+ }
+
if (canOptimizeAwayPipeline(pipeline.get(),
attachExecutorCallback.second.get(),
request,
@@ -801,6 +826,13 @@ Status runAggregate(OperationContext* opCtx,
// PlanExecutor by itself. The resulting cursor will look like what the client would
// have gotten from find command.
execs.emplace_back(std::move(attachExecutorCallback.second));
+
+ if (stashResourcesForGetMore) {
+ // When the PlanExecutor is created, the cursors it sets up are tied to the
+ // RecoveryUnit. For that reason, we must save the recovery unit made when the
+ // executor was set up, and not re-use it for other executors.
+ recoveryUnits.push_back(opCtx->releaseAndReplaceRecoveryUnit());
+ }
} else {
// Complete creation of the initial $cursor stage, if needed.
PipelineD::attachInnerQueryExecutorToPipeline(collection,
@@ -822,6 +854,19 @@ Status runAggregate(OperationContext* opCtx,
request, liteParsedPipeline.hasChangeStream())));
}
+ if (request.getExchange()) {
+ // Exchange pipelines should never use the behavior of stashing the recovery unit
+ // across getMores.
+ invariant(!stashResourcesForGetMore);
+ } else if (stashResourcesForGetMore) {
+ // For non-exchange queries, there should only be one pipeline.
+ invariant(pipelines.size() == 1);
+ // When the PlanExecutor is created, the cursors it sets up are tied to the
+ // RecoveryUnit. For that reason, we must save the recovery unit made when the
+ // executor was set up, and not re-use it for other executors.
+ recoveryUnits.push_back(opCtx->releaseAndReplaceRecoveryUnit());
+ }
+
// With the pipelines created, we can relinquish locks as they will manage the locks
// internally further on. We still need to keep the lock for an optimized away pipeline
// though, as we will be changing its lock policy to 'kLockExternally' (see details
@@ -844,16 +889,24 @@ Status runAggregate(OperationContext* opCtx,
// invalidations and kill notifications themselves, not the cursor we create here.
std::vector<ClientCursorPin> pins;
- std::vector<ClientCursor*> cursors;
ScopeGuard cursorFreer([&] {
for (auto& p : pins) {
p.deleteUnderlying();
}
});
+
+ size_t i = 0;
+ invariant(!stashResourcesForGetMore || recoveryUnits.size() == execs.size());
for (auto&& exec : execs) {
+ std::unique_ptr<RecoveryUnit> recoveryUnit;
+ if (stashResourcesForGetMore) {
+ recoveryUnit = std::move(recoveryUnits[i]);
+ }
+
ClientCursorParams cursorParams(
std::move(exec),
+ std::move(recoveryUnit),
origNss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
APIParameters::get(opCtx),
@@ -867,8 +920,8 @@ Status runAggregate(OperationContext* opCtx,
auto pin = CursorManager::get(opCtx)->registerCursor(opCtx, std::move(cursorParams));
pin->incNBatches();
- cursors.emplace_back(pin.getCursor());
pins.emplace_back(std::move(pin));
+ ++i;
}
// Report usage statistics for each stage in the pipeline.
@@ -877,6 +930,12 @@ Status runAggregate(OperationContext* opCtx,
// If both explain and cursor are specified, explain wins.
if (expCtx->explain) {
auto explainExecutor = pins[0]->getExecutor();
+
+ boost::optional<MoveResourcesFromPinToOpCtxBlock> unstashedResourceBlock;
+ if (stashResourcesForGetMore) {
+ unstashedResourceBlock.emplace(&pins[0]);
+ }
+
auto bodyBuilder = result->getBodyBuilder();
if (auto pipelineExec = dynamic_cast<PlanExecutorPipeline*>(explainExecutor)) {
Explain::explainPipeline(
@@ -898,7 +957,7 @@ Status runAggregate(OperationContext* opCtx,
} else {
// Cursor must be specified, if explain is not.
const bool keepCursor = handleCursorCommand(
- opCtx, expCtx, origNss, std::move(cursors), request, cmdObj, result);
+ opCtx, stashResourcesForGetMore, expCtx, origNss, pins, request, cmdObj, result);
if (keepCursor) {
cursorFreer.dismiss();
}
diff --git a/src/mongo/db/db_raii_test.cpp b/src/mongo/db/db_raii_test.cpp
index 076d8cbbe13..715381a7c35 100644
--- a/src/mongo/db/db_raii_test.cpp
+++ b/src/mongo/db/db_raii_test.cpp
@@ -95,6 +95,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makeTailableQueryPlan(
&collection,
std::move(cq),
nullptr /* extractAndAttachPipelineStages */,
+ true, /* allow saving cursors across commands */
permitYield);
ASSERT_OK(swExec.getStatus());
return std::move(swExec.getValue());
diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp
index 5468c9a54e7..16224f39ea7 100644
--- a/src/mongo/db/exec/sbe_cmd.cpp
+++ b/src/mongo/db/exec/sbe_cmd.cpp
@@ -155,6 +155,7 @@ public:
const auto pinnedCursor = CursorManager::get(opCtx)->registerCursor(
opCtx,
{std::move(exec),
+ nullptr, /* recoveryUnit */
nss,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
APIParameters::get(opCtx),
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 3067074f008..8aed6178e78 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -259,6 +259,9 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
}
auto permitYield = true;
+ // Maintaining valid cursors across commands is not supported for exchange aggregations.
+ const bool allowMaintainValidCursorsAcrossCommands =
+ !static_cast<bool>(aggRequest && aggRequest->getExchange());
return getExecutorFind(expCtx->opCtx,
&collection,
std::move(cq.getValue()),
@@ -266,6 +269,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
canonicalQuery->setPipeline(extractSbeCompatibleGroupsForPushdown(
expCtx, collection, canonicalQuery, pipeline));
},
+ allowMaintainValidCursorsAcrossCommands,
permitYield,
plannerOpts);
}
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 6898ff287a5..7bce39af9f3 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -1270,6 +1270,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
const CollectionPtr* collection,
std::unique_ptr<CanonicalQuery> canonicalQuery,
std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages,
+ bool allowMaintainValidCursorsAcrossCommands,
bool permitYield,
size_t plannerOptions) {
auto yieldPolicy = (permitYield && !opCtx->inMultiDocumentTransaction())
@@ -1279,12 +1280,25 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
if (OperationShardingState::isOperationVersioned(opCtx)) {
plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
- return getExecutor(opCtx,
- collection,
- std::move(canonicalQuery),
- extractAndAttachPipelineStages,
- yieldPolicy,
- plannerOptions);
+ auto executor = getExecutor(opCtx,
+ collection,
+ std::move(canonicalQuery),
+ extractAndAttachPipelineStages,
+ yieldPolicy,
+ plannerOptions);
+
+ // If the executor supports it and the operation is eligible, we will maintain the storage
+ // engine state across commands.
+ if (allowMaintainValidCursorsAcrossCommands && executor.isOK() &&
+ serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ feature_flags::gYieldingSupportForSBE.isEnabled(serverGlobalParams.featureCompatibility) &&
+ !opCtx->inMultiDocumentTransaction() &&
+ repl::ReadConcernArgs::get(opCtx).getLevel() !=
+ repl::ReadConcernLevel::kSnapshotReadConcern) {
+ executor.getValue()->enableSaveRecoveryUnitAcrossCommandsIfSupported();
+ }
+
+ return executor;
}
namespace {
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 84d1ca7f82a..0f8c9bd4bb7 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -154,6 +154,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
const CollectionPtr* collection,
std::unique_ptr<CanonicalQuery> canonicalQuery,
std::function<void(CanonicalQuery*)> extractAndAttachPipelineStages,
+ bool allowMaintainValidCursorsAcrossCommands = true,
bool permitYield = false,
size_t plannerOptions = QueryPlannerParams::DEFAULT);
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp
index 3b7a2db857f..3646464dde9 100644
--- a/src/mongo/db/query/plan_executor_sbe.cpp
+++ b/src/mongo/db/query/plan_executor_sbe.cpp
@@ -124,11 +124,12 @@ void PlanExecutorSBE::saveState() {
if (_isSaveRecoveryUnitAcrossCommandsEnabled) {
_root->saveState(false /* NOT relinquishing cursor */);
- // Put the RU into 'kCommit' mode so that subsequent calls to abandonSnapshot() keep
+ // Ensure the RU is in 'kCommit' mode so that the following call to abandonSnapshot() keeps
// cursors positioned. This ensures that no pointers into memory owned by the storage
// engine held by the SBE PlanStage tree become invalid while the executor is in a saved
// state.
- _opCtx->recoveryUnit()->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kCommit);
+ invariant(_opCtx->recoveryUnit()->abandonSnapshotMode() ==
+ RecoveryUnit::AbandonSnapshotMode::kCommit);
_opCtx->recoveryUnit()->abandonSnapshot();
} else {
_root->saveState(true /* relinquish cursor */);
@@ -142,12 +143,9 @@ void PlanExecutorSBE::restoreState(const RestoreContext& context) {
_yieldPolicy->setYieldable(context.collection());
if (_isSaveRecoveryUnitAcrossCommandsEnabled) {
+ invariant(_opCtx->recoveryUnit()->abandonSnapshotMode() ==
+ RecoveryUnit::AbandonSnapshotMode::kCommit);
_root->restoreState(false /* NOT relinquishing cursor */);
-
- // Put the RU back into 'kAbort' mode. Since the executor is now in a restored state, calls
- // to doAbandonSnapshot() only happen if the query has failed and the executor will not be
- // used again. In this case, we do not rely on the guarantees provided by 'kCommit' mode.
- _opCtx->recoveryUnit()->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort);
} else {
_root->restoreState(true /* relinquish cursor */);
}
@@ -395,4 +393,17 @@ sbe::PlanState fetchNext(sbe::PlanStage* root,
return state;
}
+void PlanExecutorSBE::enableSaveRecoveryUnitAcrossCommandsIfSupported() {
+ // If we enable saving the recovery unit across commands, we must put the RecoveryUnit into a
+ // state where calls to abandonSnapshot() result in a transaction commit, instead of abort. The
+ // transaction commit guarantees that all open cursors will remain positioned and valid.
+ //
+ // To put the RecoveryUnit in this state, we bump a reference counter on it which tracks how
+ // many callers require that it be in commit mode. Note that under one RecoveryUnit there may
+ // be multiple PlanExecutors which require cursors to remain valid across abandonSnapshot()
+ // calls (for example, a pipeline containing $lookup).
+ _recoveryUnitCommitModeBlock.emplace(_opCtx->recoveryUnit());
+ _isSaveRecoveryUnitAcrossCommandsEnabled = true;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h
index a0f7a62f57b..4fadc99ebe1 100644
--- a/src/mongo/db/query/plan_executor_sbe.h
+++ b/src/mongo/db/query/plan_executor_sbe.h
@@ -50,6 +50,7 @@ public:
NamespaceString nss,
bool isOpen,
std::unique_ptr<PlanYieldPolicySBE> yieldPolicy);
+ ~PlanExecutorSBE() {}
CanonicalQuery* getCanonicalQuery() const override {
return _cq.get();
@@ -131,9 +132,7 @@ public:
return *_planExplainer;
}
- void enableSaveRecoveryUnitAcrossCommandsIfSupported() override {
- _isSaveRecoveryUnitAcrossCommandsEnabled = true;
- }
+ void enableSaveRecoveryUnitAcrossCommandsIfSupported() override;
bool isSaveRecoveryUnitAcrossCommandsEnabled() const override {
return _isSaveRecoveryUnitAcrossCommandsEnabled;
}
@@ -179,6 +178,10 @@ private:
bool _isDisposed{false};
bool _isSaveRecoveryUnitAcrossCommandsEnabled = false;
+ // If engaged, forces the recovery unit to commit instead of abort on calls to
+ // abandonSnapshot(). This field is only used when '_isSaveRecoveryUnitAcrossCommandsEnabled'
+ // is true.
+ boost::optional<AbandonSnapshotCommitModeBlock> _recoveryUnitCommitModeBlock;
};
/**
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index 58064f76d6e..3121e2a07e7 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -105,13 +105,11 @@ Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx,
// flag for the duration of yield will force any calls to abandonSnapshot() to
// commit the transaction, rather than abort it, in order to leave the cursors
// valid.
- opCtx->recoveryUnit()->setAbandonSnapshotMode(
- RecoveryUnit::AbandonSnapshotMode::kCommit);
+ opCtx->recoveryUnit()->incAbandonSnapshotCommitModeCount();
exitGuard.emplace([&] {
invariant(opCtx->recoveryUnit()->abandonSnapshotMode() ==
RecoveryUnit::AbandonSnapshotMode::kCommit);
- opCtx->recoveryUnit()->setAbandonSnapshotMode(
- RecoveryUnit::AbandonSnapshotMode::kAbort);
+ opCtx->recoveryUnit()->decAbandonSnapshotCommitModeCount();
});
}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
index 307c6e79078..21658dc9627 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
@@ -113,7 +113,7 @@ void RecoveryUnit::prepareUnitOfWork() {
void RecoveryUnit::doAbandonSnapshot() {
invariant(!_inUnitOfWork(), toString(_getState()));
- if (_abandonSnapshotMode == RecoveryUnit::AbandonSnapshotMode::kCommit) {
+ if (abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kCommit) {
invariant(!_dirty); // Cannot commit written data outside WUOW.
}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp
index 93781b84ed7..b43dfa6a3b1 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit_test.cpp
@@ -87,7 +87,7 @@ public:
TEST_F(EphemeralForTestRecoveryUnitTestHarness, AbandonSnapshotAbortMode) {
Lock::GlobalLock globalLk(opCtx.get(), MODE_IX);
- ru->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort);
+ ASSERT(ru->abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kAbort);
const auto rs = harnessHelper->createRecordStore(opCtx.get(), "table1");
opCtx->lockState()->beginWriteUnitOfWork();
diff --git a/src/mongo/db/storage/recovery_unit.cpp b/src/mongo/db/storage/recovery_unit.cpp
index a913d99cc56..fbdcdaa920f 100644
--- a/src/mongo/db/storage/recovery_unit.cpp
+++ b/src/mongo/db/storage/recovery_unit.cpp
@@ -50,6 +50,15 @@ RecoveryUnit::RecoveryUnit() {
assignNextSnapshotId();
}
+void RecoveryUnit::incAbandonSnapshotCommitModeCount() {
+ ++_abandonSnapshotCommitModeCounter;
+}
+
+void RecoveryUnit::decAbandonSnapshotCommitModeCount() {
+ invariant(_abandonSnapshotCommitModeCounter > 0);
+ --_abandonSnapshotCommitModeCounter;
+}
+
void RecoveryUnit::assignNextSnapshotId() {
_mySnapshotId = nextSnapshotId.fetchAndAdd(1);
}
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index e53429cb490..2da225bb107 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -233,14 +233,25 @@ public:
assignNextSnapshotId();
}
- void setAbandonSnapshotMode(AbandonSnapshotMode mode) {
- _abandonSnapshotMode = mode;
- }
AbandonSnapshotMode abandonSnapshotMode() const {
- return _abandonSnapshotMode;
+ return (_abandonSnapshotCommitModeCounter > 0) ? AbandonSnapshotMode::kCommit
+ : AbandonSnapshotMode::kAbort;
}
/**
+ * Methods for modifying the 'abandonSnapshotModeCount.' When the count is greater than 0, the
+ * RecoveryUnit is in 'commit' mode, and calls to abandonSnapshot() will result in a transaction
+ * commit, preserving cursor positions.
+ *
+ * If the count is 0, the RecoveryUnit is in 'abort' mode and calls to abandonSnapshot() will
+ * result in a transaction abort.
+ *
+ * The count may not go below zero.
+ */
+ void incAbandonSnapshotCommitModeCount();
+ void decAbandonSnapshotCommitModeCount();
+
+ /**
* Informs the RecoveryUnit that a snapshot will be needed soon, if one was not already
* established. This specifically allows the storage engine to preallocate any required
* transaction resources while minimizing the critical section between generating a new
@@ -800,8 +811,6 @@ protected:
bool _noEvictionAfterRollback = false;
- AbandonSnapshotMode _abandonSnapshotMode = AbandonSnapshotMode::kAbort;
-
private:
// Sets the snapshot associated with this RecoveryUnit to a new globally unique id number.
void assignNextSnapshotId();
@@ -819,6 +828,10 @@ private:
std::unique_ptr<Change> _changeForCatalogVisibility;
State _state = State::kInactive;
uint64_t _mySnapshotId;
+
+ // Causes transactions to be committed instead of aborted by abandonSnapshot() if set higher
+ // than 0.
+ int _abandonSnapshotCommitModeCounter = 0;
};
/**
@@ -841,4 +854,19 @@ private:
RecoveryUnit* const _recoveryUnit;
};
+class AbandonSnapshotCommitModeBlock final {
+ AbandonSnapshotCommitModeBlock(const AbandonSnapshotCommitModeBlock&) = delete;
+ AbandonSnapshotCommitModeBlock& operator=(const AbandonSnapshotCommitModeBlock&) = delete;
+
+public:
+ AbandonSnapshotCommitModeBlock(RecoveryUnit* ru) : _recoveryUnit(ru) {
+ ru->incAbandonSnapshotCommitModeCount();
+ }
+ ~AbandonSnapshotCommitModeBlock() {
+ _recoveryUnit->decAbandonSnapshotCommitModeCount();
+ }
+
+private:
+ RecoveryUnit* _recoveryUnit;
+};
} // namespace mongo
diff --git a/src/mongo/db/storage/recovery_unit_test_harness.cpp b/src/mongo/db/storage/recovery_unit_test_harness.cpp
index 0c4cd3b1739..478e74ec710 100644
--- a/src/mongo/db/storage/recovery_unit_test_harness.cpp
+++ b/src/mongo/db/storage/recovery_unit_test_harness.cpp
@@ -191,7 +191,8 @@ TEST_F(RecoveryUnitTestHarness, AbortUnitOfWorkIncrementsSnapshotId) {
TEST_F(RecoveryUnitTestHarness, AbandonSnapshotCommitMode) {
Lock::GlobalLock globalLk(opCtx.get(), MODE_IX);
- ru->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kCommit);
+ ru->incAbandonSnapshotCommitModeCount();
+ ASSERT(ru->abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kCommit);
const auto rs = harnessHelper->createRecordStore(opCtx.get(), "table1");
opCtx->lockState()->beginWriteUnitOfWork();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 176ad6694af..19b40e4403a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -352,7 +352,7 @@ void WiredTigerRecoveryUnit::doAbandonSnapshot() {
if (_isActive()) {
// Can't be in a WriteUnitOfWork, so safe to rollback if the AbandonSnapshotMode is
// kAbort. If kCommit, however, then any active cursors will remain positioned and valid.
- _txnClose(_abandonSnapshotMode == AbandonSnapshotMode::kCommit /* commit */);
+ _txnClose(abandonSnapshotMode() == AbandonSnapshotMode::kCommit /* commit */);
}
_setState(State::kInactive);
}
@@ -379,7 +379,7 @@ void WiredTigerRecoveryUnit::refreshSnapshot() {
invariant(_isActive());
invariant(!_inUnitOfWork());
invariant(!_noEvictionAfterRollback);
- invariant(_abandonSnapshotMode == AbandonSnapshotMode::kAbort);
+ invariant(abandonSnapshotMode() == AbandonSnapshotMode::kAbort);
auto newSession = _sessionCache->getSession();
WiredTigerBeginTxnBlock txnOpen(newSession->getSession(),
@@ -493,7 +493,6 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
LOGV2_DEBUG(
22412, 3, "WT commit_transaction", "snapshotId"_attr = getSnapshotId().toNumber());
} else {
- invariant(_abandonSnapshotMode == AbandonSnapshotMode::kAbort);
StringBuilder config;
if (_noEvictionAfterRollback) {
// The only point at which rollback_transaction() can time out is in the bonus-eviction
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
index 07480a1ce6a..db277ca22b2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
@@ -816,7 +816,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, MultiTimestampConstraintsInternalState
}
TEST_F(WiredTigerRecoveryUnitTestFixture, AbandonSnapshotAbortMode) {
- ru1->setAbandonSnapshotMode(RecoveryUnit::AbandonSnapshotMode::kAbort);
+ ASSERT(ru1->abandonSnapshotMode() == RecoveryUnit::AbandonSnapshotMode::kAbort);
OperationContext* opCtx = clientAndCtx1.second.get();
const char* const key = "key";
diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp
index b01259efd5e..be5acc542cd 100644
--- a/src/mongo/db/transaction_history_iterator.cpp
+++ b/src/mongo/db/transaction_history_iterator.cpp
@@ -90,6 +90,7 @@ BSONObj findOneOplogEntry(OperationContext* opCtx,
&oplogRead.getCollection(),
std::move(cq),
nullptr /*extractAndAttachPipelineStages */,
+ true, /* allow saving cursors across commands */
permitYield));
PlanExecutor::ExecState getNextResult;
diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp
index 1cc8ee1f535..142642b88d9 100644
--- a/src/mongo/dbtests/cursor_manager_test.cpp
+++ b/src/mongo/dbtests/cursor_manager_test.cpp
@@ -100,6 +100,7 @@ public:
ClientCursorParams makeParams(OperationContext* opCtx) {
return {
makeFakePlanExecutor(opCtx),
+ nullptr, /* recoveryUnit */
kTestNss,
{},
APIParameters(),
@@ -152,6 +153,7 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursor) {
auto cursorPin = cursorManager->registerCursor(
pinningOpCtx,
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
kTestNss,
{},
APIParameters(),
@@ -179,6 +181,7 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursorMultiClient) {
auto cursorPin = cursorManager->registerCursor(
pinningOpCtx,
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
kTestNss,
{},
APIParameters(),
@@ -217,6 +220,7 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) {
cursorManager->registerCursor(_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
NamespaceString{"test.collection"},
{},
APIParameters(),
@@ -234,6 +238,7 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) {
cursorManager->registerCursor(_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
NamespaceString{"test.collection"},
{},
APIParameters(),
@@ -256,6 +261,7 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) {
auto cursorPin = cursorManager->registerCursor(
_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
NamespaceString{"test.collection"},
{},
APIParameters(),
@@ -282,6 +288,7 @@ TEST_F(CursorManagerTest, MarkedAsKilledCursorsShouldBeDeletedOnCursorPin) {
auto cursorPin = cursorManager->registerCursor(
_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
NamespaceString{"test.collection"},
{},
APIParameters(),
@@ -317,6 +324,7 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) {
auto cursorPin = cursorManager->registerCursor(
_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
NamespaceString{"test.collection"},
{},
APIParameters(),
@@ -351,6 +359,7 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) {
auto cursorPin = cursorManager->registerCursor(
_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
kTestNss,
{},
APIParameters(),
@@ -366,6 +375,7 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) {
// schedule.
cursorManager->registerCursor(_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
kTestNss,
{},
APIParameters(),
@@ -405,6 +415,7 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing
auto cursorPin = cursorManager->registerCursor(
_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
kTestNss,
{},
APIParameters(),
@@ -448,6 +459,7 @@ TEST_F(CursorManagerTest, CursorStoresAPIParameters) {
auto cursorPin = cursorManager->registerCursor(
_opCtx.get(),
{makeFakePlanExecutor(),
+ nullptr, /* recoveryUnit */
kTestNss,
{},
apiParams,