/**
 * Copyright (c) 2012-2014 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/multi_iterator.h" #include "mongo/db/exec/shard_filter.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_geo_near_cursor.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/top.h" #include 
"mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" #include "mongo/db/transaction_participant.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/cluster_write.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/time_support.h" namespace mongo { using boost::intrusive_ptr; using std::shared_ptr; using std::string; using std::unique_ptr; using write_ops::Insert; namespace { /** * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {} * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough * percentage of the collection. */ StatusWith> createRandomCursorExecutor( Collection* collection, OperationContext* opCtx, long long sampleSize, long long numRecords) { double kMaxSampleRatioForRandCursor = 0.05; if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) { return {nullptr}; } // Attempt to get a random cursor from the RecordStore. auto rsRandCursor = collection->getRecordStore()->getRandomCursor(opCtx); if (!rsRandCursor) { // The storage engine has no random cursor support. return {nullptr}; } auto ws = stdx::make_unique(); auto stage = stdx::make_unique(opCtx, ws.get(), collection); stage->addIterator(std::move(rsRandCursor)); { AutoGetCollectionForRead autoColl(opCtx, collection->ns()); // If we're in a sharded environment, we need to filter out documents we don't own. 
if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) { auto shardFilterStage = stdx::make_unique( opCtx, CollectionShardingState::get(opCtx, collection->ns())->getMetadata(opCtx), ws.get(), stage.release()); return PlanExecutor::make(opCtx, std::move(ws), std::move(shardFilterStage), collection, PlanExecutor::YIELD_AUTO); } } return PlanExecutor::make( opCtx, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO); } StatusWith> attemptToGetExecutor( OperationContext* opCtx, Collection* collection, const NamespaceString& nss, const intrusive_ptr& pExpCtx, bool oplogReplay, BSONObj queryObj, BSONObj projectionObj, BSONObj sortObj, const AggregationRequest* aggRequest, const size_t plannerOpts, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures) { auto qr = stdx::make_unique(nss); qr->setTailableMode(pExpCtx->tailableMode); qr->setOplogReplay(oplogReplay); qr->setFilter(queryObj); qr->setProj(projectionObj); qr->setSort(sortObj); if (aggRequest) { qr->setExplain(static_cast(aggRequest->getExplain())); qr->setHint(aggRequest->getHint()); } // If the pipeline has a non-null collator, set the collation option to the result of // serializing the collator's spec back into BSON. We do this in order to fill in all options // that the user omitted. // // If pipeline has a null collator (representing the "simple" collation), we simply set the // collation option to the original user BSON, which is either the empty object (unspecified), // or the specification for the "simple" collation. qr->setCollation(pExpCtx->getCollator() ? 
pExpCtx->getCollator()->getSpec().toBSON() : pExpCtx->collation); const ExtensionsCallbackReal extensionsCallback(pExpCtx->opCtx, &nss); auto cq = CanonicalQuery::canonicalize( opCtx, std::move(qr), pExpCtx, extensionsCallback, matcherFeatures); if (!cq.isOK()) { // Return an error instead of uasserting, since there are cases where the combination of // sort and projection will result in a bad query, but when we try with a different // combination it will be ok. e.g. a sort by {$meta: 'textScore'}, without any projection // will fail, but will succeed when the corresponding '$meta' projection is passed in // another attempt. return {cq.getStatus()}; } return getExecutorFind(opCtx, collection, nss, std::move(cq.getValue()), plannerOpts); } BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { if (!projectionObj[Document::metaFieldSortKey]) { return projectionObj; } return projectionObj.removeField(Document::metaFieldSortKey); } /** * Examines the indexes in 'collection' and returns the field name of a geo-indexed field suitable * for use in $geoNear. 2d indexes are given priority over 2dsphere indexes. * * The 'collection' is required to exist. Throws if no usable 2d or 2dsphere index could be found. */ StringData extractGeoNearFieldFromIndexes(OperationContext* opCtx, Collection* collection) { invariant(collection); std::vector idxs; collection->getIndexCatalog()->findIndexByType(opCtx, IndexNames::GEO_2D, idxs); uassert(ErrorCodes::IndexNotFound, str::stream() << "There is more than one 2d index on " << collection->ns().ns() << "; unsure which to use for $geoNear", idxs.size() <= 1U); if (idxs.size() == 1U) { for (auto&& elem : idxs.front()->keyPattern()) { if (elem.type() == BSONType::String && elem.valueStringData() == IndexNames::GEO_2D) { return elem.fieldNameStringData(); } } MONGO_UNREACHABLE; } // If there are no 2d indexes, look for a 2dsphere index. 
idxs.clear(); collection->getIndexCatalog()->findIndexByType(opCtx, IndexNames::GEO_2DSPHERE, idxs); uassert(ErrorCodes::IndexNotFound, "$geoNear requires a 2d or 2dsphere index, but none were found", !idxs.empty()); uassert(ErrorCodes::IndexNotFound, str::stream() << "There is more than one 2dsphere index on " << collection->ns().ns() << "; unsure which to use for $geoNear", idxs.size() <= 1U); invariant(idxs.size() == 1U); for (auto&& elem : idxs.front()->keyPattern()) { if (elem.type() == BSONType::String && elem.valueStringData() == IndexNames::GEO_2DSPHERE) { return elem.fieldNameStringData(); } } MONGO_UNREACHABLE; } } // namespace void PipelineD::prepareCursorSource(Collection* collection, const NamespaceString& nss, const AggregationRequest* aggRequest, Pipeline* pipeline) { auto expCtx = pipeline->getContext(); // We will be modifying the source vector as we go. Pipeline::SourceContainer& sources = pipeline->_sources; if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) { return; } // We are going to generate an input cursor, so we need to be holding the collection lock. dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IS)); if (!sources.empty()) { auto sampleStage = dynamic_cast(sources.front().get()); // Optimize an initial $sample stage if possible. if (collection && sampleStage) { const long long sampleSize = sampleStage->getSampleSize(); const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx); auto exec = uassertStatusOK( createRandomCursorExecutor(collection, expCtx->opCtx, sampleSize, numRecords)); if (exec) { // Replace $sample stage with $sampleFromRandomCursor stage. sources.pop_front(); std::string idString = collection->ns().isOplog() ? 
"ts" : "_id"; sources.emplace_front(DocumentSourceSampleFromRandomCursor::create( expCtx, sampleSize, idString, numRecords)); addCursorSource( pipeline, DocumentSourceCursor::create(collection, std::move(exec), expCtx), pipeline->getDependencies(DepsTracker::MetadataAvailable::kNoMetadata)); return; } } } // If the first stage is $geoNear, prepare a special DocumentSourceGeoNearCursor stage; // otherwise, create a generic DocumentSourceCursor. const auto geoNearStage = sources.empty() ? nullptr : dynamic_cast(sources.front().get()); if (geoNearStage) { prepareGeoNearCursorSource(collection, nss, aggRequest, pipeline); } else { prepareGenericCursorSource(collection, nss, aggRequest, pipeline); } } void PipelineD::prepareGenericCursorSource(Collection* collection, const NamespaceString& nss, const AggregationRequest* aggRequest, Pipeline* pipeline) { Pipeline::SourceContainer& sources = pipeline->_sources; auto expCtx = pipeline->getContext(); // Look for an initial match. This works whether we got an initial query or not. If not, it // results in a "{}" query, which will be what we want in that case. bool oplogReplay = false; const BSONObj queryObj = pipeline->getInitialQuery(); if (!queryObj.isEmpty()) { auto matchStage = dynamic_cast(sources.front().get()); if (matchStage) { oplogReplay = dynamic_cast(matchStage) != nullptr; // If a $match query is pulled into the cursor, the $match is redundant, and can be // removed from the pipeline. sources.pop_front(); } else { // A $geoNear stage, the only other stage that can produce an initial query, is also // a valid initial stage. However, we should be in prepareGeoNearCursorSource() instead. MONGO_UNREACHABLE; } } // Find the set of fields in the source documents depended on by this pipeline. DepsTracker deps = pipeline->getDependencies(DocumentSourceMatch::isTextQuery(queryObj) ? 
DepsTracker::MetadataAvailable::kTextScore : DepsTracker::MetadataAvailable::kNoMetadata); BSONObj projForQuery = deps.toProjection(); // Look for an initial sort; we'll try to add this to the Cursor we create. If we're successful // in doing that, we'll remove the $sort from the pipeline, because the documents will already // come sorted in the specified order as a result of the index scan. intrusive_ptr sortStage; BSONObj sortObj; if (!sources.empty()) { sortStage = dynamic_cast(sources.front().get()); if (sortStage) { sortObj = sortStage ->sortKeyPattern( DocumentSourceSort::SortKeySerialization::kForPipelineSerialization) .toBson(); } } // Create the PlanExecutor. auto exec = uassertStatusOK(prepareExecutor(expCtx->opCtx, collection, nss, pipeline, expCtx, oplogReplay, sortStage, deps, queryObj, aggRequest, Pipeline::kAllowedMatcherFeatures, &sortObj, &projForQuery)); if (!projForQuery.isEmpty() && !sources.empty()) { // Check for redundant $project in query with the same specification as the inclusion // projection generated by the dependency optimization. auto proj = dynamic_cast(sources.front().get()); if (proj && proj->isSubsetOfProjection(projForQuery)) { sources.pop_front(); } } addCursorSource(pipeline, DocumentSourceCursor::create(collection, std::move(exec), expCtx), deps, queryObj, sortObj, projForQuery); } void PipelineD::prepareGeoNearCursorSource(Collection* collection, const NamespaceString& nss, const AggregationRequest* aggRequest, Pipeline* pipeline) { uassert(ErrorCodes::NamespaceNotFound, str::stream() << "$geoNear requires a geo index to run, but " << nss.ns() << " does not exist", collection); Pipeline::SourceContainer& sources = pipeline->_sources; auto expCtx = pipeline->getContext(); const auto geoNearStage = dynamic_cast(sources.front().get()); invariant(geoNearStage); auto deps = pipeline->getDependencies(DepsTracker::kAllGeoNearDataAvailable); // If the user specified a "key" field, use that field to satisfy the "near" query. 
Otherwise, // look for a geo-indexed field in 'collection' that can. auto nearFieldName = (geoNearStage->getKeyField() ? geoNearStage->getKeyField()->fullPath() : extractGeoNearFieldFromIndexes(expCtx->opCtx, collection)) .toString(); // Create a PlanExecutor whose query is the "near" predicate on 'nearFieldName' combined with // the optional "query" argument in the $geoNear stage. BSONObj fullQuery = geoNearStage->asNearQuery(nearFieldName); BSONObj proj = deps.toProjection(); BSONObj sortFromQuerySystem; auto exec = uassertStatusOK(prepareExecutor(expCtx->opCtx, collection, nss, pipeline, expCtx, false, /* oplogReplay */ nullptr, /* sortStage */ deps, std::move(fullQuery), aggRequest, Pipeline::kGeoNearMatcherFeatures, &sortFromQuerySystem, &proj)); invariant(sortFromQuerySystem.isEmpty(), str::stream() << "Unexpectedly got the following sort from the query system: " << sortFromQuerySystem.jsonString()); auto geoNearCursor = DocumentSourceGeoNearCursor::create(collection, std::move(exec), expCtx, geoNearStage->getDistanceField(), geoNearStage->getLocationField(), geoNearStage->getDistanceMultiplier().value_or(1.0)); // Remove the initial $geoNear; it will be replaced by $geoNearCursor. sources.pop_front(); addCursorSource(pipeline, std::move(geoNearCursor), std::move(deps)); } StatusWith> PipelineD::prepareExecutor( OperationContext* opCtx, Collection* collection, const NamespaceString& nss, Pipeline* pipeline, const intrusive_ptr& expCtx, bool oplogReplay, const intrusive_ptr& sortStage, const DepsTracker& deps, const BSONObj& queryObj, const AggregationRequest* aggRequest, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, BSONObj* sortObj, BSONObj* projectionObj) { // The query system has the potential to use an index to provide a non-blocking sort and/or to // use the projection to generate a covered plan. If this is possible, it is more efficient to // let the query system handle those parts of the pipeline. 
If not, it is more efficient to use // a $sort and/or a ParsedDeps object. Thus, we will determine whether the query system can // provide a non-blocking sort or a covered projection before we commit to a PlanExecutor. // // To determine if the query system can provide a non-blocking sort, we pass the // NO_BLOCKING_SORT planning option, meaning 'getExecutor' will not produce a PlanExecutor if // the query system would use a blocking sort stage. // // To determine if the query system can provide a covered projection, we pass the // NO_UNCOVERED_PROJECTS planning option, meaning 'getExecutor' will not produce a PlanExecutor // if the query system would need to fetch the document to do the projection. The following // logic uses the above strategies, with multiple calls to 'attemptToGetExecutor' to determine // the most efficient way to handle the $sort and $project stages. // // LATER - We should attempt to determine if the results from the query are returned in some // order so we can then apply other optimizations there are tickets for, such as SERVER-4507. size_t plannerOpts = QueryPlannerParams::DEFAULT | QueryPlannerParams::NO_BLOCKING_SORT; if (deps.hasNoRequirements()) { // If we don't need any fields from the input document, performing a count is faster, and // will output empty documents, which is okay. plannerOpts |= QueryPlannerParams::IS_COUNT; } // The only way to get meta information (e.g. the text score) is to let the query system handle // the projection. In all other cases, unless the query system can do an index-covered // projection and avoid going to the raw record at all, it is faster to have ParsedDeps filter // the fields we need. 
if (!deps.getNeedsAnyMetadata()) { plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } if (expCtx->needsMerge && expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; } const BSONObj emptyProjection; const BSONObj metaSortProjection = BSON("$meta" << "sortKey"); if (sortStage) { // See if the query system can provide a non-blocking sort. auto swExecutorSort = attemptToGetExecutor(opCtx, collection, nss, expCtx, oplogReplay, queryObj, expCtx->needsMerge ? metaSortProjection : emptyProjection, *sortObj, aggRequest, plannerOpts, matcherFeatures); if (swExecutorSort.isOK()) { // Success! Now see if the query system can also cover the projection. auto swExecutorSortAndProj = attemptToGetExecutor(opCtx, collection, nss, expCtx, oplogReplay, queryObj, *projectionObj, *sortObj, aggRequest, plannerOpts, matcherFeatures); std::unique_ptr exec; if (swExecutorSortAndProj.isOK()) { // Success! We have a non-blocking sort and a covered projection. exec = std::move(swExecutorSortAndProj.getValue()); } else if (swExecutorSortAndProj == ErrorCodes::QueryPlanKilled) { return {ErrorCodes::OperationFailed, str::stream() << "Failed to determine whether query system can provide a " "covered projection in addition to a non-blocking sort: " << swExecutorSortAndProj.getStatus().toString()}; } else { // The query system couldn't cover the projection. *projectionObj = BSONObj(); exec = std::move(swExecutorSort.getValue()); } // We know the sort is being handled by the query system, so remove the $sort stage. pipeline->_sources.pop_front(); if (sortStage->getLimitSrc()) { // We need to reinsert the coalesced $limit after removing the $sort. 
pipeline->_sources.push_front(sortStage->getLimitSrc()); } return std::move(exec); } else if (swExecutorSort == ErrorCodes::QueryPlanKilled) { return { ErrorCodes::OperationFailed, str::stream() << "Failed to determine whether query system can provide a non-blocking sort: " << swExecutorSort.getStatus().toString()}; } // The query system can't provide a non-blocking sort. *sortObj = BSONObj(); } // Either there was no $sort stage, or the query system could not provide a non-blocking // sort. dassert(sortObj->isEmpty()); *projectionObj = removeSortKeyMetaProjection(*projectionObj); const auto metadataRequired = deps.getAllRequiredMetadataTypes(); if (metadataRequired.size() == 1 && metadataRequired.front() == DepsTracker::MetadataType::SORT_KEY) { // A sort key requirement would have prevented us from being able to add this parameter // before, but now we know the query system won't cover the sort, so we will be able to // compute the sort key ourselves during the $sort stage, and thus don't need a query // projection to do so. plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS; } // See if the query system can cover the projection. auto swExecutorProj = attemptToGetExecutor(opCtx, collection, nss, expCtx, oplogReplay, queryObj, *projectionObj, *sortObj, aggRequest, plannerOpts, matcherFeatures); if (swExecutorProj.isOK()) { // Success! We have a covered projection. return std::move(swExecutorProj.getValue()); } else if (swExecutorProj == ErrorCodes::QueryPlanKilled) { return {ErrorCodes::OperationFailed, str::stream() << "Failed to determine whether query system can provide a covered projection: " << swExecutorProj.getStatus().toString()}; } // The query system couldn't provide a covered projection. *projectionObj = BSONObj(); // If this doesn't work, nothing will. 
return attemptToGetExecutor(opCtx, collection, nss, expCtx, oplogReplay, queryObj, *projectionObj, *sortObj, aggRequest, plannerOpts, matcherFeatures); } void PipelineD::addCursorSource(Pipeline* pipeline, boost::intrusive_ptr cursor, DepsTracker deps, const BSONObj& queryObj, const BSONObj& sortObj, const BSONObj& projectionObj) { cursor->setQuery(queryObj); cursor->setSort(sortObj); if (deps.hasNoRequirements()) { cursor->shouldProduceEmptyDocs(); } if (!projectionObj.isEmpty()) { cursor->setProjection(projectionObj, boost::none); } else { // There may be fewer dependencies now if the sort was covered. if (!sortObj.isEmpty()) { deps = pipeline->getDependencies(DocumentSourceMatch::isTextQuery(queryObj) ? DepsTracker::MetadataAvailable::kTextScore : DepsTracker::MetadataAvailable::kNoMetadata); } cursor->setProjection(deps.toProjection(), deps.toParsedDeps()); } pipeline->addInitialSource(std::move(cursor)); } Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) { if (auto docSourceCursor = dynamic_cast(pipeline->_sources.front().get())) { return docSourceCursor->getLatestOplogTimestamp(); } return Timestamp(); } std::string PipelineD::getPlanSummaryStr(const Pipeline* pipeline) { if (auto docSourceCursor = dynamic_cast(pipeline->_sources.front().get())) { return docSourceCursor->getPlanSummaryStr(); } return ""; } void PipelineD::getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut) { invariant(statsOut); if (auto docSourceCursor = dynamic_cast(pipeline->_sources.front().get())) { *statsOut = docSourceCursor->getPlanSummaryStats(); } bool hasSortStage{false}; bool usedDisk{false}; for (auto&& source : pipeline->_sources) { if (dynamic_cast(source.get())) hasSortStage = true; usedDisk = usedDisk || source->usedDisk(); if (usedDisk && hasSortStage) break; } statsOut->hasSortStage = hasSortStage; statsOut->usedDisk = usedDisk; } } // namespace mongo