/**
 * Copyright (C) 2017 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects
 * for all of the code used other than as permitted herein. If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so. If you do not
 * wish to do so, delete this exception statement from your version. If you
 * delete this exception statement from all source files in the program,
 * then also delete it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" #include "mongo/db/commands/run_aggregate.h" #include #include #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/database.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/read_concern.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/views/view.h" #include "mongo/db/views/view_catalog.h" #include "mongo/db/views/view_sharding_check.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" #include "mongo/util/string_map.h" namespace mongo { using boost::intrusive_ptr; using std::endl; using std::shared_ptr; using std::string; using std::stringstream; using std::unique_ptr; using stdx::make_unique; namespace { /** * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore * requests). Otherwise, returns false. 
The passed 'nsForCursor' is only used to determine the * namespace used in the returned cursor, which will be registered with the global cursor manager, * and thus will be different from that in 'request'. */ bool handleCursorCommand(OperationContext* opCtx, const NamespaceString& nsForCursor, ClientCursor* cursor, const AggregationRequest& request, BSONObjBuilder& result) { invariant(cursor); long long batchSize = request.getBatchSize(); CursorResponseBuilder responseBuilder(true, &result); BSONObj next; for (int objCount = 0; objCount < batchSize; objCount++) { // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; try { state = cursor->getExecutor()->getNext(&next, nullptr); } catch (const ExceptionFor&) { // This exception is thrown when a $changeStream stage encounters an event // that invalidates the cursor. We should close the cursor and return without // error. cursor = nullptr; break; } if (state == PlanExecutor::IS_EOF) { responseBuilder.setLatestOplogTimestamp( cursor->getExecutor()->getLatestOplogTimestamp()); if (!cursor->isTailable()) { // make it an obvious error to use cursor or executor after this point cursor = nullptr; } break; } if (PlanExecutor::ADVANCED != state) { uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(next).withContext( "PlanExecutor error during aggregation")); } // If adding this object will cause us to exceed the message size limit, then we stash it // for later. if (!FindCommon::haveSpaceForNext(next, objCount, responseBuilder.bytesUsed())) { cursor->getExecutor()->enqueue(next); break; } responseBuilder.setLatestOplogTimestamp(cursor->getExecutor()->getLatestOplogTimestamp()); responseBuilder.append(next); } if (cursor) { // If a time limit was set on the pipeline, remaining time is "rolled over" to the // cursor (for use by future getmore ops). 
cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); CurOp::get(opCtx)->debug().cursorid = cursor->cursorid(); // Cursor needs to be in a saved state while we yield locks for getmore. State // will be restored in getMore(). cursor->getExecutor()->saveState(); cursor->getExecutor()->detachFromOperationContext(); } else { CurOp::get(opCtx)->debug().cursorExhausted = true; } const CursorId cursorId = cursor ? cursor->cursorid() : 0LL; responseBuilder.done(cursorId, nsForCursor.ns()); return static_cast(cursor); } StatusWith> resolveInvolvedNamespaces( OperationContext* opCtx, const AggregationRequest& request) { const LiteParsedPipeline liteParsedPipeline(request); const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces(); // If there are no involved namespaces, return before attempting to take any locks. This is // important for collectionless aggregations, which may be expected to run without locking. if (pipelineInvolvedNamespaces.empty()) { return {StringMap()}; } // We intentionally do not drop and reacquire our DB lock after resolving the view definition in // order to prevent the definition for any view namespaces we've already resolved from changing. // This is necessary to prevent a cycle from being formed among the view definitions cached in // 'resolvedNamespaces' because we won't re-resolve a view namespace we've already encountered. AutoGetDb autoDb(opCtx, request.getNamespaceString().db(), MODE_IS); Database* const db = autoDb.getDb(); ViewCatalog* viewCatalog = db ? 
db->getViewCatalog() : nullptr; std::deque involvedNamespacesQueue(pipelineInvolvedNamespaces.begin(), pipelineInvolvedNamespaces.end()); StringMap resolvedNamespaces; while (!involvedNamespacesQueue.empty()) { auto involvedNs = std::move(involvedNamespacesQueue.front()); involvedNamespacesQueue.pop_front(); if (resolvedNamespaces.find(involvedNs.coll()) != resolvedNamespaces.end()) { continue; } if (!db || db->getCollection(opCtx, involvedNs)) { // If the database exists and 'involvedNs' refers to a collection namespace, then we // resolve it as an empty pipeline in order to read directly from the underlying // collection. If the database doesn't exist, then we still resolve it as an empty // pipeline because 'involvedNs' doesn't refer to a view namespace in our consistent // snapshot of the view catalog. resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector{}}; } else if (viewCatalog->lookup(opCtx, involvedNs.ns())) { // If 'involvedNs' refers to a view namespace, then we resolve its definition. auto resolvedView = viewCatalog->resolveView(opCtx, involvedNs); if (!resolvedView.isOK()) { return {ErrorCodes::FailedToParse, str::stream() << "Failed to resolve view '" << involvedNs.ns() << "': " << resolvedView.getStatus().toString()}; } resolvedNamespaces[involvedNs.coll()] = {resolvedView.getValue().getNamespace(), resolvedView.getValue().getPipeline()}; // We parse the pipeline corresponding to the resolved view in case we must resolve // other view namespaces that are also involved. 
LiteParsedPipeline resolvedViewLitePipeline( {resolvedView.getValue().getNamespace(), resolvedView.getValue().getPipeline()}); const auto& resolvedViewInvolvedNamespaces = resolvedViewLitePipeline.getInvolvedNamespaces(); involvedNamespacesQueue.insert(involvedNamespacesQueue.end(), resolvedViewInvolvedNamespaces.begin(), resolvedViewInvolvedNamespaces.end()); } else { // 'involvedNs' is neither a view nor a collection, so resolve it as an empty pipeline // to treat it as reading from a non-existent collection. resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector{}}; } } return resolvedNamespaces; } /** * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse(). * fasserts if it fails to parse after being serialized. */ std::unique_ptr reparsePipeline( const Pipeline* pipeline, const AggregationRequest& request, const boost::intrusive_ptr& expCtx) { auto serialized = pipeline->serialize(); // Convert vector to vector. std::vector parseableSerialization; parseableSerialization.reserve(serialized.size()); for (auto&& serializedStage : serialized) { invariant(serializedStage.getType() == BSONType::Object); parseableSerialization.push_back(serializedStage.getDocument().toBson()); } auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx); if (!reparsedPipeline.isOK()) { error() << "Aggregation command did not round trip through parsing and serialization " "correctly. Input pipeline: " << Value(request.getPipeline()) << ", serialized pipeline: " << Value(serialized); fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus()); } reparsedPipeline.getValue()->optimizePipeline(); return std::move(reparsedPipeline.getValue()); } /** * Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to * 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView. 
*/ Status collatorCompatibleWithPipeline(OperationContext* opCtx, Database* db, const CollatorInterface* collator, const Pipeline* pipeline) { if (!db || !pipeline) { return Status::OK(); } for (auto&& potentialViewNs : pipeline->getInvolvedCollections()) { if (db->getCollection(opCtx, potentialViewNs)) { continue; } auto view = db->getViewCatalog()->lookup(opCtx, potentialViewNs.ns()); if (!view) { continue; } if (!CollatorInterface::collatorsMatch(view->defaultCollator(), collator)) { return {ErrorCodes::OptionNotSupportedOnView, str::stream() << "Cannot override default collation of view " << potentialViewNs.ns()}; } } return Status::OK(); } /** * Resolves the collator to either the user-specified collation or, if none was specified, to the * collection-default collation. */ std::unique_ptr resolveCollator(OperationContext* opCtx, const AggregationRequest& request, const Collection* collection) { if (!request.getCollation().isEmpty()) { return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(request.getCollation())); } return (collection && collection->getDefaultCollator() ? collection->getDefaultCollator()->clone() : nullptr); } } // namespace Status runAggregate(OperationContext* opCtx, const NamespaceString& origNss, const AggregationRequest& request, const BSONObj& cmdObj, BSONObjBuilder& result) { // For operations on views, this will be the underlying namespace. NamespaceString nss = request.getNamespaceString(); // The collation to use for this aggregation. boost::optional to distinguish between the case // where the collation has not yet been resolved, and where it has been resolved to nullptr. boost::optional> collatorToUse; unique_ptr exec; boost::intrusive_ptr expCtx; Pipeline* unownedPipeline; auto curOp = CurOp::get(opCtx); { const LiteParsedPipeline liteParsedPipeline(request); // Check whether the parsed pipeline supports the given read concern. 
liteParsedPipeline.assertSupportsReadConcern(opCtx, request.getExplain()); if (liteParsedPipeline.hasChangeStream()) { nss = NamespaceString::kRsOplogNamespace; // If the read concern is not specified, upgrade to 'majority' and wait to make sure we // have a snapshot available. if (!repl::ReadConcernArgs::get(opCtx).hasLevel()) { const repl::ReadConcernArgs readConcern( repl::ReadConcernLevel::kMajorityReadConcern); uassertStatusOK(waitForReadConcern(opCtx, readConcern, true)); } if (origNss.isCollectionlessAggregateNS()) { // If the change stream is opened against all collections in a database which does // not exist yet, go ahead and create it. Use MODE_IX since the AutoGetOrCreateDb // helper will automatically reacquire as MODE_X if the database does not exist. AutoGetOrCreateDb dbLock(opCtx, origNss.db(), MODE_IX); invariant(dbLock.getDb()); } else { // Change streams can only be run against collections; // AutoGetCollectionForReadCommand will raise an error if the given namespace is a // view. A change stream may be opened on a namespace before the associated // collection is created, but only if the database already exists. If the // $changeStream was sent from mongoS then the database exists at the cluster level // even if not yet present on this shard, so we allow the $changeStream to run. AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss); // Resolve the collator to either the user-specified collation or the default // collation of the collection on which $changeStream was invoked, so that we do not // end up resolving the collation on the oplog. 
invariant(!collatorToUse); if (!origNssCtx.getDb() && !request.isFromMongos()) { AutoGetOrCreateDb dbLock(opCtx, origNss.db(), MODE_X); invariant(dbLock.getDb()); } Collection* origColl = origNssCtx.getCollection(); collatorToUse.emplace(resolveCollator(opCtx, request, origColl)); } } const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces(); // If emplaced, AutoGetCollectionForReadCommand will throw if the sharding version for this // connection is out of date. If the namespace is a view, the lock will be released before // re-running the expanded aggregation. boost::optional ctx; // If this is a collectionless aggregation, we won't create 'ctx' but will still need an // AutoStatsTracker to record CurOp and Top entries. boost::optional statsTracker; // If this is a collectionless aggregation with no foreign namespaces, we don't want to // acquire any locks. Otherwise, lock the collection or view. if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) { statsTracker.emplace(opCtx, nss, Top::LockType::NotLocked, 0); } else { ctx.emplace(opCtx, nss, AutoGetCollection::ViewMode::kViewsPermitted); } Collection* collection = ctx ? ctx->getCollection() : nullptr; // The collator may already have been set if this is a $changeStream pipeline. If not, // resolve the collator to either the user-specified collation or the collection default. if (!collatorToUse) { collatorToUse.emplace(resolveCollator(opCtx, request, collection)); } // If this is a view, resolve it by finding the underlying collection and stitching view // pipelines and this request's pipeline together. We then release our locks before // recursively calling runAggregate(), which will re-acquire locks on the underlying // collection. (The lock must be released because recursively acquiring locks on the // database will prohibit yielding.) 
if (ctx && ctx->getView() && !liteParsedPipeline.startsWithCollStats()) { invariant(nss != NamespaceString::kRsOplogNamespace); invariant(!nss.isCollectionlessAggregateNS()); // Check that the default collation of 'view' is compatible with the operation's // collation. The check is skipped if the request did not specify a collation. if (!request.getCollation().isEmpty()) { invariant(collatorToUse); // Should already be resolved at this point. if (!CollatorInterface::collatorsMatch(ctx->getView()->defaultCollator(), collatorToUse->get())) { return {ErrorCodes::OptionNotSupportedOnView, "Cannot override a view's default collation"}; } } ViewShardingCheck::throwResolvedViewIfSharded(opCtx, ctx->getDb(), ctx->getView()); auto resolvedView = ctx->getDb()->getViewCatalog()->resolveView(opCtx, nss); if (!resolvedView.isOK()) { return resolvedView.getStatus(); } // With the view & collation resolved, we can relinquish locks. ctx.reset(); // Parse the resolved view into a new aggregation request. auto newRequest = resolvedView.getValue().asExpandedViewAggregation(request); auto newCmd = newRequest.serializeToCommandObj().toBson(); auto status = runAggregate(opCtx, origNss, newRequest, newCmd, result); { // Set the namespace of the curop back to the view namespace so ctx records // stats on this view namespace on destruction. 
stdx::lock_guard lk(*opCtx->getClient()); curOp->setNS_inlock(nss.ns()); } return status; } invariant(collatorToUse); expCtx.reset( new ExpressionContext(opCtx, request, std::move(*collatorToUse), std::make_shared(opCtx), uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)))); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; auto session = OperationContextSession::get(opCtx); expCtx->inSnapshotReadOrMultiDocumentTransaction = session && session->inSnapshotReadOrMultiDocumentTransaction(); auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); // Check that the view's collation matches the collation of any views involved in the // pipeline. if (!pipelineInvolvedNamespaces.empty()) { invariant(ctx); auto pipelineCollationStatus = collatorCompatibleWithPipeline( opCtx, ctx->getDb(), expCtx->getCollator(), pipeline.get()); if (!pipelineCollationStatus.isOK()) { return pipelineCollationStatus; } } pipeline->optimizePipeline(); if (kDebugBuild && !expCtx->explain && !expCtx->fromMongos) { // Make sure all operations round-trip through Pipeline::serialize() correctly by // re-parsing every command in debug builds. This is important because sharded // aggregations rely on this ability. Skipping when fromMongos because this has // already been through the transformation (and this un-sets expCtx->fromMongos). pipeline = reparsePipeline(pipeline.get(), request, expCtx); } // Prepare a PlanExecutor to provide input into the pipeline, if needed. if (liteParsedPipeline.hasChangeStream()) { // If we are using a change stream, the cursor stage should have a simple collation, // regardless of what the user's collation was. 
std::unique_ptr collatorForCursor = nullptr; auto collatorStash = expCtx->temporarilyChangeCollator(std::move(collatorForCursor)); PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get()); } else { PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get()); } // Optimize again, since there may be additional optimizations that can be done after adding // the initial cursor stage. Note this has to be done outside the above blocks to ensure // this process uses the correct collation if it does any string comparisons. pipeline->optimizePipeline(); // Transfer ownership of the Pipeline to the PipelineProxyStage. unownedPipeline = pipeline.get(); auto ws = make_unique(); auto proxy = make_unique(opCtx, std::move(pipeline), ws.get()); // This PlanExecutor will simply forward requests to the Pipeline, so does not need to // yield or to be registered with any collection's CursorManager to receive invalidations. // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which *are* // registered with their respective collection's CursorManager auto statusWithPlanExecutor = PlanExecutor::make(opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD); invariant(statusWithPlanExecutor.isOK()); exec = std::move(statusWithPlanExecutor.getValue()); { auto planSummary = Explain::getPlanSummary(exec.get()); stdx::lock_guard lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(std::move(planSummary)); } } // Having released the collection lock, we can now create a cursor that returns results from the // pipeline. This cursor owns no collection state, and thus we register it with the global // cursor manager. The global cursor manager does not deliver invalidations or kill // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving // invalidations and kill notifications themselves, not the cursor we create here. 
ClientCursorParams cursorParams( std::move(exec), origNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), opCtx->recoveryUnit()->getReadConcernLevel(), cmdObj); if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { cursorParams.setTailable(true); cursorParams.setAwaitData(true); } auto pin = CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams)); ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin); // If both explain and cursor are specified, explain wins. if (expCtx->explain) { Explain::explainPipelineExecutor( pin.getCursor()->getExecutor(), *(expCtx->explain), &result); } else { // Cursor must be specified, if explain is not. const bool keepCursor = handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result); if (keepCursor) { opCtx->setStashedCursor(); cursorFreer.Dismiss(); } } if (!expCtx->explain) { PlanSummaryStats stats; Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats); curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; } // Any code that needs the cursor pinned must be inside the try block, above. return Status::OK(); } } // namespace mongo