summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/map_reduce_agg.cpp
diff options
context:
space:
mode:
authorNicholas Zolnierz <nicholas.zolnierz@mongodb.com>2019-09-13 15:55:58 +0000
committerevergreen <evergreen@mongodb.com>2019-09-13 15:55:58 +0000
commitf0c5f3000d83f806272c010002f8c64513a09330 (patch)
treec5da861e0563ddc10b75d885629958b7b998ca50 /src/mongo/db/commands/map_reduce_agg.cpp
parentb5c38b96c50e13be68a5a6968b441a879c0a3c53 (diff)
downloadmongo-f0c5f3000d83f806272c010002f8c64513a09330.tar.gz
SERVER-42903 Enable execution of translated M/R pipeline
Diffstat (limited to 'src/mongo/db/commands/map_reduce_agg.cpp')
-rw-r--r--src/mongo/db/commands/map_reduce_agg.cpp26
1 files changed, 18 insertions, 8 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp
index a1866c9372c..774237627fa 100644
--- a/src/mongo/db/commands/map_reduce_agg.cpp
+++ b/src/mongo/db/commands/map_reduce_agg.cpp
@@ -241,28 +241,38 @@ bool runAggregationMapReduce(OperationContext* opCtx,
};
auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
- bool inMemory = parsedMr.getOutOptions().getOutputType() == OutputType::InMemory;
+ auto expCtx = makeExpressionContext(opCtx, parsedMr);
+ auto runnablePipeline = [&]() {
+ auto pipeline = translateFromMR(parsedMr, expCtx);
+ return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
+ expCtx, pipeline.release());
+ }();
- auto pipe = translateFromMR(parsedMr, makeExpressionContext(opCtx, parsedMr));
+ if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) {
+ map_reduce_output_format::appendInlineResponse(
+ exhaustPipelineIntoBSONArray(runnablePipeline),
+ boost::get_optional_value_or(parsedMr.getVerbose(), false),
+ false,
+ &result);
+ } else {
+ // For non-inline output, the pipeline should not return any results however getNext() still
+ // needs to be called once to ensure documents are written to the output collection.
+ invariant(!runnablePipeline->getNext());
- if (inMemory)
- map_reduce_output_format::appendInlineResponse(exhaustPipelineIntoBSONArray(pipe),
- parsedMr.getVerbose().get_value_or(false),
- false,
- &result);
- else
map_reduce_output_format::appendOutResponse(
parsedMr.getOutOptions().getDatabaseName(),
parsedMr.getOutOptions().getCollectionName(),
boost::get_optional_value_or(parsedMr.getVerbose(), false),
false,
&result);
+ }
return true;
}
std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) {
+
// TODO: It would be good to figure out what kind of errors this would produce in the Status.
// It would be better not to produce something incomprehensible out of an internal translation.
return uassertStatusOK(Pipeline::create(