summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/pipeline_command.cpp
diff options
context:
space:
mode:
authorGeert Bosch <geert@mongodb.com>2016-08-18 19:36:15 +0100
committerGeert Bosch <geert@mongodb.com>2016-08-22 17:04:08 +0100
commitc14515630a02136b60e49c8e15e7135cf8153497 (patch)
tree057758a3c991dc988f75e4df6a0e61789ad3f3e7 /src/mongo/db/commands/pipeline_command.cpp
parente576de40f3629649c453f437ad18a2a86b433509 (diff)
downloadmongo-c14515630a02136b60e49c8e15e7135cf8153497.tar.gz
SERVER-24771 Use view namespace in cursors for aggregate/getMore
Diffstat (limited to 'src/mongo/db/commands/pipeline_command.cpp')
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp90
1 files changed, 54 insertions, 36 deletions
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 2aa42c8f736..4f64276be72 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -79,10 +79,12 @@ namespace {
/**
* Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
- * requests). Otherwise, returns false.
+ * requests). Otherwise, returns false. The passed 'nsForCursor' is only used to determine the
+ * namespace used in the returned cursor. In the case of views, this can be different from that
+ * in 'request'.
*/
bool handleCursorCommand(OperationContext* txn,
- const string& ns,
+ const string& nsForCursor,
ClientCursorPin* pin,
PlanExecutor* exec,
const AggregationRequest& request,
@@ -139,7 +141,7 @@ bool handleCursorCommand(OperationContext* txn,
17391,
str::stream() << "Aggregation has more results than fit in initial batch, but can't "
<< "create cursor since collection "
- << ns
+ << nsForCursor
<< " doesn't exist");
}
@@ -159,7 +161,7 @@ bool handleCursorCommand(OperationContext* txn,
}
const long long cursorId = cursor ? cursor->cursorid() : 0LL;
- appendCursorResponseObject(cursorId, ns, resultsArray.arr(), &result);
+ appendCursorResponseObject(cursorId, nsForCursor, resultsArray.arr(), &result);
return static_cast<bool>(cursor);
}
@@ -303,31 +305,21 @@ public:
return Pipeline::checkAuthForCommand(client, dbname, cmdObj);
}
- virtual bool run(OperationContext* txn,
- const string& db,
- BSONObj& cmdObj,
- int options,
- string& errmsg,
- BSONObjBuilder& result) {
- const std::string ns = parseNs(db, cmdObj);
- if (nsToCollectionSubstring(ns).empty()) {
- errmsg = "missing collection name";
- return false;
- }
- NamespaceString nss(ns);
-
- // Parse the options for this request.
- auto request = AggregationRequest::parseFromBSON(nss, cmdObj);
- if (!request.isOK()) {
- return appendCommandStatus(result, request.getStatus());
- }
+ bool runParsed(OperationContext* txn,
+ const NamespaceString& origNss,
+ const AggregationRequest& request,
+ BSONObj& cmdObj,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ // For operations on views, this will be the underlying namespace.
+ const NamespaceString& nss = request.getNamespaceString();
// Set up the ExpressionContext.
- intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(txn, request.getValue());
+ intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(txn, request);
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
// Parse the pipeline.
- auto statusWithPipeline = Pipeline::parse(request.getValue().getPipeline(), expCtx);
+ auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
if (!statusWithPipeline.isOK()) {
return appendCommandStatus(result, statusWithPipeline.getStatus());
}
@@ -388,13 +380,18 @@ public:
ctx.releaseLocksForView();
// Parse the resolved view into a new aggregation request.
- auto viewCmd =
- resolvedView.getValue().asExpandedViewAggregation(request.getValue());
- if (!viewCmd.isOK()) {
- return appendCommandStatus(result, viewCmd.getStatus());
+ auto newCmd = resolvedView.getValue().asExpandedViewAggregation(request);
+ if (!newCmd.isOK()) {
+ return appendCommandStatus(result, newCmd.getStatus());
+ }
+ auto newNss = resolvedView.getValue().getNamespace();
+ auto newRequest = AggregationRequest::parseFromBSON(newNss, newCmd.getValue());
+ if (!newRequest.isOK()) {
+ return appendCommandStatus(result, newRequest.getStatus());
}
- bool status = this->run(txn, db, viewCmd.getValue(), options, errmsg, result);
+ bool status = runParsed(
+ txn, origNss, newRequest.getValue(), newCmd.getValue(), errmsg, result);
{
// Set the namespace of the curop back to the view namespace so ctx records
// stats on this view namespace on destruction.
@@ -406,7 +403,7 @@ public:
// If the pipeline does not have a user-specified collation, set it from the collection
// default.
- if (request.getValue().getCollation().isEmpty() && collection &&
+ if (request.getCollation().isEmpty() && collection &&
collection->getDefaultCollator()) {
invariant(!expCtx->getCollator());
expCtx->setCollator(collection->getDefaultCollator()->clone());
@@ -427,7 +424,7 @@ public:
// re-parsing every command in debug builds. This is important because sharded
// aggregations rely on this ability. Skipping when inShard because this has
// already been through the transformation (and this un-sets expCtx->inShard).
- pipeline = reparsePipeline(pipeline, request.getValue(), expCtx);
+ pipeline = reparsePipeline(pipeline, request, expCtx);
}
// This does mongod-specific stuff like creating the input PlanExecutor and adding
@@ -492,7 +489,7 @@ public:
// BSONObj will also fit inside a single batch.
//
// We occasionally log a deprecation warning.
- if (!request.getValue().isCursorCommand()) {
+ if (!request.isCursorCommand()) {
RARELY {
warning()
<< "Use of the aggregate command without the 'cursor' "
@@ -504,12 +501,12 @@ public:
// If both explain and cursor are specified, explain wins.
if (expCtx->isExplain) {
result << "stages" << Value(pipeline->writeExplainOps());
- } else if (request.getValue().isCursorCommand()) {
+ } else if (request.isCursorCommand()) {
keepCursor = handleCursorCommand(txn,
- nss.ns(),
+ origNss.ns(),
pin.get(),
pin ? pin->c()->getExecutor() : exec.get(),
- request.getValue(),
+ request,
result);
} else {
pipeline->run(result);
@@ -547,9 +544,30 @@ public:
throw;
}
// Any code that needs the cursor pinned must be inside the try block, above.
-
return appendCommandStatus(result, Status::OK());
}
+
+ virtual bool run(OperationContext* txn,
+ const string& db,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ const std::string ns = parseNs(db, cmdObj);
+ if (nsToCollectionSubstring(ns).empty()) {
+ errmsg = "missing collection name";
+ return false;
+ }
+ NamespaceString nss(ns);
+
+ // Parse the options for this request.
+ auto request = AggregationRequest::parseFromBSON(nss, cmdObj);
+ if (!request.isOK()) {
+ return appendCommandStatus(result, request.getStatus());
+ }
+
+ return runParsed(txn, nss, request.getValue(), cmdObj, errmsg, result);
+ }
};
MONGO_INITIALIZER(PipelineCommand)(InitializerContext* context) {