/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/s/query/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_merge.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/semantic_analysis.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_query_knobs_gen.h" #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/document_source_update_on_add_shard.h" #include "mongo/s/query/owned_remote_cursor.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" #include "mongo/s/shard_id.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/transaction_router.h" namespace mongo { namespace cluster_aggregation_planner { namespace { /** * Moves everything before a splittable stage to the shards. If there are no splittable stages, * moves everything to the shards. * * It is not safe to call this optimization multiple times. * * Returns the sort specification if the input streams are sorted, and false otherwise. */ boost::optional findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) { while (!mergePipe->getSources().empty()) { boost::intrusive_ptr current = mergePipe->popFront(); // Check if this source is splittable. 
auto distributedPlanLogic = current->distributedPlanLogic(); if (!distributedPlanLogic) { // Move the source from the merger _sources to the shard _sources. shardPipe->push_back(current); continue; } // A source may not simultaneously be present on both sides of the split. invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); if (distributedPlanLogic->shardsStage) shardPipe->push_back(std::move(distributedPlanLogic->shardsStage)); if (distributedPlanLogic->mergingStage) mergePipe->addInitialSource(std::move(distributedPlanLogic->mergingStage)); return distributedPlanLogic->inputSortPattern; } return boost::none; } /** * If the final stage on shards is to unwind an array, move that stage to the merger. This cuts down * on network traffic and allows us to take advantage of reduced copying in unwind. */ void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { while (!shardPipe->getSources().empty() && dynamic_cast(shardPipe->getSources().back().get())) { mergePipe->addInitialSource(shardPipe->popBack()); } } /** * Returns true if the final stage of the pipeline limits the number of documents it could output * (such as a $limit stage). * * This function is not meant to exhaustively catch every single case where a pipeline might have * some kind of limit. It's only here so that propagateDocLimitsToShards() can avoid adding an * obviously unnecessary $limit to a shard's pipeline. */ boost::optional getPipelineLimit(Pipeline* pipeline) { for (auto source_it = pipeline->getSources().rbegin(); source_it != pipeline->getSources().rend(); ++source_it) { const auto source = source_it->get(); auto limitStage = dynamic_cast(source); if (limitStage) { return limitStage->getLimit(); } auto sortStage = dynamic_cast(source); if (sortStage) { return (sortStage->getLimit() >= 0) ? 
boost::optional(sortStage->getLimit()) : boost::none; } auto cursorStage = dynamic_cast(source); if (cursorStage) { return (cursorStage->getLimit() >= 0) ? boost::optional(cursorStage->getLimit()) : boost::none; } // If this stage is one that can swap with a $limit stage, then we can look at the previous // stage to see if it includes a limit. Otherwise, we give up trying to find a limit on this // stage's output. if (!source->constraints().canSwapWithLimitAndSample) { break; } } return boost::none; } /** * If the merging pipeline includes a $limit stage that creates an upper bound on how many input * documents it needs to compute the aggregation, we can use that as an upper bound on how many * documents each of the shards needs to produce. Propagating that upper bound to the shards (using * a $limit in the shard pipeline) can reduce the number of documents the shards need to process and * transfer over the network (see SERVER-36881). * * If there are $skip stages before the $limit, the skipped documents also contribute to the upper * bound. */ void propagateDocLimitToShards(Pipeline* shardPipe, Pipeline* mergePipe) { long long numDocumentsNeeded = 0; for (auto&& source : mergePipe->getSources()) { auto skipStage = dynamic_cast(source.get()); if (skipStage) { numDocumentsNeeded += skipStage->getSkip(); continue; } auto limitStage = dynamic_cast(source.get()); if (limitStage) { numDocumentsNeeded += limitStage->getLimit(); auto existingShardLimit = getPipelineLimit(shardPipe); if (existingShardLimit && *existingShardLimit <= numDocumentsNeeded) { // The sharding pipeline already has a limit that is no greater than the limit we // were going to add, so no changes are necessary. return; } auto shardLimit = DocumentSourceLimit::create(mergePipe->getContext(), numDocumentsNeeded); shardPipe->addFinalSource(shardLimit); // We have successfully applied a limit to the number of documents we need from each // shard. 
return; } // If there are any stages in the merge pipeline before the $skip and $limit stages, then we // cannot use the $limit to determine an upper bound, unless those stages could be swapped // with the $limit. if (!source->constraints().canSwapWithLimitAndSample) { return; } } // We did not find any limit in the merge pipeline that would allow us to set an upper bound on // the number of documents we need from each shard. return; } /** * Adds a stage to the end of 'shardPipe' explicitly requesting all fields that 'mergePipe' needs. * This is only done if it heuristically determines that it is needed. This optimization can reduce * the amount of network traffic and can also enable the shards to convert less source BSON into * Documents. */ void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { DepsTracker mergeDeps(mergePipe->getDependencies(DepsTracker::kAllMetadataAvailable)); if (mergeDeps.needWholeDocument) return; // the merge needs all fields, so nothing we can do. // Empty project is "special" so if no fields are needed, we just ask for _id instead. if (mergeDeps.fields.empty()) mergeDeps.fields.insert("_id"); // Remove metadata from dependencies since it automatically flows through projection and we // don't want to project it in to the document. mergeDeps.setNeedsMetadata(DepsTracker::MetadataType::TEXT_SCORE, false); // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of // field dependencies. While this may not be 100% ideal in all cases, it is simple and // avoids the worst cases by ensuring that: // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of // dependencies. This situation can happen when a $sort is before the first $project or // $group. Without the optimization, the shards would have to reify and transmit full // objects even though only a subset of fields are needed. 
// 2) Optimization IS NOT applied immediately following a $project or $group since it would // add an unnecessary project (and therefore a deep-copy). for (auto&& source : shardPipe->getSources()) { DepsTracker dt(DepsTracker::kAllMetadataAvailable); if (source->getDependencies(&dt) & DepsTracker::State::EXHAUSTIVE_FIELDS) return; } // if we get here, add the project. boost::intrusive_ptr project = DocumentSourceProject::createFromBson( BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->getContext()); shardPipe->pushBack(project); } bool isMergeSkipOrLimit(const boost::intrusive_ptr& stage) { return (dynamic_cast(stage.get()) || dynamic_cast(stage.get()) || dynamic_cast(stage.get())); } bool isAllLimitsAndSkips(Pipeline* pipeline) { const auto stages = pipeline->getSources(); return std::all_of( stages.begin(), stages.end(), [](const auto& stage) { return isMergeSkipOrLimit(stage); }); } ClusterClientCursorGuard convertPipelineToRouterStages( std::unique_ptr pipeline, ClusterClientCursorParams&& cursorParams) { auto* opCtx = pipeline->getContext()->opCtx; // We expect the pipeline to be fully executable at this point, so if the pipeline was all skips // and limits we expect it to start with a $mergeCursors stage. auto mergeCursors = checked_cast(pipeline->getSources().front().get()); // Replace the pipeline with RouterExecStages. std::unique_ptr root = mergeCursors->convertToRouterStage(); pipeline->popFront(); while (!pipeline->getSources().empty()) { if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) { root = std::make_unique( opCtx, std::move(root), static_cast(skip.get())->getSkip()); } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) { root = std::make_unique( opCtx, std::move(root), static_cast(limit.get())->getLimit()); } else { // We previously checked that everything was a $mergeCursors, $skip, or $limit. 
We // already popped off the $mergeCursors, so everything else should be a $skip or a // $limit. MONGO_UNREACHABLE; } } // We are executing the pipeline without using an actual Pipeline, so we need to strip out any // Document metadata ourselves. return ClusterClientCursorImpl::make( opCtx, std::make_unique( opCtx, std::move(root), Document::allMetadataFieldNames), std::move(cursorParams)); } bool stageCanRunInParallel(const boost::intrusive_ptr& stage, const std::set& nameOfShardKeyFieldsUponEntryToStage) { if (stage->distributedPlanLogic()) { return stage->canRunInParallelBeforeWriteStage(nameOfShardKeyFieldsUponEntryToStage); } else { // This stage is fine to execute in parallel on each stream. For example, a $match can be // applied to each stream in parallel. return true; } } std::string mapToString(const StringMap& map) { StringBuilder sb; sb << "{"; for (auto&& entry : map) { if (sb.len() != 1) { sb << ", "; } sb << entry.first << ": " << entry.second; } sb << "}"; return sb.str(); } BSONObj buildNewKeyPattern(const ShardKeyPattern& shardKey, StringMap renames) { BSONObjBuilder newPattern; for (auto&& elem : shardKey.getKeyPattern().toBSON()) { auto it = renames.find(elem.fieldNameStringData()); invariant(it != renames.end(), str::stream() << "Could not find new name of shard key field \"" << elem.fieldName() << "\": rename map was " << mapToString(renames)); newPattern.appendAs(elem, it->second); } return newPattern.obj(); } StringMap computeShardKeyRenameMap(const Pipeline* mergePipeline, std::set&& pathsOfShardKey) { auto traversalStart = mergePipeline->getSources().crbegin(); auto traversalEnd = mergePipeline->getSources().crend(); const auto leadingGroup = dynamic_cast(mergePipeline->getSources().front().get()); if (leadingGroup && leadingGroup->doingMerge()) { // A leading $group stage will not report to preserve any fields, since it blows away the // _id and replaces it with something new. 
It possibly renames some fields, but when // computing the new shard key we are interested in the name of the shard key *in the middle // of the $group*. The $exchange will be inserted between the shard-local groups and the // global groups. Thus we want to exclude this stage from our rename tracking. traversalEnd = std::prev(traversalEnd); } auto renameMap = semantic_analysis::renamedPaths(traversalStart, traversalEnd, pathsOfShardKey); invariant(renameMap, str::stream() << "Analyzed pipeline was thought to preserve the shard key fields, but did not: " << Value(mergePipeline->serialize()).toString()); return *renameMap; } /** * Returns true if any stage in the pipeline would modify any of the fields in 'shardKeyPaths', or * if there is any stage in the pipeline requires a unified stream to do its computation like a * $limit would. * * Purposefully takes 'shardKeyPaths' by value so that it can be modified throughout. */ bool anyStageModifiesShardKeyOrNeedsMerge(std::set shardKeyPaths, const Pipeline* mergePipeline) { const auto& stages = mergePipeline->getSources(); for (auto it = stages.crbegin(); it != stages.crend(); ++it) { const auto& stage = *it; auto renames = semantic_analysis::renamedPaths( std::move(shardKeyPaths), *stage, semantic_analysis::Direction::kBackward); if (!renames) { return true; } shardKeyPaths.clear(); for (auto&& rename : *renames) { shardKeyPaths.insert(rename.second); } if (!stageCanRunInParallel(stage, shardKeyPaths)) { // In order for this stage to work it needs a single input stream which it wouldn't get // if we inserted an exchange before it. 
return true; } } return false; } boost::optional walkPipelineBackwardsTrackingShardKey( OperationContext* opCtx, const Pipeline* mergePipeline, const ChunkManager& chunkManager) { const ShardKeyPattern& shardKey = chunkManager.getShardKeyPattern(); std::set shardKeyPaths; for (auto&& path : shardKey.getKeyPatternFields()) { shardKeyPaths.emplace(path->dottedField().toString()); } if (anyStageModifiesShardKeyOrNeedsMerge(shardKeyPaths, mergePipeline)) { return boost::none; } // All the fields of the shard key are preserved by the pipeline, but they might be renamed. To // set up the $exchange, we need to build a fake shard key pattern which uses the names of the // shard key fields as they are at the split point of the pipeline. auto renames = computeShardKeyRenameMap(mergePipeline, std::move(shardKeyPaths)); ShardKeyPattern newShardKey(buildNewKeyPattern(shardKey, renames)); // Append the boundaries with the new names from the new shard key. auto translateBoundary = [&renames](const BSONObj& oldBoundary) { BSONObjBuilder bob; for (auto&& elem : oldBoundary) { bob.appendAs(elem, renames[elem.fieldNameStringData()]); } return bob.obj(); }; // Given the new shard key fields, build the distribution map. ExchangeSpec exchangeSpec; std::vector boundaries; std::vector consumerIds; std::map shardToConsumer; std::vector consumerShards; int numConsumers = 0; // The chunk manager enumerates the chunks in the ascending order from MinKey to MaxKey. Every // chunk has an associated range [from, to); i.e. inclusive lower bound and exclusive upper // bound. The chunk ranges must cover all domain without any holes. For the exchange we coalesce // ranges into a single vector of points. E.g. chunks [min,5], [5,10], [10,max] will produce // [min,5,10,max] vector. Number of points in the vector is always one greater than number of // chunks. // We also compute consumer indices for every chunk. From the example above (3 chunks) we may // get the vector [0,1,2]; i.e. 
the first chunk goes to the consumer 0 and so on. Note that // the consumer id may be repeated if the consumer hosts more than 1 chunk. boundaries.emplace_back(translateBoundary((*chunkManager.chunks().begin()).getMin())); for (auto&& chunk : chunkManager.chunks()) { boundaries.emplace_back(translateBoundary(chunk.getMax())); if (shardToConsumer.find(chunk.getShardId()) == shardToConsumer.end()) { shardToConsumer.emplace(chunk.getShardId(), numConsumers++); consumerShards.emplace_back(chunk.getShardId()); } consumerIds.emplace_back(shardToConsumer[chunk.getShardId()]); } exchangeSpec.setPolicy(ExchangePolicyEnum::kKeyRange); exchangeSpec.setKey(newShardKey.toBSON()); exchangeSpec.setBoundaries(std::move(boundaries)); exchangeSpec.setConsumers(shardToConsumer.size()); exchangeSpec.setConsumerIds(std::move(consumerIds)); return ShardedExchangePolicy{std::move(exchangeSpec), std::move(consumerShards)}; } /** * Non-correlated pipeline caching is only supported locally. When the * DocumentSourceSequentialDocumentCache stage has been moved to the shards pipeline, abandon the * associated local cache. */ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) { for (auto&& stage : shardsPipeline->getSources()) { if (StringData(stage->getSourceName()) == DocumentSourceSequentialDocumentCache::kStageName) { static_cast(stage.get())->abandonCache(); } } } } // namespace SplitPipeline splitPipeline(std::unique_ptr pipeline) { auto& expCtx = pipeline->getContext(); // Re-brand 'pipeline' as the merging pipeline. We will move stages one by one from the merging // half to the shards, as possible. auto mergePipeline = std::move(pipeline); Pipeline::SourceContainer shardStages; boost::optional inputsSort = findSplitPoint(&shardStages, mergePipeline.get()); auto shardsPipeline = uassertStatusOK(Pipeline::create(std::move(shardStages), expCtx)); // The order in which optimizations are applied can have significant impact on the efficiency of // the final pipeline. Be Careful! 
moveFinalUnwindFromShardsToMerger(shardsPipeline.get(), mergePipeline.get()); propagateDocLimitToShards(shardsPipeline.get(), mergePipeline.get()); limitFieldsSentFromShardsToMerger(shardsPipeline.get(), mergePipeline.get()); abandonCacheIfSentToShards(shardsPipeline.get()); shardsPipeline->setSplitState(Pipeline::SplitState::kSplitForShards); mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge); return {std::move(shardsPipeline), std::move(mergePipeline), std::move(inputsSort)}; } void addMergeCursorsSource(Pipeline* mergePipeline, const LiteParsedPipeline& liteParsedPipeline, BSONObj cmdSentToShards, std::vector ownedCursors, const std::vector& targetedShards, boost::optional shardCursorsSortSpec, std::shared_ptr executor) { auto* opCtx = mergePipeline->getContext()->opCtx; AsyncResultsMergerParams armParams; armParams.setSort(shardCursorsSortSpec); armParams.setTailableMode(mergePipeline->getContext()->tailableMode); armParams.setNss(mergePipeline->getContext()->ns); OperationSessionInfoFromClient sessionInfo; boost::optional lsidFromClient; auto lsid = opCtx->getLogicalSessionId(); if (lsid) { lsidFromClient.emplace(lsid->getId()); lsidFromClient->setUid(lsid->getUid()); } sessionInfo.setSessionId(lsidFromClient); sessionInfo.setTxnNumber(opCtx->getTxnNumber()); if (TransactionRouter::get(opCtx)) { sessionInfo.setAutocommit(false); } armParams.setOperationSessionInfo(sessionInfo); // Convert owned cursors into a vector of remote cursors to be transferred to the merge // pipeline. std::vector remoteCursors; for (auto&& cursor : ownedCursors) { // Transfer ownership of the remote cursor to the $mergeCursors stage. remoteCursors.emplace_back(cursor.releaseCursor()); } armParams.setRemotes(std::move(remoteCursors)); // For change streams, we need to set up a custom stage to establish cursors on new shards when // they are added, to ensure we don't miss results from the new shards. 
auto mergeCursorsStage = DocumentSourceMergeCursors::create( std::move(executor), std::move(armParams), mergePipeline->getContext()); if (liteParsedPipeline.hasChangeStream()) { mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create( mergePipeline->getContext(), Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), mergeCursorsStage, targetedShards, cmdSentToShards)); } mergePipeline->addInitialSource(std::move(mergeCursorsStage)); } ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, std::unique_ptr pipeline, ClusterClientCursorParams&& cursorParams) { if (isAllLimitsAndSkips(pipeline.get())) { // We can optimize this Pipeline to avoid going through any DocumentSources at all and thus // skip the expensive BSON->Document->BSON conversion. return convertPipelineToRouterStages(std::move(pipeline), std::move(cursorParams)); } return ClusterClientCursorImpl::make( opCtx, std::make_unique(std::move(pipeline)), std::move(cursorParams)); } boost::optional checkIfEligibleForExchange(OperationContext* opCtx, const Pipeline* mergePipeline) { if (internalQueryDisableExchange.load()) { return boost::none; } if (mergePipeline->getSources().empty()) { return boost::none; } auto mergeStage = dynamic_cast(mergePipeline->getSources().back().get()); if (!mergeStage) { // If there's no $merge stage we won't try to do an $exchange. For the $out stage there's no // point doing an $exchange because all the writes will go to a single node, so we should // just perform the merge on that host. return boost::none; } const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, mergeStage->getOutputNs())); if (!routingInfo.cm()) { return boost::none; } // The collection is sharded and we have a $merge stage! Here we assume the $merge stage has // already verified that the shard key pattern is compatible with the unique key being used. 
// Assuming this, we just have to make sure the shard key is preserved (though possibly renamed) // all the way to the front of the merge pipeline. If this is the case then for any document // entering the merging pipeline we can predict which shard it will need to end up being // inserted on. With this ability we can insert an exchange on the shards to partition the // documents based on which shard will end up owning them. Then each shard can perform a merge // of only those documents which belong to it (optimistically, barring chunk migrations). return walkPipelineBackwardsTrackingShardKey(opCtx, mergePipeline, *routingInfo.cm()); } } // namespace cluster_aggregation_planner } // namespace mongo