/**
* Copyright (c) 2012-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_iterator.h"
#include "mongo/db/exec/multi_iterator.h"
#include "mongo/db/exec/shard_filter.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/s/chunk_version.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
namespace mongo {
using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace {
class MongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface {
public:
MongodImplementation(const intrusive_ptr& ctx)
: _ctx(ctx), _client(ctx->opCtx) {}
void setOperationContext(OperationContext* opCtx) {
invariant(_ctx->opCtx == opCtx);
_client.setOpCtx(opCtx);
}
DBClientBase* directClient() final {
return &_client;
}
bool isSharded(const NamespaceString& nss) final {
AutoGetCollectionForReadCommand autoColl(_ctx->opCtx, nss);
// TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
// state.
auto css = CollectionShardingState::get(_ctx->opCtx, nss);
return bool(css->getMetadata());
}
BSONObj insert(const NamespaceString& ns, const std::vector& objs) final {
boost::optional maybeDisableValidation;
if (_ctx->bypassDocumentValidation)
maybeDisableValidation.emplace(_ctx->opCtx);
_client.insert(ns.ns(), objs);
return _client.getLastErrorDetailed();
}
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) final {
AutoGetCollectionForReadCommand autoColl(opCtx, ns);
Collection* collection = autoColl.getCollection();
if (!collection) {
LOG(2) << "Collection not found on index stats retrieval: " << ns.ns();
return CollectionIndexUsageMap();
}
return collection->infoCache()->getIndexUsageStats();
}
void appendLatencyStats(const NamespaceString& nss,
bool includeHistograms,
BSONObjBuilder* builder) const final {
Top::get(_ctx->opCtx->getServiceContext())
.appendLatencyStats(nss.ns(), includeHistograms, builder);
}
Status appendStorageStats(const NamespaceString& nss,
const BSONObj& param,
BSONObjBuilder* builder) const final {
return appendCollectionStorageStats(_ctx->opCtx, nss, param, builder);
}
Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const final {
return appendCollectionRecordCount(_ctx->opCtx, nss, builder);
}
BSONObj getCollectionOptions(const NamespaceString& nss) final {
const auto infos =
_client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
}
Status renameIfOptionsAndIndexesHaveNotChanged(
const BSONObj& renameCommandObj,
const NamespaceString& targetNs,
const BSONObj& originalCollectionOptions,
const std::list& originalIndexes) final {
Lock::GlobalWrite globalLock(_ctx->opCtx);
if (SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions !=
getCollectionOptions(targetNs))) {
return {ErrorCodes::CommandFailed,
str::stream() << "collection options of target collection " << targetNs.ns()
<< " changed during processing. Original options: "
<< originalCollectionOptions
<< ", new options: "
<< getCollectionOptions(targetNs)};
}
auto currentIndexes = _client.getIndexSpecs(targetNs.ns());
if (originalIndexes.size() != currentIndexes.size() ||
!std::equal(originalIndexes.begin(),
originalIndexes.end(),
currentIndexes.begin(),
SimpleBSONObjComparator::kInstance.makeEqualTo())) {
return {ErrorCodes::CommandFailed,
str::stream() << "indexes of target collection " << targetNs.ns()
<< " changed during processing."};
}
BSONObj info;
bool ok = _client.runCommand("admin", renameCommandObj, info);
return ok ? Status::OK() : Status{ErrorCodes::CommandFailed,
str::stream() << "renameCollection failed: " << info};
}
StatusWith> makePipeline(
const std::vector& rawPipeline,
const boost::intrusive_ptr& expCtx) final {
// 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
// than the DocumentSource this MongodImplementation is injected into, but both
// ExpressionContext instances should still have the same OperationContext.
invariant(_ctx->opCtx == expCtx->opCtx);
auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (!pipeline.isOK()) {
return pipeline.getStatus();
}
pipeline.getValue()->optimizePipeline();
AutoGetCollectionForReadCommand autoColl(expCtx->opCtx, expCtx->ns);
// makePipeline() is only called to perform secondary aggregation requests and expects the
// collection representing the document source to be not-sharded. We confirm sharding state
// here to avoid taking a collection lock elsewhere for this purpose alone.
// TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor
// until after we release the lock, leaving room for a collection to be sharded inbetween.
// TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
// state.
auto css = CollectionShardingState::get(_ctx->opCtx, expCtx->ns);
uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata()));
PipelineD::prepareCursorSource(
autoColl.getCollection(), expCtx->ns, nullptr, pipeline.getValue().get());
return pipeline;
}
std::vector getCurrentOps(CurrentOpConnectionsMode connMode,
CurrentOpUserMode userMode,
CurrentOpTruncateMode truncateMode) const {
AuthorizationSession* ctxAuth = AuthorizationSession::get(_ctx->opCtx->getClient());
const std::string hostName = getHostNameCachedAndPort();
std::vector ops;
for (ServiceContext::LockedClientsCursor cursor(
_ctx->opCtx->getClient()->getServiceContext());
Client* client = cursor.next();) {
invariant(client);
stdx::lock_guard lk(*client);
// If auth is disabled, ignore the allUsers parameter.
if (ctxAuth->getAuthorizationManager().isAuthEnabled() &&
userMode == CurrentOpUserMode::kExcludeOthers &&
!ctxAuth->isCoauthorizedWithClient(client)) {
continue;
}
const OperationContext* clientOpCtx = client->getOperationContext();
if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) {
continue;
}
BSONObjBuilder infoBuilder;
infoBuilder.append("host", hostName);
client->reportState(infoBuilder);
const auto& clientMetadata =
ClientMetadataIsMasterState::get(client).getClientMetadata();
if (clientMetadata) {
auto appName = clientMetadata.get().getApplicationName();
if (!appName.empty()) {
infoBuilder.append("appName", appName);
}
auto clientMetadataDocument = clientMetadata.get().getDocument();
infoBuilder.append("clientMetadata", clientMetadataDocument);
}
// Fill out the rest of the BSONObj with opCtx specific details.
infoBuilder.appendBool("active", static_cast(clientOpCtx));
infoBuilder.append(
"currentOpTime",
_ctx->opCtx->getServiceContext()->getPreciseClockSource()->now().toString());
if (clientOpCtx) {
infoBuilder.append("opid", clientOpCtx->getOpID());
if (clientOpCtx->isKillPending()) {
infoBuilder.append("killPending", true);
}
CurOp::get(clientOpCtx)
->reportState(&infoBuilder,
(truncateMode == CurrentOpTruncateMode::kTruncateOps));
Locker::LockerInfo lockerInfo;
clientOpCtx->lockState()->getLockerInfo(&lockerInfo);
fillLockerInfo(lockerInfo, infoBuilder);
}
ops.emplace_back(infoBuilder.obj());
}
return ops;
}
std::string getShardName(OperationContext* opCtx) const {
if (ShardingState::get(opCtx)->enabled()) {
return ShardingState::get(opCtx)->getShardName();
}
return std::string();
}
private:
intrusive_ptr _ctx;
DBDirectClient _client;
};
/**
* Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {}
* if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough
* percentage of the collection.
*/
StatusWith> createRandomCursorExecutor(
Collection* collection, OperationContext* opCtx, long long sampleSize, long long numRecords) {
double kMaxSampleRatioForRandCursor = 0.05;
if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) {
return {nullptr};
}
// Attempt to get a random cursor from the RecordStore. If the RecordStore does not support
// random cursors, attempt to get one from the _id index.
std::unique_ptr rsRandCursor =
collection->getRecordStore()->getRandomCursor(opCtx);
auto ws = stdx::make_unique();
std::unique_ptr stage;
if (rsRandCursor) {
stage = stdx::make_unique(opCtx, ws.get(), collection);
static_cast(stage.get())->addIterator(std::move(rsRandCursor));
} else {
auto indexCatalog = collection->getIndexCatalog();
auto indexDescriptor = indexCatalog->findIdIndex(opCtx);
if (!indexDescriptor) {
// There was no _id index.
return {nullptr};
}
IndexAccessMethod* idIam = indexCatalog->getIndex(indexDescriptor);
auto idxRandCursor = idIam->newRandomCursor(opCtx);
if (!idxRandCursor) {
// Storage engine does not support any type of random cursor.
return {nullptr};
}
auto idxIterator = stdx::make_unique(opCtx,
ws.get(),
collection,
idIam,
indexDescriptor->keyPattern(),
std::move(idxRandCursor));
stage = stdx::make_unique(
opCtx, ws.get(), idxIterator.release(), nullptr, collection);
}
{
AutoGetCollectionForRead autoColl(opCtx, collection->ns());
// If we're in a sharded environment, we need to filter out documents we don't own.
if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
auto shardFilterStage = stdx::make_unique(
opCtx,
CollectionShardingState::get(opCtx, collection->ns())->getMetadata(),
ws.get(),
stage.release());
return PlanExecutor::make(opCtx,
std::move(ws),
std::move(shardFilterStage),
collection,
PlanExecutor::YIELD_AUTO);
}
}
return PlanExecutor::make(
opCtx, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO);
}
StatusWith> attemptToGetExecutor(
OperationContext* opCtx,
Collection* collection,
const NamespaceString& nss,
const intrusive_ptr& pExpCtx,
BSONObj queryObj,
BSONObj projectionObj,
BSONObj sortObj,
const AggregationRequest* aggRequest,
const size_t plannerOpts) {
auto qr = stdx::make_unique(nss);
switch (pExpCtx->tailableMode) {
case ExpressionContext::TailableMode::kNormal:
break;
case ExpressionContext::TailableMode::kTailableAndAwaitData:
qr->setTailable(true);
qr->setAwaitData(true);
break;
}
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);
if (aggRequest) {
qr->setExplain(static_cast(aggRequest->getExplain()));
qr->setHint(aggRequest->getHint());
}
// If the pipeline has a non-null collator, set the collation option to the result of
// serializing the collator's spec back into BSON. We do this in order to fill in all options
// that the user omitted.
//
// If pipeline has a null collator (representing the "simple" collation), we simply set the
// collation option to the original user BSON, which is either the empty object (unspecified),
// or the specification for the "simple" collation.
qr->setCollation(pExpCtx->getCollator() ? pExpCtx->getCollator()->getSpec().toBSON()
: pExpCtx->collation);
const ExtensionsCallbackReal extensionsCallback(pExpCtx->opCtx, &nss);
auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback);
if (!cq.isOK()) {
// Return an error instead of uasserting, since there are cases where the combination of
// sort and projection will result in a bad query, but when we try with a different
// combination it will be ok. e.g. a sort by {$meta: 'textScore'}, without any projection
// will fail, but will succeed when the corresponding '$meta' projection is passed in
// another attempt.
return {cq.getStatus()};
}
return getExecutor(
opCtx, collection, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts);
}
} // namespace
void PipelineD::prepareCursorSource(Collection* collection,
const NamespaceString& nss,
const AggregationRequest* aggRequest,
Pipeline* pipeline) {
auto expCtx = pipeline->getContext();
// We will be modifying the source vector as we go.
Pipeline::SourceContainer& sources = pipeline->_sources;
// Inject a MongodImplementation to sources that need them.
for (auto&& source : sources) {
DocumentSourceNeedsMongod* needsMongod =
dynamic_cast(source.get());
if (needsMongod) {
needsMongod->injectMongodInterface(std::make_shared(expCtx));
}
}
if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) {
return;
}
// We are going to generate an input cursor, so we need to be holding the collection lock.
dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IS));
if (!sources.empty()) {
auto sampleStage = dynamic_cast(sources.front().get());
// Optimize an initial $sample stage if possible.
if (collection && sampleStage) {
const long long sampleSize = sampleStage->getSampleSize();
const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
auto exec = uassertStatusOK(
createRandomCursorExecutor(collection, expCtx->opCtx, sampleSize, numRecords));
if (exec) {
// Replace $sample stage with $sampleFromRandomCursor stage.
sources.pop_front();
std::string idString = collection->ns().isOplog() ? "ts" : "_id";
sources.emplace_front(DocumentSourceSampleFromRandomCursor::create(
expCtx, sampleSize, idString, numRecords));
addCursorSource(
collection,
pipeline,
expCtx,
std::move(exec),
pipeline->getDependencies(DepsTracker::MetadataAvailable::kNoMetadata));
return;
}
}
}
// Look for an initial match. This works whether we got an initial query or not. If not, it
// results in a "{}" query, which will be what we want in that case.
const BSONObj queryObj = pipeline->getInitialQuery();
if (!queryObj.isEmpty()) {
if (dynamic_cast(sources.front().get())) {
// If a $match query is pulled into the cursor, the $match is redundant, and can be
// removed from the pipeline.
sources.pop_front();
} else {
// A $geoNear stage, the only other stage that can produce an initial query, is also
// a valid initial stage and will be handled above.
MONGO_UNREACHABLE;
}
}
// Find the set of fields in the source documents depended on by this pipeline.
DepsTracker deps = pipeline->getDependencies(DocumentSourceMatch::isTextQuery(queryObj)
? DepsTracker::MetadataAvailable::kTextScore
: DepsTracker::MetadataAvailable::kNoMetadata);
BSONObj projForQuery = deps.toProjection();
/*
Look for an initial sort; we'll try to add this to the
Cursor we create. If we're successful in doing that (further down),
we'll remove the $sort from the pipeline, because the documents
will already come sorted in the specified order as a result of the
index scan.
*/
intrusive_ptr sortStage;
BSONObj sortObj;
if (!sources.empty()) {
sortStage = dynamic_cast(sources.front().get());
if (sortStage) {
// build the sort key
sortObj = sortStage->serializeSortKey(/*explain*/ false).toBson();
}
}
// Create the PlanExecutor.
auto exec = uassertStatusOK(prepareExecutor(expCtx->opCtx,
collection,
nss,
pipeline,
expCtx,
sortStage,
deps,
queryObj,
aggRequest,
&sortObj,
&projForQuery));
if (!projForQuery.isEmpty() && !sources.empty()) {
// Check for redundant $project in query with the same specification as the inclusion
// projection generated by the dependency optimization.
auto proj =
dynamic_cast(sources.front().get());
if (proj && proj->isSubsetOfProjection(projForQuery)) {
sources.pop_front();
}
}
addCursorSource(
collection, pipeline, expCtx, std::move(exec), deps, queryObj, sortObj, projForQuery);
}
StatusWith> PipelineD::prepareExecutor(
OperationContext* opCtx,
Collection* collection,
const NamespaceString& nss,
Pipeline* pipeline,
const intrusive_ptr& expCtx,
const intrusive_ptr& sortStage,
const DepsTracker& deps,
const BSONObj& queryObj,
const AggregationRequest* aggRequest,
BSONObj* sortObj,
BSONObj* projectionObj) {
// The query system has the potential to use an index to provide a non-blocking sort and/or to
// use the projection to generate a covered plan. If this is possible, it is more efficient to
// let the query system handle those parts of the pipeline. If not, it is more efficient to use
// a $sort and/or a ParsedDeps object. Thus, we will determine whether the query system can
// provide a non-blocking sort or a covered projection before we commit to a PlanExecutor.
//
// To determine if the query system can provide a non-blocking sort, we pass the
// NO_BLOCKING_SORT planning option, meaning 'getExecutor' will not produce a PlanExecutor if
// the query system would use a blocking sort stage.
//
// To determine if the query system can provide a covered projection, we pass the
// NO_UNCOVERED_PROJECTS planning option, meaning 'getExecutor' will not produce a PlanExecutor
// if the query system would need to fetch the document to do the projection. The following
// logic uses the above strategies, with multiple calls to 'attemptToGetExecutor' to determine
// the most efficient way to handle the $sort and $project stages.
//
// LATER - We should attempt to determine if the results from the query are returned in some
// order so we can then apply other optimizations there are tickets for, such as SERVER-4507.
size_t plannerOpts = QueryPlannerParams::DEFAULT | QueryPlannerParams::NO_BLOCKING_SORT;
// If we are connecting directly to the shard rather than through a mongos, don't filter out
// orphaned documents.
if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) {
plannerOpts |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
if (deps.hasNoRequirements()) {
// If we don't need any fields from the input document, performing a count is faster, and
// will output empty documents, which is okay.
plannerOpts |= QueryPlannerParams::IS_COUNT;
}
// The only way to get a text score is to let the query system handle the projection. In all
// other cases, unless the query system can do an index-covered projection and avoid going to
// the raw record at all, it is faster to have ParsedDeps filter the fields we need.
if (!deps.getNeedTextScore()) {
plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
}
BSONObj emptyProjection;
if (sortStage) {
// See if the query system can provide a non-blocking sort.
auto swExecutorSort = attemptToGetExecutor(opCtx,
collection,
nss,
expCtx,
queryObj,
emptyProjection,
*sortObj,
aggRequest,
plannerOpts);
if (swExecutorSort.isOK()) {
// Success! Now see if the query system can also cover the projection.
auto swExecutorSortAndProj = attemptToGetExecutor(opCtx,
collection,
nss,
expCtx,
queryObj,
*projectionObj,
*sortObj,
aggRequest,
plannerOpts);
std::unique_ptr exec;
if (swExecutorSortAndProj.isOK()) {
// Success! We have a non-blocking sort and a covered projection.
exec = std::move(swExecutorSortAndProj.getValue());
} else if (swExecutorSortAndProj == ErrorCodes::QueryPlanKilled) {
return {ErrorCodes::OperationFailed,
str::stream() << "Failed to determine whether query system can provide a "
"covered projection in addition to a non-blocking sort: "
<< swExecutorSortAndProj.getStatus().toString()};
} else {
// The query system couldn't cover the projection.
*projectionObj = BSONObj();
exec = std::move(swExecutorSort.getValue());
}
// We know the sort is being handled by the query system, so remove the $sort stage.
pipeline->_sources.pop_front();
if (sortStage->getLimitSrc()) {
// We need to reinsert the coalesced $limit after removing the $sort.
pipeline->_sources.push_front(sortStage->getLimitSrc());
}
return std::move(exec);
} else if (swExecutorSort == ErrorCodes::QueryPlanKilled) {
return {
ErrorCodes::OperationFailed,
str::stream()
<< "Failed to determine whether query system can provide a non-blocking sort: "
<< swExecutorSort.getStatus().toString()};
}
// The query system can't provide a non-blocking sort.
*sortObj = BSONObj();
}
// Either there was no $sort stage, or the query system could not provide a non-blocking
// sort.
dassert(sortObj->isEmpty());
// See if the query system can cover the projection.
auto swExecutorProj = attemptToGetExecutor(opCtx,
collection,
nss,
expCtx,
queryObj,
*projectionObj,
*sortObj,
aggRequest,
plannerOpts);
if (swExecutorProj.isOK()) {
// Success! We have a covered projection.
return std::move(swExecutorProj.getValue());
} else if (swExecutorProj == ErrorCodes::QueryPlanKilled) {
return {ErrorCodes::OperationFailed,
str::stream()
<< "Failed to determine whether query system can provide a covered projection: "
<< swExecutorProj.getStatus().toString()};
}
// The query system couldn't provide a covered projection.
*projectionObj = BSONObj();
// If this doesn't work, nothing will.
return attemptToGetExecutor(opCtx,
collection,
nss,
expCtx,
queryObj,
*projectionObj,
*sortObj,
aggRequest,
plannerOpts);
}
void PipelineD::addCursorSource(Collection* collection,
Pipeline* pipeline,
const intrusive_ptr& expCtx,
unique_ptr exec,
DepsTracker deps,
const BSONObj& queryObj,
const BSONObj& sortObj,
const BSONObj& projectionObj) {
// DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved.
exec->saveState();
// Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
intrusive_ptr pSource =
DocumentSourceCursor::create(collection, std::move(exec), expCtx);
// Note the query, sort, and projection for explain.
pSource->setQuery(queryObj);
pSource->setSort(sortObj);
if (deps.hasNoRequirements()) {
pSource->shouldProduceEmptyDocs();
}
if (!projectionObj.isEmpty()) {
pSource->setProjection(projectionObj, boost::none);
} else {
// There may be fewer dependencies now if the sort was covered.
if (!sortObj.isEmpty()) {
deps = pipeline->getDependencies(DocumentSourceMatch::isTextQuery(queryObj)
? DepsTracker::MetadataAvailable::kTextScore
: DepsTracker::MetadataAvailable::kNoMetadata);
}
pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
}
// Add the initial DocumentSourceCursor to the front of the pipeline. Then optimize again in
// case the new stage can be absorbed with the first stages of the pipeline.
pipeline->addInitialSource(pSource);
pipeline->optimizePipeline();
}
std::string PipelineD::getPlanSummaryStr(const Pipeline* pPipeline) {
if (auto docSourceCursor =
dynamic_cast(pPipeline->_sources.front().get())) {
return docSourceCursor->getPlanSummaryStr();
}
return "";
}
void PipelineD::getPlanSummaryStats(const Pipeline* pPipeline, PlanSummaryStats* statsOut) {
invariant(statsOut);
if (auto docSourceCursor =
dynamic_cast(pPipeline->_sources.front().get())) {
*statsOut = docSourceCursor->getPlanSummaryStats();
}
bool hasSortStage{false};
for (auto&& source : pPipeline->_sources) {
if (dynamic_cast(source.get())) {
hasSortStage = true;
break;
}
}
statsOut->hasSortStage = hasSortStage;
}
} // namespace mongo