diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/map_reduce_agg.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/commands/map_reduce_agg.h | 5 | ||||
-rw-r--r-- | src/mongo/db/commands/map_reduce_command.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/commands/map_reduce_command_base.h | 29 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_writer.h | 88 | ||||
-rw-r--r-- | src/mongo/db/query/map_reduce_output_format.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/query/map_reduce_output_format.h | 5 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_agg.cpp | 35 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_agg.h | 5 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 9 | ||||
-rw-r--r-- | src/mongo/shell/explainable.js | 7 |
11 files changed, 163 insertions, 84 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp index f4c61322c03..214c930046e 100644 --- a/src/mongo/db/commands/map_reduce_agg.cpp +++ b/src/mongo/db/commands/map_reduce_agg.cpp @@ -49,13 +49,16 @@ #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/query/explain_common.h" #include "mongo/db/query/map_reduce_output_format.h" namespace mongo::map_reduce_agg { namespace { -auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) { +auto makeExpressionContext(OperationContext* opCtx, + const MapReduce& parsedMr, + boost::optional<ExplainOptions::Verbosity> verbosity) { // AutoGetCollectionForReadCommand will throw if the sharding version for this connection is // out of date. AutoGetCollectionForReadCommand ctx( @@ -81,10 +84,10 @@ auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) { // the $group stage of the translated pipeline to spill to disk. auto expCtx = make_intrusive<ExpressionContext>( opCtx, - boost::none, // explain - false, // fromMongos - false, // needsmerge - true, // allowDiskUse + verbosity, + false, // fromMongos + false, // needsmerge + true, // allowDiskUse parsedMr.getBypassDocumentValidation().get_value_or(false), parsedMr.getNamespace(), runtimeConstants, @@ -99,10 +102,9 @@ auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) { } // namespace bool runAggregationMapReduce(OperationContext* opCtx, - const std::string& dbname, const BSONObj& cmd, - std::string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result, + boost::optional<ExplainOptions::Verbosity> verbosity) { auto exhaustPipelineIntoBSONArray = [](auto&& pipeline) { BSONArrayBuilder bab; while (auto&& doc = pipeline->getNext()) @@ -113,7 +115,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, Timer cmdTimer; auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); - auto expCtx = makeExpressionContext(opCtx, parsedMr); + auto expCtx = makeExpressionContext(opCtx, parsedMr, verbosity); auto runnablePipeline = [&]() { auto pipeline = map_reduce_common::translateFromMR(parsedMr, expCtx); return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( @@ -130,21 +132,27 @@ bool runAggregationMapReduce(OperationContext* opCtx, try { auto resultArray = exhaustPipelineIntoBSONArray(runnablePipeline); + if (expCtx->explain) { + result << "stages" << Value(runnablePipeline->writeExplainOps(*(expCtx->explain))); + explain_common::generateServerInfo(&result); + } + PlanSummaryStats planSummaryStats; PipelineD::getPlanSummaryStats(runnablePipeline.get(), &planSummaryStats); CurOp::get(opCtx)->debug().setPlanSummaryMetrics(planSummaryStats); - - if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) { - map_reduce_output_format::appendInlineResponse(std::move(resultArray), &result); - } else { - // For output to collection, pipeline execution should not return any results. - invariant(resultArray.isEmpty()); - - map_reduce_output_format::appendOutResponse( - parsedMr.getOutOptions().getDatabaseName(), - parsedMr.getOutOptions().getCollectionName(), - &result); + if (!expCtx->explain) { + if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) { + map_reduce_output_format::appendInlineResponse(std::move(resultArray), &result); + } else { + // For output to collection, pipeline execution should not return any results. + invariant(resultArray.isEmpty()); + + map_reduce_output_format::appendOutResponse( + parsedMr.getOutOptions().getDatabaseName(), + parsedMr.getOutOptions().getCollectionName(), + &result); + } } // The aggregation pipeline may change the namespace of the curop and we need to set it back diff --git a/src/mongo/db/commands/map_reduce_agg.h b/src/mongo/db/commands/map_reduce_agg.h index 50752250a3a..e8448de66c7 100644 --- a/src/mongo/db/commands/map_reduce_agg.h +++ b/src/mongo/db/commands/map_reduce_agg.h @@ -45,9 +45,8 @@ namespace mongo::map_reduce_agg { * Executes a mapReduce command against a replica set/standalone. */ bool runAggregationMapReduce(OperationContext* opCtx, - const std::string& dbname, const BSONObj& cmd, - std::string& errmsg, - BSONObjBuilder& result); + BSONObjBuilder& result, + boost::optional<ExplainOptions::Verbosity> verbosity); } // namespace mongo::map_reduce_agg diff --git a/src/mongo/db/commands/map_reduce_command.cpp b/src/mongo/db/commands/map_reduce_command.cpp index 8d0e9bf70ac..c605affd325 100644 --- a/src/mongo/db/commands/map_reduce_command.cpp +++ b/src/mongo/db/commands/map_reduce_command.cpp @@ -62,6 +62,13 @@ public: return FindCommon::kInitReplyBufferSize; } + void _explainImpl(OperationContext* opCtx, + const BSONObj& cmd, + BSONObjBuilder& result, + boost::optional<ExplainOptions::Verbosity> verbosity) const override { + map_reduce_agg::runAggregationMapReduce(opCtx, cmd, result, verbosity); + } + private: bool _runImpl(OperationContext* opCtx, const std::string& dbname, @@ -74,7 +81,7 @@ private: if (internalQueryUseAggMapReduce.load() && serverGlobalParams.featureCompatibility.getVersion() == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) { - return map_reduce_agg::runAggregationMapReduce(opCtx, dbname, cmd, errmsg, result); + return map_reduce_agg::runAggregationMapReduce(opCtx, cmd, result, boost::none); } return mr::runMapReduce(opCtx, dbname, cmd, errmsg, result); } diff --git a/src/mongo/db/commands/map_reduce_command_base.h b/src/mongo/db/commands/map_reduce_command_base.h index 4de4676e02d..5166cdec327 100644 --- a/src/mongo/db/commands/map_reduce_command_base.h +++ b/src/mongo/db/commands/map_reduce_command_base.h @@ -65,12 +65,39 @@ public: map_reduce_common::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); } + virtual void _explainImpl(OperationContext* opCtx, + const BSONObj& cmd, + BSONObjBuilder& result, + boost::optional<ExplainOptions::Verbosity> verbosity) const = 0; + + Status explain(OperationContext* opCtx, + const OpMsgRequest& request, + ExplainOptions::Verbosity verbosity, + rpc::ReplyBuilderInterface* result) const override { + if (internalQueryUseAggMapReduce.load() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) { + auto builder = result->getBodyBuilder(); + auto explain = boost::make_optional(verbosity); + try { + _explainImpl(opCtx, request.body, builder, explain); + } catch (...) { + return exceptionToStatus(); + } + return Status::OK(); + } else { + return Status( + ErrorCodes::IllegalOperation, + "explain for mapReduce is not available prior to featureCompatibilityVersion 4.4"); + } + MONGO_UNREACHABLE; + } + bool errmsgRun(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmd, std::string& errmsg, BSONObjBuilder& result) { - return _runImpl(opCtx, dbname, cmd, errmsg, result); } diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index dd5dd0c37a5..5110582faeb 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -170,54 +170,56 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { return GetNextResult::makeEOF(); } - if (!_initialized) { - // Explain should never try to actually execute any writes. We only ever expect - // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain - // modes. This assertion should not be triggered for 'queryPlanner' explain, which - // is perfectly legal. - uassert(51029, - "explain of {} is not allowed with verbosity {}"_format( - getSourceName(), ExplainOptions::verbosityString(*pExpCtx->explain)), - !pExpCtx->explain); - initialize(); - _initialized = true; - } - - BatchedObjects batch; - int bufferedBytes = 0; - - auto nextInput = pSource->getNext(); - for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { - waitWhileFailPointEnabled(); - - auto doc = nextInput.releaseDocument(); - auto [obj, objSize] = makeBatchObject(std::move(doc)); + // Ignore writes and exhaust input if we are in explain mode. + if (pExpCtx->explain) { + auto nextInput = pSource->getNext(); + for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { + } + _done = nextInput.getStatus() == GetNextResult::ReturnStatus::kEOF; + return nextInput; + } else { + if (!_initialized) { + initialize(); + _initialized = true; + } - bufferedBytes += objSize; - if (!batch.empty() && - (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { + BatchedObjects batch; + int bufferedBytes = 0; + + auto nextInput = pSource->getNext(); + for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { + waitWhileFailPointEnabled(); + + auto doc = nextInput.releaseDocument(); + auto [obj, objSize] = makeBatchObject(std::move(doc)); + + bufferedBytes += objSize; + if (!batch.empty() && + (bufferedBytes > BSONObjMaxUserSize || + batch.size() >= write_ops::kMaxWriteBatchSize)) { + spill(std::move(batch)); + batch.clear(); + bufferedBytes = objSize; + } + batch.push_back(obj); + } + if (!batch.empty()) { spill(std::move(batch)); batch.clear(); - bufferedBytes = objSize; } - batch.push_back(obj); - } - if (!batch.empty()) { - spill(std::move(batch)); - batch.clear(); - } - switch (nextInput.getStatus()) { - case GetNextResult::ReturnStatus::kAdvanced: { - MONGO_UNREACHABLE; // We consumed all advances above. - } - case GetNextResult::ReturnStatus::kPauseExecution: { - return nextInput; // Propagate the pause. - } - case GetNextResult::ReturnStatus::kEOF: { - _done = true; - finalize(); - return nextInput; + switch (nextInput.getStatus()) { + case GetNextResult::ReturnStatus::kAdvanced: { + MONGO_UNREACHABLE; // We consumed all advances above. + } + case GetNextResult::ReturnStatus::kPauseExecution: { + return nextInput; // Propagate the pause. + } + case GetNextResult::ReturnStatus::kEOF: { + _done = true; + finalize(); + return nextInput; + } } } MONGO_UNREACHABLE; diff --git a/src/mongo/db/query/map_reduce_output_format.cpp b/src/mongo/db/query/map_reduce_output_format.cpp index cafbe483319..9c8b6da9c9d 100644 --- a/src/mongo/db/query/map_reduce_output_format.cpp +++ b/src/mongo/db/query/map_reduce_output_format.cpp @@ -46,4 +46,11 @@ void appendOutResponse(boost::optional<std::string> outDb, resultBuilder->append("result", outColl); } } + +void appendExplainResponse(BSONObjBuilder& resultBuilder, BSONObj& aggResults) { + for (const auto& elem : aggResults) { + resultBuilder << elem.fieldNameStringData() << elem; + } +} + } // namespace mongo::map_reduce_output_format diff --git a/src/mongo/db/query/map_reduce_output_format.h b/src/mongo/db/query/map_reduce_output_format.h index 5189323d80e..72f1dbd2ecf 100644 --- a/src/mongo/db/query/map_reduce_output_format.h +++ b/src/mongo/db/query/map_reduce_output_format.h @@ -49,4 +49,9 @@ void appendOutResponse(boost::optional<std::string> outDb, std::string outColl, BSONObjBuilder* resultBuilder); +/** + * Appends a mapReduce explain command response to 'resultBuilder'. + */ +void appendExplainResponse(BSONObjBuilder& resultBuilder, BSONObj& aggResults); + } // namespace mongo::map_reduce_output_format diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index a7f12be3980..023d4d5ac2a 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/explain_common.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/map_reduce_output_format.h" #include "mongo/s/catalog_cache.h" @@ -55,7 +56,8 @@ namespace { auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr, - boost::optional<CachedCollectionRoutingInfo> routingInfo) { + boost::optional<CachedCollectionRoutingInfo> routingInfo, + boost::optional<ExplainOptions::Verbosity> verbosity) { // Populate the collection UUID and the appropriate collation to use. auto nss = parsedMr.getNamespace(); auto [collationObj, uuid] = sharded_agg_helpers::getCollationAndUUID( @@ -84,10 +86,10 @@ auto makeExpressionContext(OperationContext* opCtx, } auto expCtx = make_intrusive<ExpressionContext>( opCtx, - boost::none, // explain - false, // fromMongos - false, // needsmerge - true, // allowDiskUse + verbosity, + false, // fromMongos + false, // needsmerge + true, // allowDiskUse parsedMr.getBypassDocumentValidation().get_value_or(false), nss, runtimeConstants, @@ -121,10 +123,9 @@ Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipe } // namespace bool runAggregationMapReduce(OperationContext* opCtx, - const std::string& dbname, const BSONObj& cmd, - std::string& errmsg, - BSONObjBuilder& result) { + BSONObjBuilder& result, + boost::optional<ExplainOptions::Verbosity> verbosity) { auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); stdx::unordered_set<NamespaceString> involvedNamespaces{parsedMr.getNamespace()}; auto hasOutDB = parsedMr.getOutOptions().getDatabaseName(); @@ -137,7 +138,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, auto routingInfo = uassertStatusOK( sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, parsedMr.getNamespace())); - auto expCtx = makeExpressionContext(opCtx, parsedMr, routingInfo); + auto expCtx = makeExpressionContext(opCtx, parsedMr, routingInfo, verbosity); const auto pipelineBuilder = [&]() { return map_reduce_common::translateFromMR(parsedMr, expCtx); @@ -171,7 +172,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, sharded_agg_helpers::runPipelineOnPrimaryShard(expCtx, namespaces, targeter.routingInfo->db(), - boost::none, // explain + verbosity, std::move(serialized), privileges, &tempResults)); @@ -185,7 +186,13 @@ bool runAggregationMapReduce(OperationContext* opCtx, } case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: { + if (verbosity) { + explain_common::generateServerInfo(&result); + } auto serialized = serializeToCommand(cmd, parsedMr, targeter.pipeline.get()); + // When running explain, we don't explicitly pass the specified verbosity here because + // each stage of the constructed pipeline is aware of said verbosity through a pointer + // to the constructed ExpressionContext. uassertStatusOK( sharded_agg_helpers::dispatchPipelineAndMerge(opCtx, std::move(targeter), @@ -200,8 +207,12 @@ bool runAggregationMapReduce(OperationContext* opCtx, } auto aggResults = tempResults.done(); - // TODO SERVER-43290: Add support for cluster MapReduce statistics. - if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) { + + // If explain() was run, we simply append the output to result. + if (verbosity) { + map_reduce_output_format::appendExplainResponse(result, aggResults); + } else if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) { + // TODO SERVER-43290: Add support for cluster MapReduce statistics. // If the inline results could not fit into a single batch, then kill the remote // operation(s) and return an error since mapReduce does not support a cursor-style // response. diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.h b/src/mongo/s/commands/cluster_map_reduce_agg.h index 71d0f9c1b53..8b57e1aad3f 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.h +++ b/src/mongo/s/commands/cluster_map_reduce_agg.h @@ -34,9 +34,8 @@ namespace mongo { bool runAggregationMapReduce(OperationContext* opCtx, - const std::string& dbname, const BSONObj& cmd, - std::string& errmsg, - BSONObjBuilder& result); + BSONObjBuilder& result, + boost::optional<ExplainOptions::Verbosity> verbosity); } // namespace mongo diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index f3f0be6e384..bc9af46c0bb 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -49,13 +49,20 @@ public: return AllowedOnSecondary::kAlways; } + void _explainImpl(OperationContext* opCtx, + const BSONObj& cmd, + BSONObjBuilder& result, + boost::optional<ExplainOptions::Verbosity> verbosity) const override { + runAggregationMapReduce(opCtx, cmd, result, verbosity); + } + bool _runImpl(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmd, std::string& errmsg, BSONObjBuilder& result) final { if (internalQueryUseAggMapReduce.load()) { - return runAggregationMapReduce(opCtx, dbname, cmd, errmsg, result); + return runAggregationMapReduce(opCtx, cmd, result, boost::none); } return runMapReduce(opCtx, dbname, applyReadWriteConcern(opCtx, this, cmd), errmsg, result); } diff --git a/src/mongo/shell/explainable.js b/src/mongo/shell/explainable.js index 4f32af22221..5a3e94f4791 100644 --- a/src/mongo/shell/explainable.js +++ b/src/mongo/shell/explainable.js @@ -67,6 +67,7 @@ var Explainable = (function() { print("\t.distinct(...) - explain a distinct operation"); print("\t.find(...) - get an explainable query"); print("\t.findAndModify(...) - explain a findAndModify operation"); + print("\t.mapReduce(...) - explain a mapReduce operation"); print("\t.remove(...) - explain a remove operation"); print("\t.update(...) - explain an update operation"); print("Explainable collection methods"); @@ -223,6 +224,12 @@ var Explainable = (function() { var explainResult = this._collection.runCommand(explainCmd); return throwOrReturn(explainResult); }; + + this.mapReduce = function(mr) { + var explainCmd = {"explain": mr, "verbosity": this._verbosity}; + var explainResult = this._collection.runCommand(explainCmd); + return throwOrReturn(explainResult); + }; } // |