summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@mongodb.com>2019-11-22 21:27:40 +0000
committerevergreen <evergreen@mongodb.com>2019-11-22 21:27:40 +0000
commit9f1e78ea0a51bae65b557db53eee1fb6489f31c4 (patch)
tree77b2a53955e4a0d2b5941d49c1381246300d6444 /src
parent44497e0e5c0388f9ef790d9c95737816a377d29d (diff)
downloadmongo-9f1e78ea0a51bae65b557db53eee1fb6489f31c4.tar.gz
SERVER-44583 M/R Agg: Add explain support for mapReduce command
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/map_reduce_agg.cpp48
-rw-r--r--src/mongo/db/commands/map_reduce_agg.h5
-rw-r--r--src/mongo/db/commands/map_reduce_command.cpp9
-rw-r--r--src/mongo/db/commands/map_reduce_command_base.h29
-rw-r--r--src/mongo/db/pipeline/document_source_writer.h88
-rw-r--r--src/mongo/db/query/map_reduce_output_format.cpp7
-rw-r--r--src/mongo/db/query/map_reduce_output_format.h5
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.cpp35
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.h5
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_cmd.cpp9
-rw-r--r--src/mongo/shell/explainable.js7
11 files changed, 163 insertions, 84 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp
index f4c61322c03..214c930046e 100644
--- a/src/mongo/db/commands/map_reduce_agg.cpp
+++ b/src/mongo/db/commands/map_reduce_agg.cpp
@@ -49,13 +49,16 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/query/explain_common.h"
#include "mongo/db/query/map_reduce_output_format.h"
namespace mongo::map_reduce_agg {
namespace {
-auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) {
+auto makeExpressionContext(OperationContext* opCtx,
+ const MapReduce& parsedMr,
+ boost::optional<ExplainOptions::Verbosity> verbosity) {
// AutoGetCollectionForReadCommand will throw if the sharding version for this connection is
// out of date.
AutoGetCollectionForReadCommand ctx(
@@ -81,10 +84,10 @@ auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) {
// the $group stage of the translated pipeline to spill to disk.
auto expCtx = make_intrusive<ExpressionContext>(
opCtx,
- boost::none, // explain
- false, // fromMongos
- false, // needsmerge
- true, // allowDiskUse
+ verbosity,
+ false, // fromMongos
+ false, // needsmerge
+ true, // allowDiskUse
parsedMr.getBypassDocumentValidation().get_value_or(false),
parsedMr.getNamespace(),
runtimeConstants,
@@ -99,10 +102,9 @@ auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) {
} // namespace
bool runAggregationMapReduce(OperationContext* opCtx,
- const std::string& dbname,
const BSONObj& cmd,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result,
+ boost::optional<ExplainOptions::Verbosity> verbosity) {
auto exhaustPipelineIntoBSONArray = [](auto&& pipeline) {
BSONArrayBuilder bab;
while (auto&& doc = pipeline->getNext())
@@ -113,7 +115,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
Timer cmdTimer;
auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
- auto expCtx = makeExpressionContext(opCtx, parsedMr);
+ auto expCtx = makeExpressionContext(opCtx, parsedMr, verbosity);
auto runnablePipeline = [&]() {
auto pipeline = map_reduce_common::translateFromMR(parsedMr, expCtx);
return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
@@ -130,21 +132,27 @@ bool runAggregationMapReduce(OperationContext* opCtx,
try {
auto resultArray = exhaustPipelineIntoBSONArray(runnablePipeline);
+ if (expCtx->explain) {
+ result << "stages" << Value(runnablePipeline->writeExplainOps(*(expCtx->explain)));
+ explain_common::generateServerInfo(&result);
+ }
+
PlanSummaryStats planSummaryStats;
PipelineD::getPlanSummaryStats(runnablePipeline.get(), &planSummaryStats);
CurOp::get(opCtx)->debug().setPlanSummaryMetrics(planSummaryStats);
-
- if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) {
- map_reduce_output_format::appendInlineResponse(std::move(resultArray), &result);
- } else {
- // For output to collection, pipeline execution should not return any results.
- invariant(resultArray.isEmpty());
-
- map_reduce_output_format::appendOutResponse(
- parsedMr.getOutOptions().getDatabaseName(),
- parsedMr.getOutOptions().getCollectionName(),
- &result);
+ if (!expCtx->explain) {
+ if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) {
+ map_reduce_output_format::appendInlineResponse(std::move(resultArray), &result);
+ } else {
+ // For output to collection, pipeline execution should not return any results.
+ invariant(resultArray.isEmpty());
+
+ map_reduce_output_format::appendOutResponse(
+ parsedMr.getOutOptions().getDatabaseName(),
+ parsedMr.getOutOptions().getCollectionName(),
+ &result);
+ }
}
// The aggregation pipeline may change the namespace of the curop and we need to set it back
diff --git a/src/mongo/db/commands/map_reduce_agg.h b/src/mongo/db/commands/map_reduce_agg.h
index 50752250a3a..e8448de66c7 100644
--- a/src/mongo/db/commands/map_reduce_agg.h
+++ b/src/mongo/db/commands/map_reduce_agg.h
@@ -45,9 +45,8 @@ namespace mongo::map_reduce_agg {
* Executes a mapReduce command against a replica set/standalone.
*/
bool runAggregationMapReduce(OperationContext* opCtx,
- const std::string& dbname,
const BSONObj& cmd,
- std::string& errmsg,
- BSONObjBuilder& result);
+ BSONObjBuilder& result,
+ boost::optional<ExplainOptions::Verbosity> verbosity);
} // namespace mongo::map_reduce_agg
diff --git a/src/mongo/db/commands/map_reduce_command.cpp b/src/mongo/db/commands/map_reduce_command.cpp
index 8d0e9bf70ac..c605affd325 100644
--- a/src/mongo/db/commands/map_reduce_command.cpp
+++ b/src/mongo/db/commands/map_reduce_command.cpp
@@ -62,6 +62,13 @@ public:
return FindCommon::kInitReplyBufferSize;
}
+ void _explainImpl(OperationContext* opCtx,
+ const BSONObj& cmd,
+ BSONObjBuilder& result,
+ boost::optional<ExplainOptions::Verbosity> verbosity) const override {
+ map_reduce_agg::runAggregationMapReduce(opCtx, cmd, result, verbosity);
+ }
+
private:
bool _runImpl(OperationContext* opCtx,
const std::string& dbname,
@@ -74,7 +81,7 @@ private:
if (internalQueryUseAggMapReduce.load() &&
serverGlobalParams.featureCompatibility.getVersion() ==
ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) {
- return map_reduce_agg::runAggregationMapReduce(opCtx, dbname, cmd, errmsg, result);
+ return map_reduce_agg::runAggregationMapReduce(opCtx, cmd, result, boost::none);
}
return mr::runMapReduce(opCtx, dbname, cmd, errmsg, result);
}
diff --git a/src/mongo/db/commands/map_reduce_command_base.h b/src/mongo/db/commands/map_reduce_command_base.h
index 4de4676e02d..5166cdec327 100644
--- a/src/mongo/db/commands/map_reduce_command_base.h
+++ b/src/mongo/db/commands/map_reduce_command_base.h
@@ -65,12 +65,39 @@ public:
map_reduce_common::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
}
+ virtual void _explainImpl(OperationContext* opCtx,
+ const BSONObj& cmd,
+ BSONObjBuilder& result,
+ boost::optional<ExplainOptions::Verbosity> verbosity) const = 0;
+
+ Status explain(OperationContext* opCtx,
+ const OpMsgRequest& request,
+ ExplainOptions::Verbosity verbosity,
+ rpc::ReplyBuilderInterface* result) const override {
+ if (internalQueryUseAggMapReduce.load() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) {
+ auto builder = result->getBodyBuilder();
+ auto explain = boost::make_optional(verbosity);
+ try {
+ _explainImpl(opCtx, request.body, builder, explain);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ return Status::OK();
+ } else {
+ return Status(
+ ErrorCodes::IllegalOperation,
+ "explain for mapReduce is not available prior to featureCompatibilityVersion 4.4");
+ }
+ MONGO_UNREACHABLE;
+ }
+
bool errmsgRun(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmd,
std::string& errmsg,
BSONObjBuilder& result) {
-
return _runImpl(opCtx, dbname, cmd, errmsg, result);
}
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index dd5dd0c37a5..5110582faeb 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -170,54 +170,56 @@ DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
return GetNextResult::makeEOF();
}
- if (!_initialized) {
- // Explain should never try to actually execute any writes. We only ever expect
- // getNext() to be called for the 'executionStats' and 'allPlansExecution' explain
- // modes. This assertion should not be triggered for 'queryPlanner' explain, which
- // is perfectly legal.
- uassert(51029,
- "explain of {} is not allowed with verbosity {}"_format(
- getSourceName(), ExplainOptions::verbosityString(*pExpCtx->explain)),
- !pExpCtx->explain);
- initialize();
- _initialized = true;
- }
-
- BatchedObjects batch;
- int bufferedBytes = 0;
-
- auto nextInput = pSource->getNext();
- for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
- waitWhileFailPointEnabled();
-
- auto doc = nextInput.releaseDocument();
- auto [obj, objSize] = makeBatchObject(std::move(doc));
+ // Ignore writes and exhaust input if we are in explain mode.
+ if (pExpCtx->explain) {
+ auto nextInput = pSource->getNext();
+ for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
+ }
+ _done = nextInput.getStatus() == GetNextResult::ReturnStatus::kEOF;
+ return nextInput;
+ } else {
+ if (!_initialized) {
+ initialize();
+ _initialized = true;
+ }
- bufferedBytes += objSize;
- if (!batch.empty() &&
- (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
+ BatchedObjects batch;
+ int bufferedBytes = 0;
+
+ auto nextInput = pSource->getNext();
+ for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
+ waitWhileFailPointEnabled();
+
+ auto doc = nextInput.releaseDocument();
+ auto [obj, objSize] = makeBatchObject(std::move(doc));
+
+ bufferedBytes += objSize;
+ if (!batch.empty() &&
+ (bufferedBytes > BSONObjMaxUserSize ||
+ batch.size() >= write_ops::kMaxWriteBatchSize)) {
+ spill(std::move(batch));
+ batch.clear();
+ bufferedBytes = objSize;
+ }
+ batch.push_back(obj);
+ }
+ if (!batch.empty()) {
spill(std::move(batch));
batch.clear();
- bufferedBytes = objSize;
}
- batch.push_back(obj);
- }
- if (!batch.empty()) {
- spill(std::move(batch));
- batch.clear();
- }
- switch (nextInput.getStatus()) {
- case GetNextResult::ReturnStatus::kAdvanced: {
- MONGO_UNREACHABLE; // We consumed all advances above.
- }
- case GetNextResult::ReturnStatus::kPauseExecution: {
- return nextInput; // Propagate the pause.
- }
- case GetNextResult::ReturnStatus::kEOF: {
- _done = true;
- finalize();
- return nextInput;
+ switch (nextInput.getStatus()) {
+ case GetNextResult::ReturnStatus::kAdvanced: {
+ MONGO_UNREACHABLE; // We consumed all advances above.
+ }
+ case GetNextResult::ReturnStatus::kPauseExecution: {
+ return nextInput; // Propagate the pause.
+ }
+ case GetNextResult::ReturnStatus::kEOF: {
+ _done = true;
+ finalize();
+ return nextInput;
+ }
}
}
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/query/map_reduce_output_format.cpp b/src/mongo/db/query/map_reduce_output_format.cpp
index cafbe483319..9c8b6da9c9d 100644
--- a/src/mongo/db/query/map_reduce_output_format.cpp
+++ b/src/mongo/db/query/map_reduce_output_format.cpp
@@ -46,4 +46,11 @@ void appendOutResponse(boost::optional<std::string> outDb,
resultBuilder->append("result", outColl);
}
}
+
+void appendExplainResponse(BSONObjBuilder& resultBuilder, BSONObj& aggResults) {
+ for (const auto& elem : aggResults) {
+ resultBuilder << elem.fieldNameStringData() << elem;
+ }
+}
+
} // namespace mongo::map_reduce_output_format
diff --git a/src/mongo/db/query/map_reduce_output_format.h b/src/mongo/db/query/map_reduce_output_format.h
index 5189323d80e..72f1dbd2ecf 100644
--- a/src/mongo/db/query/map_reduce_output_format.h
+++ b/src/mongo/db/query/map_reduce_output_format.h
@@ -49,4 +49,9 @@ void appendOutResponse(boost::optional<std::string> outDb,
std::string outColl,
BSONObjBuilder* resultBuilder);
+/**
+ * Appends a mapReduce explain command response to 'resultBuilder'.
+ */
+void appendExplainResponse(BSONObjBuilder& resultBuilder, BSONObj& aggResults);
+
} // namespace mongo::map_reduce_output_format
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index a7f12be3980..023d4d5ac2a 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/explain_common.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/map_reduce_output_format.h"
#include "mongo/s/catalog_cache.h"
@@ -55,7 +56,8 @@ namespace {
auto makeExpressionContext(OperationContext* opCtx,
const MapReduce& parsedMr,
- boost::optional<CachedCollectionRoutingInfo> routingInfo) {
+ boost::optional<CachedCollectionRoutingInfo> routingInfo,
+ boost::optional<ExplainOptions::Verbosity> verbosity) {
// Populate the collection UUID and the appropriate collation to use.
auto nss = parsedMr.getNamespace();
auto [collationObj, uuid] = sharded_agg_helpers::getCollationAndUUID(
@@ -84,10 +86,10 @@ auto makeExpressionContext(OperationContext* opCtx,
}
auto expCtx = make_intrusive<ExpressionContext>(
opCtx,
- boost::none, // explain
- false, // fromMongos
- false, // needsmerge
- true, // allowDiskUse
+ verbosity,
+ false, // fromMongos
+ false, // needsmerge
+ true, // allowDiskUse
parsedMr.getBypassDocumentValidation().get_value_or(false),
nss,
runtimeConstants,
@@ -121,10 +123,9 @@ Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipe
} // namespace
bool runAggregationMapReduce(OperationContext* opCtx,
- const std::string& dbname,
const BSONObj& cmd,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result,
+ boost::optional<ExplainOptions::Verbosity> verbosity) {
auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
stdx::unordered_set<NamespaceString> involvedNamespaces{parsedMr.getNamespace()};
auto hasOutDB = parsedMr.getOutOptions().getDatabaseName();
@@ -137,7 +138,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
auto routingInfo = uassertStatusOK(
sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, parsedMr.getNamespace()));
- auto expCtx = makeExpressionContext(opCtx, parsedMr, routingInfo);
+ auto expCtx = makeExpressionContext(opCtx, parsedMr, routingInfo, verbosity);
const auto pipelineBuilder = [&]() {
return map_reduce_common::translateFromMR(parsedMr, expCtx);
@@ -171,7 +172,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
sharded_agg_helpers::runPipelineOnPrimaryShard(expCtx,
namespaces,
targeter.routingInfo->db(),
- boost::none, // explain
+ verbosity,
std::move(serialized),
privileges,
&tempResults));
@@ -185,7 +186,13 @@ bool runAggregationMapReduce(OperationContext* opCtx,
}
case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: {
+ if (verbosity) {
+ explain_common::generateServerInfo(&result);
+ }
auto serialized = serializeToCommand(cmd, parsedMr, targeter.pipeline.get());
+ // When running explain, we don't explicitly pass the specified verbosity here because
+ // each stage of the constructed pipeline is aware of said verbosity through a pointer
+ // to the constructed ExpressionContext.
uassertStatusOK(
sharded_agg_helpers::dispatchPipelineAndMerge(opCtx,
std::move(targeter),
@@ -200,8 +207,12 @@ bool runAggregationMapReduce(OperationContext* opCtx,
}
auto aggResults = tempResults.done();
- // TODO SERVER-43290: Add support for cluster MapReduce statistics.
- if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) {
+
+ // If explain() was run, we simply append the output to result.
+ if (verbosity) {
+ map_reduce_output_format::appendExplainResponse(result, aggResults);
+ } else if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) {
+ // TODO SERVER-43290: Add support for cluster MapReduce statistics.
// If the inline results could not fit into a single batch, then kill the remote
// operation(s) and return an error since mapReduce does not support a cursor-style
// response.
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.h b/src/mongo/s/commands/cluster_map_reduce_agg.h
index 71d0f9c1b53..8b57e1aad3f 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.h
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.h
@@ -34,9 +34,8 @@
namespace mongo {
bool runAggregationMapReduce(OperationContext* opCtx,
- const std::string& dbname,
const BSONObj& cmd,
- std::string& errmsg,
- BSONObjBuilder& result);
+ BSONObjBuilder& result,
+ boost::optional<ExplainOptions::Verbosity> verbosity);
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index f3f0be6e384..bc9af46c0bb 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -49,13 +49,20 @@ public:
return AllowedOnSecondary::kAlways;
}
+ void _explainImpl(OperationContext* opCtx,
+ const BSONObj& cmd,
+ BSONObjBuilder& result,
+ boost::optional<ExplainOptions::Verbosity> verbosity) const override {
+ runAggregationMapReduce(opCtx, cmd, result, verbosity);
+ }
+
bool _runImpl(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmd,
std::string& errmsg,
BSONObjBuilder& result) final {
if (internalQueryUseAggMapReduce.load()) {
- return runAggregationMapReduce(opCtx, dbname, cmd, errmsg, result);
+ return runAggregationMapReduce(opCtx, cmd, result, boost::none);
}
return runMapReduce(opCtx, dbname, applyReadWriteConcern(opCtx, this, cmd), errmsg, result);
}
diff --git a/src/mongo/shell/explainable.js b/src/mongo/shell/explainable.js
index 4f32af22221..5a3e94f4791 100644
--- a/src/mongo/shell/explainable.js
+++ b/src/mongo/shell/explainable.js
@@ -67,6 +67,7 @@ var Explainable = (function() {
print("\t.distinct(...) - explain a distinct operation");
print("\t.find(...) - get an explainable query");
print("\t.findAndModify(...) - explain a findAndModify operation");
+ print("\t.mapReduce(...) - explain a mapReduce operation");
print("\t.remove(...) - explain a remove operation");
print("\t.update(...) - explain an update operation");
print("Explainable collection methods");
@@ -223,6 +224,12 @@ var Explainable = (function() {
var explainResult = this._collection.runCommand(explainCmd);
return throwOrReturn(explainResult);
};
+
+ this.mapReduce = function(mr) {
+ var explainCmd = {"explain": mr, "verbosity": this._verbosity};
+ var explainResult = this._collection.runCommand(explainCmd);
+ return throwOrReturn(explainResult);
+ };
}
//