summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/pipeline_command.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp71
1 files changed, 37 insertions, 34 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 0f07e38c830..4fcfb97f574 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -88,7 +88,7 @@ namespace {
* namespace used in the returned cursor. In the case of views, this can be different from that
* in 'request'.
*/
-bool handleCursorCommand(OperationContext* txn,
+bool handleCursorCommand(OperationContext* opCtx,
const string& nsForCursor,
ClientCursor* cursor,
PlanExecutor* exec,
@@ -150,16 +150,16 @@ bool handleCursorCommand(OperationContext* txn,
if (cursor) {
// If a time limit was set on the pipeline, remaining time is "rolled over" to the
// cursor (for use by future getmore ops).
- cursor->setLeftoverMaxTimeMicros(txn->getRemainingMaxTimeMicros());
+ cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
- CurOp::get(txn)->debug().cursorid = cursor->cursorid();
+ CurOp::get(opCtx)->debug().cursorid = cursor->cursorid();
// Cursor needs to be in a saved state while we yield locks for getmore. State
// will be restored in getMore().
exec->saveState();
exec->detachFromOperationContext();
} else {
- CurOp::get(txn)->debug().cursorExhausted = true;
+ CurOp::get(opCtx)->debug().cursorExhausted = true;
}
const long long cursorId = cursor ? cursor->cursorid() : 0LL;
@@ -169,12 +169,12 @@ bool handleCursorCommand(OperationContext* txn,
}
StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNamespaces(
- OperationContext* txn, const AggregationRequest& request) {
+ OperationContext* opCtx, const AggregationRequest& request) {
// We intentionally do not drop and reacquire our DB lock after resolving the view definition in
// order to prevent the definition for any view namespaces we've already resolved from changing.
// This is necessary to prevent a cycle from being formed among the view definitions cached in
// 'resolvedNamespaces' because we won't re-resolve a view namespace we've already encountered.
- AutoGetDb autoDb(txn, request.getNamespaceString().db(), MODE_IS);
+ AutoGetDb autoDb(opCtx, request.getNamespaceString().db(), MODE_IS);
Database* const db = autoDb.getDb();
ViewCatalog* viewCatalog = db ? db->getViewCatalog() : nullptr;
@@ -199,9 +199,9 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
// pipeline because 'involvedNs' doesn't refer to a view namespace in our consistent
// snapshot of the view catalog.
resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}};
- } else if (viewCatalog->lookup(txn, involvedNs.ns())) {
+ } else if (viewCatalog->lookup(opCtx, involvedNs.ns())) {
// If 'involvedNs' refers to a view namespace, then we resolve its definition.
- auto resolvedView = viewCatalog->resolveView(txn, involvedNs);
+ auto resolvedView = viewCatalog->resolveView(opCtx, involvedNs);
if (!resolvedView.isOK()) {
return {ErrorCodes::FailedToParse,
str::stream() << "Failed to resolve view '" << involvedNs.ns() << "': "
@@ -265,7 +265,7 @@ boost::intrusive_ptr<Pipeline> reparsePipeline(
* Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to
* 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView.
*/
-Status collatorCompatibleWithPipeline(OperationContext* txn,
+Status collatorCompatibleWithPipeline(OperationContext* opCtx,
Database* db,
const CollatorInterface* collator,
const intrusive_ptr<Pipeline> pipeline) {
@@ -277,7 +277,7 @@ Status collatorCompatibleWithPipeline(OperationContext* txn,
continue;
}
- auto view = db->getViewCatalog()->lookup(txn, potentialViewNs.ns());
+ auto view = db->getViewCatalog()->lookup(opCtx, potentialViewNs.ns());
if (!view) {
continue;
}
@@ -339,7 +339,7 @@ public:
return AuthorizationSession::get(client)->checkAuthForAggregate(nss, cmdObj);
}
- bool runParsed(OperationContext* txn,
+ bool runParsed(OperationContext* opCtx,
const NamespaceString& origNss,
const AggregationRequest& request,
BSONObj& cmdObj,
@@ -351,14 +351,14 @@ public:
// Parse the user-specified collation, if any.
std::unique_ptr<CollatorInterface> userSpecifiedCollator = request.getCollation().isEmpty()
? nullptr
- : uassertStatusOK(CollatorFactoryInterface::get(txn->getServiceContext())
+ : uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(request.getCollation()));
boost::optional<ClientCursorPin> pin; // either this OR the exec will be non-null
unique_ptr<PlanExecutor> exec;
boost::intrusive_ptr<ExpressionContext> expCtx;
boost::intrusive_ptr<Pipeline> pipeline;
- auto curOp = CurOp::get(txn);
+ auto curOp = CurOp::get(opCtx);
{
// This will throw if the sharding version for this connection is out of date. If the
// namespace is a view, the lock will be released before re-running the aggregation.
@@ -367,7 +367,7 @@ public:
// same sharding version that we synchronize on here. This is also why we always need to
// create a ClientCursor even when we aren't outputting to a cursor. See the comment on
// ShardFilterStage for more details.
- AutoGetCollectionOrViewForRead ctx(txn, nss);
+ AutoGetCollectionOrViewForRead ctx(opCtx, nss);
Collection* collection = ctx.getCollection();
// If this is a view, resolve it by finding the underlying collection and stitching view
@@ -390,7 +390,7 @@ public:
}
auto viewDefinition =
- ViewShardingCheck::getResolvedViewIfSharded(txn, ctx.getDb(), ctx.getView());
+ ViewShardingCheck::getResolvedViewIfSharded(opCtx, ctx.getDb(), ctx.getView());
if (!viewDefinition.isOK()) {
return appendCommandStatus(result, viewDefinition.getStatus());
}
@@ -400,7 +400,7 @@ public:
return false;
}
- auto resolvedView = ctx.getDb()->getViewCatalog()->resolveView(txn, nss);
+ auto resolvedView = ctx.getDb()->getViewCatalog()->resolveView(opCtx, nss);
if (!resolvedView.isOK()) {
return appendCommandStatus(result, resolvedView.getStatus());
}
@@ -425,11 +425,11 @@ public:
newRequest.getValue().setCollation(collationSpec);
bool status = runParsed(
- txn, origNss, newRequest.getValue(), newCmd.getValue(), errmsg, result);
+ opCtx, origNss, newRequest.getValue(), newCmd.getValue(), errmsg, result);
{
// Set the namespace of the curop back to the view namespace so ctx records
// stats on this view namespace on destruction.
- stdx::lock_guard<Client> lk(*txn->getClient());
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setNS_inlock(nss.ns());
}
return status;
@@ -449,10 +449,10 @@ public:
}
expCtx.reset(
- new ExpressionContext(txn,
+ new ExpressionContext(opCtx,
request,
std::move(collatorToUse),
- uassertStatusOK(resolveInvolvedNamespaces(txn, request))));
+ uassertStatusOK(resolveInvolvedNamespaces(opCtx, request))));
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
// Parse the pipeline.
@@ -465,7 +465,7 @@ public:
// Check that the view's collation matches the collation of any views involved
// in the pipeline.
auto pipelineCollationStatus =
- collatorCompatibleWithPipeline(txn, ctx.getDb(), expCtx->getCollator(), pipeline);
+ collatorCompatibleWithPipeline(opCtx, ctx.getDb(), expCtx->getCollator(), pipeline);
if (!pipelineCollationStatus.isOK()) {
return appendCommandStatus(result, pipelineCollationStatus);
}
@@ -488,19 +488,22 @@ public:
// ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
// PlanExecutor.
auto ws = make_unique<WorkingSet>();
- auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, ws.get());
+ auto proxy = make_unique<PipelineProxyStage>(opCtx, pipeline, ws.get());
auto statusWithPlanExecutor = (NULL == collection)
? PlanExecutor::make(
- txn, std::move(ws), std::move(proxy), nss.ns(), PlanExecutor::YIELD_MANUAL)
- : PlanExecutor::make(
- txn, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL);
+ opCtx, std::move(ws), std::move(proxy), nss.ns(), PlanExecutor::YIELD_MANUAL)
+ : PlanExecutor::make(opCtx,
+ std::move(ws),
+ std::move(proxy),
+ collection,
+ PlanExecutor::YIELD_MANUAL);
invariant(statusWithPlanExecutor.isOK());
exec = std::move(statusWithPlanExecutor.getValue());
{
auto planSummary = Explain::getPlanSummary(exec.get());
- stdx::lock_guard<Client> lk(*txn->getClient());
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setPlanSummary_inlock(std::move(planSummary));
}
@@ -509,7 +512,7 @@ public:
pin.emplace(collection->getCursorManager()->registerCursor(
{exec.release(),
nss.ns(),
- txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
0,
cmdObj.getOwned(),
isAggCursor}));
@@ -533,7 +536,7 @@ public:
result << "stages" << Value(pipeline->writeExplainOps());
} else {
// Cursor must be specified, if explain is not.
- keepCursor = handleCursorCommand(txn,
+ keepCursor = handleCursorCommand(opCtx,
origNss.ns(),
pin ? pin->getCursor() : nullptr,
pin ? pin->getCursor()->getExecutor() : exec.get(),
@@ -556,8 +559,8 @@ public:
// AutoGetCollectionForRead. AutoGetCollectionForRead will throw if the
// sharding version is out of date, and we don't care if the sharding version
// has changed.
- Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS);
- Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS);
+ Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_IS);
+ Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
if (keepCursor) {
pin->release();
} else {
@@ -567,8 +570,8 @@ public:
} catch (...) {
// On our way out of scope, we clean up our ClientCursorPin if needed.
if (pin) {
- Lock::DBLock dbLock(txn->lockState(), nss.db(), MODE_IS);
- Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IS);
+ Lock::DBLock dbLock(opCtx->lockState(), nss.db(), MODE_IS);
+ Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
pin->deleteUnderlying();
}
throw;
@@ -577,7 +580,7 @@ public:
return appendCommandStatus(result, Status::OK());
}
- virtual bool run(OperationContext* txn,
+ virtual bool run(OperationContext* opCtx,
const string& db,
BSONObj& cmdObj,
int options,
@@ -607,7 +610,7 @@ public:
"http://dochub.mongodb.org/core/3.4-feature-compatibility."));
}
- return runParsed(txn, nss, request.getValue(), cmdObj, errmsg, result);
+ return runParsed(opCtx, nss, request.getValue(), cmdObj, errmsg, result);
}
};