diff options
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 71 |
1 files changed, 37 insertions, 34 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 0f07e38c830..4fcfb97f574 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -88,7 +88,7 @@ namespace { * namespace used in the returned cursor. In the case of views, this can be different from that * in 'request'. */ -bool handleCursorCommand(OperationContext* txn, +bool handleCursorCommand(OperationContext* opCtx, const string& nsForCursor, ClientCursor* cursor, PlanExecutor* exec, @@ -150,16 +150,16 @@ bool handleCursorCommand(OperationContext* txn, if (cursor) { // If a time limit was set on the pipeline, remaining time is "rolled over" to the // cursor (for use by future getmore ops). - cursor->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros()); + cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); - CurOp::get(txn)->debug().cursorid = cursor->cursorid(); + CurOp::get(opCtx)->debug().cursorid = cursor->cursorid(); // Cursor needs to be in a saved state while we yield locks for getmore. State // will be restored in getMore(). exec->saveState(); exec->detachFromOperationContext(); } else { - CurOp::get(txn)->debug().cursorExhausted = true; + CurOp::get(opCtx)->debug().cursorExhausted = true; } const long long cursorId = cursor ? cursor->cursorid() : 0LL; @@ -169,12 +169,12 @@ bool handleCursorCommand(OperationContext* txn, } StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNamespaces( - OperationContext* txn, const AggregationRequest& request) { + OperationContext* opCtx, const AggregationRequest& request) { // We intentionally do not drop and reacquire our DB lock after resolving the view definition in // order to prevent the definition for any view namespaces we've already resolved from changing. // This is necessary to prevent a cycle from being formed among the view definitions cached in // 'resolvedNamespaces' because we won't re-resolve a view namespace we've already encountered. - AutoGetDb autoDb(txn, request.getNamespaceString().db(), MODE_IS); + AutoGetDb autoDb(opCtx, request.getNamespaceString().db(), MODE_IS); Database* const db = autoDb.getDb(); ViewCatalog* viewCatalog = db ? db->getViewCatalog() : nullptr; @@ -199,9 +199,9 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames // pipeline because 'involvedNs' doesn't refer to a view namespace in our consistent // snapshot of the view catalog. resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}}; - } else if (viewCatalog->lookup(txn, involvedNs.ns())) { + } else if (viewCatalog->lookup(opCtx, involvedNs.ns())) { // If 'involvedNs' refers to a view namespace, then we resolve its definition. - auto resolvedView = viewCatalog->resolveView(txn, involvedNs); + auto resolvedView = viewCatalog->resolveView(opCtx, involvedNs); if (!resolvedView.isOK()) { return {ErrorCodes::FailedToParse, str::stream() << "Failed to resolve view '" << involvedNs.ns() << "': " @@ -265,7 +265,7 @@ boost::intrusive_ptr<Pipeline> reparsePipeline( * Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to * 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView. */ -Status collatorCompatibleWithPipeline(OperationContext* txn, +Status collatorCompatibleWithPipeline(OperationContext* opCtx, Database* db, const CollatorInterface* collator, const intrusive_ptr<Pipeline> pipeline) { @@ -277,7 +277,7 @@ Status collatorCompatibleWithPipeline(OperationContext* txn, continue; } - auto view = db->getViewCatalog()->lookup(txn, potentialViewNs.ns()); + auto view = db->getViewCatalog()->lookup(opCtx, potentialViewNs.ns()); if (!view) { continue; } @@ -339,7 +339,7 @@ public: return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj); } - bool runParsed(OperationContext* txn, + bool runParsed(OperationContext* opCtx, const NamespaceString& origNss, const AggregationRequest& request, BSONObj& cmdObj, @@ -351,14 +351,14 @@ public: // Parse the user-specified collation, if any. std::unique_ptr<CollatorInterface> userSpecifiedCollator = request.getCollation().isEmpty() ? nullptr - : uassertStatusOK(CollatorFactoryInterface::get(txn->getServiceContext()) + : uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(request.getCollation())); boost::optional<ClientCursorPin> pin; // either this OR the exec will be non-null unique_ptr<PlanExecutor> exec; boost::intrusive_ptr<ExpressionContext> expCtx; boost::intrusive_ptr<Pipeline> pipeline; - auto curOp = CurOp::get(txn); + auto curOp = CurOp::get(opCtx); { // This will throw if the sharding version for this connection is out of date. If the // namespace is a view, the lock will be released before re-running the aggregation. @@ -367,7 +367,7 @@ public: // same sharding version that we synchronize on here. This is also why we always need to // create a ClientCursor even when we aren't outputting to a cursor. See the comment on // ShardFilterStage for more details. - AutoGetCollectionOrViewForRead ctx(txn, nss); + AutoGetCollectionOrViewForRead ctx(opCtx, nss); Collection* collection = ctx.getCollection(); // If this is a view, resolve it by finding the underlying collection and stitching view @@ -390,7 +390,7 @@ public: } auto viewDefinition = - ViewShardingCheck::getResolvedViewIfSharded(txn, ctx.getDb(), ctx.getView()); + ViewShardingCheck::getResolvedViewIfSharded(opCtx, ctx.getDb(), ctx.getView()); if (!viewDefinition.isOK()) { return appendCommandStatus(result, viewDefinition.getStatus()); } @@ -400,7 +400,7 @@ public: return false; } - auto resolvedView = ctx.getDb()->getViewCatalog()->resolveView(txn, nss); + auto resolvedView = ctx.getDb()->getViewCatalog()->resolveView(opCtx, nss); if (!resolvedView.isOK()) { return appendCommandStatus(result, resolvedView.getStatus()); } @@ -425,11 +425,11 @@ public: newRequest.getValue().setCollation(collationSpec); bool status = runParsed( - txn, origNss, newRequest.getValue(), newCmd.getValue(), errmsg, result); + opCtx, origNss, newRequest.getValue(), newCmd.getValue(), errmsg, result); { // Set the namespace of the curop back to the view namespace so ctx records // stats on this view namespace on destruction. - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setNS_inlock(nss.ns()); } return status; @@ -449,10 +449,10 @@ public: } expCtx.reset( - new ExpressionContext(txn, + new ExpressionContext(opCtx, request, std::move(collatorToUse), - uassertStatusOK(resolveInvolvedNamespaces(txn, request)))); + uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)))); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; // Parse the pipeline. @@ -465,7 +465,7 @@ public: // Check that the view's collation matches the collation of any views involved // in the pipeline. auto pipelineCollationStatus = - collatorCompatibleWithPipeline(txn, ctx.getDb(), expCtx->getCollator(), pipeline); + collatorCompatibleWithPipeline(opCtx, ctx.getDb(), expCtx->getCollator(), pipeline); if (!pipelineCollationStatus.isOK()) { return appendCommandStatus(result, pipelineCollationStatus); } @@ -488,19 +488,22 @@ public: // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created // PlanExecutor. auto ws = make_unique<WorkingSet>(); - auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, ws.get()); + auto proxy = make_unique<PipelineProxyStage>(opCtx, pipeline, ws.get()); auto statusWithPlanExecutor = (NULL == collection) ? PlanExecutor::make( - txn, std::move(ws), std::move(proxy), nss.ns(), PlanExecutor::YIELD_MANUAL) - : PlanExecutor::make( - txn, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL); + opCtx, std::move(ws), std::move(proxy), nss.ns(), PlanExecutor::YIELD_MANUAL) + : PlanExecutor::make(opCtx, + std::move(ws), + std::move(proxy), + collection, + PlanExecutor::YIELD_MANUAL); invariant(statusWithPlanExecutor.isOK()); exec = std::move(statusWithPlanExecutor.getValue()); { auto planSummary = Explain::getPlanSummary(exec.get()); - stdx::lock_guard<Client> lk(*txn->getClient()); + stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(std::move(planSummary)); } @@ -509,7 +512,7 @@ public: pin.emplace(collection->getCursorManager()->registerCursor( {exec.release(), nss.ns(), - txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), 0, cmdObj.getOwned(), isAggCursor})); @@ -533,7 +536,7 @@ public: result << "stages" << Value(pipeline->writeExplainOps()); } else { // Cursor must be specified, if explain is not. - keepCursor = handleCursorCommand(txn, + keepCursor = handleCursorCommand(opCtx, origNss.ns(), pin ? pin->getCursor() : nullptr, pin ? pin->getCursor()->getExecutor() : exec.get(), @@ -556,8 +559,8 @@ public: // AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the // sharding version is out of date, and we don't care if the sharding version // has changed. - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS); - Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS); + Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_IS); + Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); if (keepCursor) { pin->release(); } else { @@ -567,8 +570,8 @@ public: } catch (...) { // On our way out of scope, we clean up our ClientCursorPin if needed. if (pin) { - Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS); - Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS); + Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_IS); + Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); pin->deleteUnderlying(); } throw; @@ -577,7 +580,7 @@ public: return appendCommandStatus(result, Status::OK()); } - virtual bool run(OperationContext* txn, + virtual bool run(OperationContext* opCtx, const string& db, BSONObj& cmdObj, int options, @@ -607,7 +610,7 @@ public: "http://dochub.mongodb.org/core/3.4-feature-compatibility.")); } - return runParsed(txn, nss, request.getValue(), cmdObj, errmsg, result); + return runParsed(opCtx, nss, request.getValue(), cmdObj, errmsg, result); } }; |