summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/map_reduce_agg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/map_reduce_agg.cpp')
-rw-r--r--src/mongo/db/commands/map_reduce_agg.cpp26
1 files changed, 25 insertions, 1 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp
index 68ad9cb031d..8ca42eec7b0 100644
--- a/src/mongo/db/commands/map_reduce_agg.cpp
+++ b/src/mongo/db/commands/map_reduce_agg.cpp
@@ -43,9 +43,11 @@
#include "mongo/db/commands/map_reduce_javascript_code.h"
#include "mongo/db/commands/map_reduce_stats.h"
#include "mongo/db/commands/mr_common.h"
+#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/query/map_reduce_output_format.h"
@@ -78,7 +80,7 @@ auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) {
// Manually build an ExpressionContext with the desired options for the translated
// aggregation. The one option worth noting here is allowDiskUse, which is required to allow
// the $group stage of the translated pipeline to spill to disk.
- return make_intrusive<ExpressionContext>(
+ auto expCtx = make_intrusive<ExpressionContext>(
opCtx,
boost::none, // explain
false, // fromMongos
@@ -91,6 +93,8 @@ auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) {
MongoProcessInterface::create(opCtx),
StringMap<ExpressionContext::ResolvedNamespace>{}, // resolvedNamespaces
uuid);
+ expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
+ return expCtx;
}
std::vector<CommonStats> extractStats(const Pipeline& pipeline) {
@@ -127,8 +131,19 @@ bool runAggregationMapReduce(OperationContext* opCtx,
expCtx, pipeline.release());
}();
+ {
+ auto planSummaryStr = PipelineD::getPlanSummaryStr(runnablePipeline.get());
+
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setPlanSummary_inlock(std::move(planSummaryStr));
+ }
+
auto resultArray = exhaustPipelineIntoBSONArray(runnablePipeline);
+ PlanSummaryStats planSummaryStats;
+ PipelineD::getPlanSummaryStats(runnablePipeline.get(), &planSummaryStats);
+ CurOp::get(opCtx)->debug().setPlanSummaryMetrics(planSummaryStats);
+
MapReduceStats mapReduceStats(extractStats(*runnablePipeline),
MapReduceStats::ResponseType::kUnsharded,
boost::get_optional_value_or(parsedMr.getVerbose(), false),
@@ -147,6 +162,15 @@ bool runAggregationMapReduce(OperationContext* opCtx,
&result);
}
+ // The aggregation pipeline may change the namespace of the curop and we need to set it back to
+ // the original namespace to correctly report command stats. One example when the namespace can
+ // be changed is when the pipeline contains an $out stage, which executes an internal command to
+ // create a temp collection, changing the curop namespace to the name of this temp collection.
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setNS_inlock(parsedMr.getNamespace().ns());
+ }
+
return true;
}