/**
 * Copyright (c) 2012-2014 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery

#include "mongo/platform/basic.h"

#include "mongo/db/pipeline/pipeline_d.h"

#include "mongo/client/dbclientinterface.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_iterator.h"
#include "mongo/db/exec/multi_iterator.h"
#include "mongo/db/exec/shard_filter.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collation_serializer.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/s/chunk_version.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"

namespace mongo {

using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;

namespace {
class MongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface {
public:
    MongodImplementation(const intrusive_ptr<ExpressionContext>& ctx)
        : _ctx(ctx), _client(ctx->opCtx) {}

    void setOperationContext(OperationContext* opCtx) {
        invariant(_ctx->opCtx == opCtx);
        _client.setOpCtx(opCtx);
    }

    DBClientBase* directClient() final {
        return &_client;
    }

    bool isSharded(const NamespaceString& ns) final {
        const ChunkVersion unsharded(0, 0, OID());
        return !(
            ShardingState::get(_ctx->opCtx)->getVersion(ns.ns()).isWriteCompatibleWith(unsharded));
    }

    bool isCapped(const NamespaceString& ns) final {
        AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns());
        Collection* collection = ctx.getCollection();
        return collection && collection->isCapped();
    }
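
    // For illustration only: a pipeline stage that mixes in DocumentSourceNeedsMongod receives
    // this adapter via injectMongodInterface() and may then perform local operations through it.
    // A hypothetical stage body could look like the following sketch, where 'outputNs' and
    // 'batch' are assumptions for the example, not names from this file:
    //
    //     if (!_mongod->isSharded(outputNs)) {
    //         _mongod->insert(outputNs, batch);
    //     }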
    BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
        boost::optional<DisableDocumentValidation> maybeDisableValidation;
        if (_ctx->bypassDocumentValidation)
            maybeDisableValidation.emplace(_ctx->opCtx);

        _client.insert(ns.ns(), objs);
        return _client.getLastErrorDetailed();
    }

    CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
                                          const NamespaceString& ns) final {
        AutoGetCollectionForRead autoColl(opCtx, ns);

        Collection* collection = autoColl.getCollection();
        if (!collection) {
            LOG(2) << "Collection not found on index stats retrieval: " << ns.ns();
            return CollectionIndexUsageMap();
        }

        return collection->infoCache()->getIndexUsageStats();
    }

    bool hasUniqueIdIndex(const NamespaceString& ns) const final {
        AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns());
        Collection* collection = ctx.getCollection();
        if (!collection) {
            // Collection doesn't exist; the correct return value is questionable.
            return false;
        }
        return collection->getIndexCatalog()->findIdIndex(_ctx->opCtx);
    }

private:
    intrusive_ptr<ExpressionContext> _ctx;
    DBDirectClient _client;
};

/**
 * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {}
 * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough
 * percentage of the collection.
 */
shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection,
                                                    OperationContext* txn,
                                                    long long sampleSize,
                                                    long long numRecords) {
    double kMaxSampleRatioForRandCursor = 0.05;
    if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100)
        return {};

    // Attempt to get a random cursor from the RecordStore. If the RecordStore does not support
    // random cursors, attempt to get one from the _id index.
    std::unique_ptr<RecordCursor> rsRandCursor = collection->getRecordStore()->getRandomCursor(txn);

    auto ws = stdx::make_unique<WorkingSet>();
    std::unique_ptr<PlanStage> stage;

    if (rsRandCursor) {
        stage = stdx::make_unique<MultiIteratorStage>(txn, ws.get(), collection);
        static_cast<MultiIteratorStage*>(stage.get())->addIterator(std::move(rsRandCursor));
    } else {
        auto indexCatalog = collection->getIndexCatalog();
        auto indexDescriptor = indexCatalog->findIdIndex(txn);

        if (!indexDescriptor) {
            // There was no _id index.
            return {};
        }

        IndexAccessMethod* idIam = indexCatalog->getIndex(indexDescriptor);
        auto idxRandCursor = idIam->newRandomCursor(txn);

        if (!idxRandCursor) {
            // Storage engine does not support any type of random cursor.
            return {};
        }

        auto idxIterator = stdx::make_unique<IndexIteratorStage>(txn,
                                                                 ws.get(),
                                                                 collection,
                                                                 idIam,
                                                                 indexDescriptor->keyPattern(),
                                                                 std::move(idxRandCursor));
        stage = stdx::make_unique<FetchStage>(
            txn, ws.get(), idxIterator.release(), nullptr, collection);
    }
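
    // At this point 'stage' yields a stream of random documents: either a single
    // MultiIteratorStage over the record store's random cursor, or, as the fallback built above,
    // random _id-index entries fetched back to full documents. Sketch of the fallback plan shape
    // (illustrative only):
    //
    //     FetchStage
    //       `-- IndexIteratorStage (random cursor over the _id index)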
    ShardingState* const shardingState = ShardingState::get(txn);

    // If we're in a sharded environment, we need to filter out documents we don't own.
    if (shardingState->needCollectionMetadata(txn, collection->ns().ns())) {
        auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
            txn,
            shardingState->getCollectionMetadata(collection->ns().ns()),
            ws.get(),
            stage.release());
        return uassertStatusOK(PlanExecutor::make(txn,
                                                  std::move(ws),
                                                  std::move(shardFilterStage),
                                                  collection,
                                                  PlanExecutor::YIELD_AUTO));
    }

    return uassertStatusOK(PlanExecutor::make(
        txn, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO));
}

StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor(
    OperationContext* txn,
    Collection* collection,
    const intrusive_ptr<ExpressionContext>& pExpCtx,
    BSONObj queryObj,
    BSONObj projectionObj,
    BSONObj sortObj,
    const size_t plannerOpts) {
    auto lpq = stdx::make_unique<LiteParsedQuery>(pExpCtx->ns);
    lpq->setFilter(queryObj);
    lpq->setProj(projectionObj);
    lpq->setSort(sortObj);
    if (pExpCtx->collator) {
        lpq->setCollation(CollationSerializer::specToBSON(pExpCtx->collator->getSpec()));
    }

    const ExtensionsCallbackReal extensionsCallback(pExpCtx->opCtx, &pExpCtx->ns);

    auto cq = CanonicalQuery::canonicalize(txn, std::move(lpq), extensionsCallback);
    if (!cq.isOK()) {
        // Return an error instead of uasserting, since there are cases where the combination of
        // sort and projection will result in a bad query, but when we try with a different
        // combination it will be ok. e.g. a sort by {$meta: 'textScore'} without any projection
        // will fail, but will succeed when the corresponding '$meta' projection is passed in
        // another attempt.
        return {cq.getStatus()};
    }

    return getExecutor(
        txn, collection, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts);
}
}  // namespace

shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
    OperationContext* txn,
    Collection* collection,
    const NamespaceString& nss,
    const intrusive_ptr<Pipeline>& pPipeline,
    const intrusive_ptr<ExpressionContext>& pExpCtx) {
    // We will be modifying the source vector as we go.
    Pipeline::SourceContainer& sources = pPipeline->sources;

    // Inject a MongodImplementation to sources that need them.
    for (auto&& source : sources) {
        DocumentSourceNeedsMongod* needsMongod =
            dynamic_cast<DocumentSourceNeedsMongod*>(source.get());
        if (needsMongod) {
            needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(pExpCtx));
        }
    }

    if (!sources.empty()) {
        if (sources.front()->isValidInitialSource()) {
            if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) {
                // Enable the hooks for setting up authentication on the subsequent internal
                // connections we are going to create. This would normally have been done
                // when SetShardVersion was called, but since SetShardVersion is never called
                // on secondaries, this is needed.
                ShardedConnectionInfo::addHook();
            }
            return std::shared_ptr<PlanExecutor>();  // don't need a cursor
        }

        auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get());
        // Optimize an initial $sample stage if possible.
        if (collection && sampleStage) {
            const long long sampleSize = sampleStage->getSampleSize();
            const long long numRecords = collection->getRecordStore()->numRecords(txn);
            auto exec = createRandomCursorExecutor(collection, txn, sampleSize, numRecords);
            if (exec) {
                // Replace $sample stage with $sampleFromRandomCursor stage.
                sources.pop_front();
                std::string idString = collection->ns().isOplog() ? "ts" : "_id";
                sources.emplace_front(DocumentSourceSampleFromRandomCursor::create(
                    pExpCtx, sampleSize, idString, numRecords));

                const BSONObj initialQuery;
                return addCursorSource(
                    pPipeline, pExpCtx, exec, pPipeline->getDependencies(initialQuery));
            }
        }
    }
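
    // For illustration of the optimization above: on a sufficiently large collection, a pipeline
    // such as
    //
    //     db.coll.aggregate([{$sample: {size: 100}}])
    //
    // is rewritten so that the random-cursor executor supplies candidate documents and
    // $sampleFromRandomCursor de-duplicates them by '_id' ("ts" for the oplog) until 'size'
    // distinct documents have been returned. (The shell invocation is an assumed example, not
    // code from this file.)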
    // Look for an initial match. This works whether we got an initial query or not. If not, it
    // results in a "{}" query, which will be what we want in that case.
    const BSONObj queryObj = pPipeline->getInitialQuery();
    if (!queryObj.isEmpty()) {
        if (dynamic_cast<DocumentSourceMatch*>(sources.front().get())) {
            // If a $match query is pulled into the cursor, the $match is redundant, and can be
            // removed from the pipeline.
            sources.pop_front();
        } else {
            // A $geoNear stage, the only other stage that can produce an initial query, is also
            // a valid initial stage and will be handled above.
            MONGO_UNREACHABLE;
        }
    }

    // Find the set of fields in the source documents depended on by this pipeline.
    DepsTracker deps = pPipeline->getDependencies(queryObj);

    BSONObj projForQuery = deps.toProjection();

    // Look for an initial sort; we'll try to add this to the Cursor we create. If we're successful
    // in doing that (further down), we'll remove the $sort from the pipeline, because the
    // documents will already come sorted in the specified order as a result of the index scan.
    intrusive_ptr<DocumentSourceSort> sortStage;
    BSONObj sortObj;
    if (!sources.empty()) {
        sortStage = dynamic_cast<DocumentSourceSort*>(sources.front().get());
        if (sortStage) {
            // Build the sort key.
            sortObj = sortStage->serializeSortKey(/*explain*/ false).toBson();
        }
    }

    // Create the PlanExecutor.
    auto exec = prepareExecutor(txn,
                                collection,
                                nss,
                                pPipeline,
                                pExpCtx,
                                sortStage,
                                deps,
                                queryObj,
                                &sortObj,
                                &projForQuery);

    return addCursorSource(pPipeline, pExpCtx, exec, deps, queryObj, sortObj, projForQuery);
}

std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
    OperationContext* txn,
    Collection* collection,
    const NamespaceString& nss,
    const intrusive_ptr<Pipeline>& pipeline,
    const intrusive_ptr<ExpressionContext>& expCtx,
    const intrusive_ptr<DocumentSourceSort>& sortStage,
    const DepsTracker& deps,
    const BSONObj& queryObj,
    BSONObj* sortObj,
    BSONObj* projectionObj) {
    // The query system has the potential to use an index to provide a non-blocking sort and/or to
    // use the projection to generate a covered plan. If this is possible, it is more efficient to
    // let the query system handle those parts of the pipeline. If not, it is more efficient to use
    // a $sort and/or a ParsedDeps object. Thus, we will determine whether the query system can
    // provide a non-blocking sort or a covered projection before we commit to a PlanExecutor.
    //
    // To determine if the query system can provide a non-blocking sort, we pass the
    // NO_BLOCKING_SORT planning option, meaning 'getExecutor' will not produce a PlanExecutor if
    // the query system would use a blocking sort stage.
    //
    // To determine if the query system can provide a covered projection, we pass the
    // NO_UNCOVERED_PROJECTIONS planning option, meaning 'getExecutor' will not produce a
    // PlanExecutor if the query system would need to fetch the document to do the projection. The
    // following logic uses the above strategies, with multiple calls to 'attemptToGetExecutor' to
    // determine the most efficient way to handle the $sort and $project stages.
    //
    // LATER - We should attempt to determine if the results from the query are returned in some
    // order so we can then apply other optimizations there are tickets for, such as SERVER-4507.
    size_t plannerOpts = QueryPlannerParams::DEFAULT | QueryPlannerParams::NO_BLOCKING_SORT;

    // If we are connecting directly to the shard rather than going through a mongos, we won't
    // have any sharding metadata, and we don't need to filter out orphaned documents.
    if (ShardingState::get(txn)->needCollectionMetadata(txn, nss.ns())) {
        plannerOpts |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
    }
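
    // For illustration: a pipeline like
    //
    //     db.coll.aggregate([{$group: {_id: null, n: {$sum: 1}}}])
    //
    // depends on no fields of the input documents, so 'deps.hasNoRequirements()' is true below
    // and empty documents suffice. (The shell invocation is an assumed example.)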
    if (deps.hasNoRequirements()) {
        // If we don't need any fields from the input document, performing a count is faster, and
        // will output empty documents, which is okay.
        plannerOpts |= QueryPlannerParams::IS_COUNT;
    }

    // The only way to get a text score is to let the query system handle the projection. In all
    // other cases, unless the query system can do an index-covered projection and avoid going to
    // the raw record at all, it is faster to have ParsedDeps filter the fields we need.
    if (!deps.needTextScore) {
        plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
    }

    std::shared_ptr<PlanExecutor> exec;
    BSONObj emptyProjection;
    if (sortStage) {
        // See if the query system can provide a non-blocking sort.
        auto swExecutorSort = attemptToGetExecutor(
            txn, collection, expCtx, queryObj, emptyProjection, *sortObj, plannerOpts);

        if (swExecutorSort.isOK()) {
            // Success! Now see if the query system can also cover the projection.
            auto swExecutorSortAndProj = attemptToGetExecutor(
                txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts);

            if (swExecutorSortAndProj.isOK()) {
                // Success! We have a non-blocking sort and a covered projection.
                exec = std::move(swExecutorSortAndProj.getValue());
            } else {
                // The query system couldn't cover the projection.
                *projectionObj = BSONObj();
                exec = std::move(swExecutorSort.getValue());
            }

            // We know the sort is being handled by the query system, so remove the $sort stage.
            pipeline->sources.pop_front();

            if (sortStage->getLimitSrc()) {
                // We need to reinsert the coalesced $limit after removing the $sort.
                pipeline->sources.push_front(sortStage->getLimitSrc());
            }

            return exec;
        }
        // The query system can't provide a non-blocking sort.
        *sortObj = BSONObj();
    }

    // Either there was no $sort stage, or the query system could not provide a non-blocking
    // sort.
    dassert(sortObj->isEmpty());

    // See if the query system can cover the projection.
    auto swExecutorProj = attemptToGetExecutor(
        txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts);
    if (swExecutorProj.isOK()) {
        // Success! We have a covered projection.
        return std::move(swExecutorProj.getValue());
    }

    // The query system couldn't provide a covered projection.
    *projectionObj = BSONObj();

    // If this doesn't work, nothing will.
    return uassertStatusOK(attemptToGetExecutor(
        txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts));
}
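
// For illustration of the cascade in prepareExecutor(): with an index {a: 1} and a pipeline such
// as
//
//     db.coll.aggregate([{$sort: {a: 1}}, {$project: {_id: 0, a: 1}}])
//
// the first attempt can succeed with a non-blocking index scan, and the second attempt may also
// cover the dependency projection {a: 1, _id: 0} from the index keys alone, in which case the
// $sort stage is removed from the pipeline and the covered plan is used. (The example pipeline is
// assumed for illustration.)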
shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline,
                                                    const intrusive_ptr<ExpressionContext>& expCtx,
                                                    shared_ptr<PlanExecutor> exec,
                                                    DepsTracker deps,
                                                    const BSONObj& queryObj,
                                                    const BSONObj& sortObj,
                                                    const BSONObj& projectionObj) {
    // Get the full "namespace" name.
    const string& fullName = expCtx->ns.ns();

    // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
    intrusive_ptr<DocumentSourceCursor> pSource =
        DocumentSourceCursor::create(fullName, exec, expCtx);

    // Note the query, sort, and projection for explain.
    pSource->setQuery(queryObj);
    pSource->setSort(sortObj);

    if (deps.hasNoRequirements()) {
        pSource->shouldProduceEmptyDocs();
    }

    if (!projectionObj.isEmpty()) {
        pSource->setProjection(projectionObj, boost::none);
    } else {
        // There may be fewer dependencies now if the sort was covered.
        if (!sortObj.isEmpty()) {
            deps = pipeline->getDependencies(queryObj);
        }

        pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
    }

    // Add the initial DocumentSourceCursor to the front of the pipeline. Then optimize again in
    // case the new stage can be absorbed with the first stages of the pipeline.
    pipeline->addInitialSource(pSource);
    pipeline->optimizePipeline();

    // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
    // deregister the PlanExecutor so that it can be registered with ClientCursor.
    exec->deregisterExec();
    exec->saveState();

    return exec;
}

}  // namespace mongo