diff options
author | Nicholas Zolnierz <nicholas.zolnierz@mongodb.com> | 2019-09-13 15:55:58 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-09-13 15:55:58 +0000 |
commit | f0c5f3000d83f806272c010002f8c64513a09330 (patch) | |
tree | c5da861e0563ddc10b75d885629958b7b998ca50 /src/mongo/db/commands/map_reduce_agg.cpp | |
parent | b5c38b96c50e13be68a5a6968b441a879c0a3c53 (diff) | |
download | mongo-f0c5f3000d83f806272c010002f8c64513a09330.tar.gz |
SERVER-42903 Enable execution of translated M/R pipeline
Diffstat (limited to 'src/mongo/db/commands/map_reduce_agg.cpp')
-rw-r--r-- | src/mongo/db/commands/map_reduce_agg.cpp | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp index a1866c9372c..774237627fa 100644 --- a/src/mongo/db/commands/map_reduce_agg.cpp +++ b/src/mongo/db/commands/map_reduce_agg.cpp @@ -241,28 +241,38 @@ bool runAggregationMapReduce(OperationContext* opCtx, }; auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); - bool inMemory = parsedMr.getOutOptions().getOutputType() == OutputType::InMemory; + auto expCtx = makeExpressionContext(opCtx, parsedMr); + auto runnablePipeline = [&]() { + auto pipeline = translateFromMR(parsedMr, expCtx); + return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( + expCtx, pipeline.release()); + }(); - auto pipe = translateFromMR(parsedMr, makeExpressionContext(opCtx, parsedMr)); + if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) { + map_reduce_output_format::appendInlineResponse( + exhaustPipelineIntoBSONArray(runnablePipeline), + boost::get_optional_value_or(parsedMr.getVerbose(), false), + false, + &result); + } else { + // For non-inline output, the pipeline should not return any results however getNext() still + // needs to be called once to ensure documents are written to the output collection. + invariant(!runnablePipeline->getNext()); - if (inMemory) - map_reduce_output_format::appendInlineResponse(exhaustPipelineIntoBSONArray(pipe), - parsedMr.getVerbose().get_value_or(false), - false, - &result); - else map_reduce_output_format::appendOutResponse( parsedMr.getOutOptions().getDatabaseName(), parsedMr.getOutOptions().getCollectionName(), boost::get_optional_value_or(parsedMr.getVerbose(), false), false, &result); + } return true; } std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) { + // TODO: It would be good to figure out what kind of errors this would produce in the Status. // It would be better not to produce something incomprehensible out of an internal translation. return uassertStatusOK(Pipeline::create( |