summary | refs | log | tree | commit | diff
path: root/src/mongo/s/commands/cluster_map_reduce_agg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_map_reduce_agg.cpp')
-rw-r--r-- src/mongo/s/commands/cluster_map_reduce_agg.cpp 197
1 files changed, 163 insertions, 34 deletions
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index 9375d1c2176..2c2012c3af5 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -31,56 +31,185 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connpool.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/map_reduce_agg.h"
#include "mongo/db/commands/map_reduce_gen.h"
+#include "mongo/db/commands/mr_common.h"
+#include "mongo/db/pipeline/mongos_process_interface.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/map_reduce_output_format.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/cluster_map_reduce_agg.h"
namespace mongo {
+namespace {
-// Exhaust the cursor from the aggregation response and extract results and statistics.
-std::vector<BSONObj> getAllAggregationResults(OperationContext* opCtx,
- const std::string& dbname,
- CursorResponse& response) {
- CursorId cursorId = response.getCursorId();
- auto fullBatch = response.releaseBatch();
- while (cursorId != 0) {
- GetMoreRequest request(
- response.getNSS(), cursorId, boost::none, boost::none, boost::none, boost::none);
- BSONObj getMoreResponse = CommandHelpers::runCommandDirectly(
- opCtx, OpMsgRequest::fromDBAndBody(dbname, request.toBSON()));
- auto getMoreCursorResponse = CursorResponse::parseFromBSONThrowing(getMoreResponse);
- auto nextBatch = getMoreCursorResponse.releaseBatch();
- fullBatch.insert(fullBatch.end(), nextBatch.begin(), nextBatch.end());
- cursorId = getMoreCursorResponse.getCursorId();
// Builds the ExpressionContext used when translating a parsed mapReduce request into an
// aggregation pipeline on mongos.
//
// opCtx       - operation context; also used to look up the CollatorFactoryInterface.
// parsedMr    - the parsed mapReduce request (source namespace, collation, out options, ...).
// routingInfo - optional cached routing info, forwarded to getCollationAndUUID().
//
// Returns the newly created context with 'inMongos' set to true. The collator construction is
// wrapped in uassertStatusOK, so a bad collation spec is presumably reported by throwing.
+auto makeExpressionContext(OperationContext* opCtx,
+ const MapReduce& parsedMr,
+ boost::optional<CachedCollectionRoutingInfo> routingInfo) {
+ // Populate the collection UUID and the appropriate collation to use.
+ auto nss = parsedMr.getNamespace();
+ auto [collationObj, uuid] = sharded_agg_helpers::getCollationAndUUID(
+ routingInfo, nss, parsedMr.getCollation().get_value_or(BSONObj()));
+
+ std::unique_ptr<CollatorInterface> resolvedCollator;
+ if (!collationObj.isEmpty()) {
+ // This will be null if attempting to build an interface for the simple collator.
+ resolvedCollator = uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collationObj));
+ }
+
+ // Resolve involved namespaces.
// The source collection is always registered; the output collection is registered only when the
// results are written to a collection (i.e. the output type is not InMemory).
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
+ if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) {
// The output database defaults to the source database when the out options do not name one.
+ auto outNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName()
+ ? *parsedMr.getOutOptions().getDatabaseName()
+ : parsedMr.getNamespace().db(),
+ parsedMr.getOutOptions().getCollectionName()};
+ resolvedNamespaces.try_emplace(outNss.coll(), outNss, std::vector<BSONObj>{});
 }
- return fullBatch;
+
+ auto expCtx = make_intrusive<ExpressionContext>(
+ opCtx,
+ boost::none, // explain
+ false, // fromMongos
+ false, // needsMerge
+ true, // allowDiskUse
+ parsedMr.getBypassDocumentValidation().get_value_or(false),
+ nss,
+ collationObj,
+ boost::none, // runtimeConstants
+ std::move(resolvedCollator),
+ std::make_shared<MongoSInterface>(),
+ std::move(resolvedNamespaces),
// NOTE(review): the 'uuid' obtained from getCollationAndUUID() above is not used; boost::none is
// passed for the uuid argument instead -- confirm this is intentional.
+ boost::none); // uuid
+ expCtx->inMongos = true;
+ return expCtx;
 }
// Serializes the translated pipeline into an 'aggregate' command document.
//
// originalCmd - the original mapReduce command object; only the fields selected by
//               CommandHelpers::appendPassthroughFields() are carried over.
// parsedMr    - the parsed mapReduce request; supplies the collection name to aggregate on.
// pipeline    - the translated pipeline; serialized into the 'pipeline' field.
//
// Returns the frozen command Document.
+Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipeline* pipeline) {
+ MutableDocument translatedCmd;
+
+ translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll());
+ translatedCmd["pipeline"] = Value(pipeline->serialize());
// The batch size is set to the largest representable value; presumably so that all results are
// returned in the first batch -- confirm against the consumer of the response.
+ translatedCmd["cursor"] = Value(Document{{"batchSize", std::numeric_limits<long long>::max()}});
+ translatedCmd["allowDiskUse"] = Value(true);
+ translatedCmd["fromMongos"] = Value(true);
+
+ // Append generic command options.
// Passthrough fields are assigned after the fields above, keyed by their own field names.
+ for (const auto& elem : CommandHelpers::appendPassthroughFields(originalCmd, BSONObj())) {
+ translatedCmd[elem.fieldNameStringData()] = Value(elem);
+ }
+ return translatedCmd.freeze();
+}
+
+} // namespace
+
// Runs a mapReduce command on mongos by translating it into an aggregation pipeline, dispatching
// that pipeline according to the policy chosen by the AggregationTargeter, and reformatting the
// aggregation reply into a mapReduce-style response appended to 'result'.
//
// opCtx  - operation context for the command.
// dbname - not referenced by the added (+) implementation below; it was used only by the removed
//          placeholder code.
// cmd    - the raw mapReduce command object; parsed here and also used as the source of
//          passthrough fields for the generated aggregate command.
// errmsg - not referenced by the added implementation below.
// result - builder that receives the inline or output-collection response.
//
// Returns true on every path that returns; failures are reported through uassertStatusOK /
// uasserted (error code 31291 for the kMongosRequired policy).
bool runAggregationMapReduce(OperationContext* opCtx,
 const std::string& dbname,
 const BSONObj& cmd,
 std::string& errmsg,
 BSONObjBuilder& result) {
- // Pretend we have built the appropriate pipeline and aggregation request.
- auto mrRequest = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
- const BSONObj aggRequest =
- fromjson(str::stream() << "{aggregate: '" << mrRequest.getNamespace().coll()
- << "', pipeline: [ { $group: { _id: { user: \"$user\" },"
- << "count: { $sum: 1 } } } ], cursor: {}}");
- BSONObj aggResult = CommandHelpers::runCommandDirectly(
- opCtx, OpMsgRequest::fromDBAndBody(dbname, std::move(aggRequest)));
-
- bool inMemory = mrRequest.getOutOptions().getOutputType() == OutputType::InMemory;
- std::string outColl = mrRequest.getOutOptions().getCollectionName();
- // Either inline response specified or we have an output collection.
- invariant(inMemory ^ !outColl.empty());
-
- auto cursorResponse = CursorResponse::parseFromBSONThrowing(aggResult);
- auto completeBatch = getAllAggregationResults(opCtx, dbname, cursorResponse);
- [[maybe_unused]] CursorResponse completeCursor(
- cursorResponse.getNSS(), cursorResponse.getCursorId(), std::move(completeBatch));
+ auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
+ stdx::unordered_set<NamespaceString> involvedNamespaces{parsedMr.getNamespace()};
// The output namespace is computed unconditionally but only added to the involved namespaces
// when the output type is not InMemory.
+ auto resolvedOutNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName()
+ ? *parsedMr.getOutOptions().getDatabaseName()
+ : parsedMr.getNamespace().db(),
+ parsedMr.getOutOptions().getCollectionName()};
+
+ if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) {
+ involvedNamespaces.insert(resolvedOutNss);
+ }
+
// Builds the translated pipeline on demand; used by the targeter and, in the passthrough case,
// invoked directly below.
+ const auto pipelineBuilder = [&](boost::optional<CachedCollectionRoutingInfo> routingInfo) {
+ return map_reduce_common::translateFromMR(
+ parsedMr, makeExpressionContext(opCtx, parsedMr, routingInfo));
+ };
+
// Both members of the Namespaces pair are set to the source namespace.
+ auto namespaces =
+ ClusterAggregate::Namespaces{parsedMr.getNamespace(), parsedMr.getNamespace()};
+
+ // Auth has already been checked for the original mapReduce command, no need to recheck here.
+ PrivilegeVector privileges;
+
+ // This holds the raw results from the aggregation, which will be reformatted to match the
+ // expected mapReduce output.
+ BSONObjBuilder tempResults;
+
+ auto targeter = uassertStatusOK(
+ sharded_agg_helpers::AggregationTargeter::make(opCtx,
+ parsedMr.getNamespace(),
+ pipelineBuilder,
+ involvedNamespaces,
+ false, // hasChangeStream
+ true)); // allowedToPassthrough
+ switch (targeter.policy) {
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kPassthrough: {
+ // For the passthrough case, the targeter will not build a pipeline since it's not needed
+ // in the normal aggregation path. For this translation, though, we need to build the
+ // pipeline to serialize and send to the primary shard.
+ auto serialized =
+ serializeToCommand(cmd, parsedMr, pipelineBuilder(targeter.routingInfo).get());
+ uassertStatusOK(
+ sharded_agg_helpers::runPipelineOnPrimaryShard(opCtx,
+ namespaces,
+ targeter.routingInfo->db(),
+ boost::none, // explain
+ std::move(serialized),
+ privileges,
+ &tempResults));
+ break;
+ }
+
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kMongosRequired: {
+ // Pipelines generated from mapReduce should never be required to run on mongos.
+ uasserted(31291, "Internal error during mapReduce translation");
+ break;
+ }
+
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: {
// The command is serialized from targeter.pipeline before the targeter itself is moved into
// dispatchPipelineAndMerge().
+ auto serialized = serializeToCommand(cmd, parsedMr, targeter.pipeline.get());
+ uassertStatusOK(
+ sharded_agg_helpers::dispatchPipelineAndMerge(opCtx,
+ std::move(targeter),
+ std::move(serialized),
+ std::numeric_limits<long long>::max(),
+ namespaces,
+ privileges,
+ &tempResults,
+ false)); // hasChangeStream
+ break;
+ }
+ }
+
+ auto aggResults = tempResults.done();
+ if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) {
// Inline output: copy every document of cursor.firstBatch into an array for the response.
// NOTE(review): only 'firstBatch' is read; this assumes the maximal batch size requested in
// the aggregate command makes the first batch contain the complete result set -- confirm.
+ auto exhaustedResults = [&]() {
+ BSONArrayBuilder bab;
+ for (auto&& elem : aggResults["cursor"]["firstBatch"].Obj())
+ bab.append(elem.embeddedObject());
+ return bab.arr();
+ }();
+ map_reduce_output_format::appendInlineResponse(std::move(exhaustedResults),
+ parsedMr.getVerbose().get_value_or(false),
+ true, // inMongos
+ &result);
+ } else {
// Collection output: the response names the output database/collection taken from the out
// options; the aggregation results themselves are not included.
+ map_reduce_output_format::appendOutResponse(
+ parsedMr.getOutOptions().getDatabaseName(),
+ parsedMr.getOutOptions().getCollectionName(),
+ boost::get_optional_value_or(parsedMr.getVerbose(), false),
+ true, // inMongos
+ &result);
+ }
 return true;
 }