/**
 * Copyright (C) 2017 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects
 * for all of the code used other than as permitted herein. If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so. If you do not
 * wish to do so, delete this exception statement from your version. If you
 * delete this exception statement from all source files in the program,
 * then also delete it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand

#include "mongo/platform/basic.h"

#include "mongo/db/commands/run_aggregate.h"

#include <deque>
#include <vector>

#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/views/view.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/db/views/view_sharding_check.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/string_map.h"

namespace mongo {

using boost::intrusive_ptr;
using std::endl;
using std::shared_ptr;
using std::string;
using std::stringstream;
using std::unique_ptr;
using stdx::make_unique;

namespace {

/**
 * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
 * requests). Otherwise, returns false. The passed 'nsForCursor' is only used to determine the
 * namespace used in the returned cursor, which will be registered with the global cursor manager,
 * and thus will be different from that in 'request'.
 */
bool handleCursorCommand(OperationContext* opCtx,
                         const NamespaceString& nsForCursor,
                         ClientCursor* cursor,
                         const AggregationRequest& request,
                         BSONObjBuilder& result) {
    invariant(cursor);

    long long batchSize = request.getBatchSize();

    // Can't use the result BSONObjBuilder directly since it won't handle exceptions correctly.
    BSONArrayBuilder resultsArray;
    BSONObj next;
    for (int objCount = 0; objCount < batchSize; objCount++) {
        // The initial getNext() on a PipelineProxyStage may be very expensive so we don't
        // do it when batchSize is 0 since that indicates a desire for a fast return.
        PlanExecutor::ExecState state;
        if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
            // Make it an obvious error to use 'cursor' or the executor after this point.
            cursor = nullptr;
            break;
        }

        if (PlanExecutor::ADVANCED != state) {
            auto status = WorkingSetCommon::getMemberObjectStatus(next);
            uasserted(status.code(),
                      "PlanExecutor error during aggregation: " +
                          WorkingSetCommon::toStatusString(next));
        }

        // If adding this object will cause us to exceed the message size limit, then we stash it
        // for later.
        if (!FindCommon::haveSpaceForNext(next, objCount, resultsArray.len())) {
            cursor->getExecutor()->enqueue(next);
            break;
        }

        resultsArray.append(next);
    }

    if (cursor) {
        // If a time limit was set on the pipeline, remaining time is "rolled over" to the
        // cursor (for use by future getmore ops).
        cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());

        CurOp::get(opCtx)->debug().cursorid = cursor->cursorid();

        // Cursor needs to be in a saved state while we yield locks for getmore. State
        // will be restored in getMore().
        cursor->getExecutor()->saveState();
        cursor->getExecutor()->detachFromOperationContext();
    } else {
        CurOp::get(opCtx)->debug().cursorExhausted = true;
    }

    const CursorId cursorId = cursor ? cursor->cursorid() : 0LL;
    appendCursorResponseObject(cursorId, nsForCursor.ns(), resultsArray.arr(), &result);

    return static_cast<bool>(cursor);
}
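
/**
 * Resolves each namespace involved in the pipeline of 'request' against a consistent snapshot of
 * the view catalog. View namespaces map to their underlying collection namespace plus the view's
 * pipeline; collection namespaces (and namespaces that do not exist) map to themselves with an
 * empty pipeline. Returns the resulting map, keyed by collection name, for use when constructing
 * the ExpressionContext.
 */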
StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNamespaces(
    OperationContext* opCtx, const AggregationRequest& request) {
    // We intentionally do not drop and reacquire our DB lock after resolving the view definition
    // in order to prevent the definition for any view namespaces we've already resolved from
    // changing. This is necessary to prevent a cycle from being formed among the view definitions
    // cached in 'resolvedNamespaces' because we won't re-resolve a view namespace we've already
    // encountered.
    AutoGetDb autoDb(opCtx, request.getNamespaceString().db(), MODE_IS);
    Database* const db = autoDb.getDb();
    ViewCatalog* viewCatalog = db ? db->getViewCatalog() : nullptr;

    const LiteParsedPipeline liteParsedPipeline(request);
    const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
    std::deque<NamespaceString> involvedNamespacesQueue(pipelineInvolvedNamespaces.begin(),
                                                        pipelineInvolvedNamespaces.end());
    StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;

    while (!involvedNamespacesQueue.empty()) {
        auto involvedNs = std::move(involvedNamespacesQueue.front());
        involvedNamespacesQueue.pop_front();

        if (resolvedNamespaces.find(involvedNs.coll()) != resolvedNamespaces.end()) {
            continue;
        }

        if (!db || db->getCollection(opCtx, involvedNs)) {
            // If the database exists and 'involvedNs' refers to a collection namespace, then we
            // resolve it as an empty pipeline in order to read directly from the underlying
            // collection. If the database doesn't exist, then we still resolve it as an empty
            // pipeline because 'involvedNs' doesn't refer to a view namespace in our consistent
            // snapshot of the view catalog.
            resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}};
        } else if (viewCatalog->lookup(opCtx, involvedNs.ns())) {
            // If 'involvedNs' refers to a view namespace, then we resolve its definition.
            auto resolvedView = viewCatalog->resolveView(opCtx, involvedNs);
            if (!resolvedView.isOK()) {
                return {ErrorCodes::FailedToParse,
                        str::stream() << "Failed to resolve view '" << involvedNs.ns() << "': "
                                      << resolvedView.getStatus().toString()};
            }

            resolvedNamespaces[involvedNs.coll()] = {resolvedView.getValue().getNamespace(),
                                                     resolvedView.getValue().getPipeline()};

            // We parse the pipeline corresponding to the resolved view in case we must resolve
            // other view namespaces that are also involved.
            LiteParsedPipeline resolvedViewLitePipeline(
                {resolvedView.getValue().getNamespace(), resolvedView.getValue().getPipeline()});

            const auto& resolvedViewInvolvedNamespaces =
                resolvedViewLitePipeline.getInvolvedNamespaces();
            involvedNamespacesQueue.insert(involvedNamespacesQueue.end(),
                                           resolvedViewInvolvedNamespaces.begin(),
                                           resolvedViewInvolvedNamespaces.end());
        } else {
            // 'involvedNs' is neither a view nor a collection, so resolve it as an empty pipeline
            // to treat it as reading from a non-existent collection.
            resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}};
        }
    }

    return resolvedNamespaces;
}

/**
 * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse().
 * fasserts if it fails to parse after being serialized.
 */
std::unique_ptr<Pipeline, Pipeline::Deleter> reparsePipeline(
    const Pipeline* pipeline,
    const AggregationRequest& request,
    const boost::intrusive_ptr<ExpressionContext>& expCtx) {
    auto serialized = pipeline->serialize();

    // Convert vector<Value> to vector<BSONObj>.
    std::vector<BSONObj> parseableSerialization;
    parseableSerialization.reserve(serialized.size());
    for (auto&& serializedStage : serialized) {
        invariant(serializedStage.getType() == BSONType::Object);
        parseableSerialization.push_back(serializedStage.getDocument().toBson());
    }

    auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx);
    if (!reparsedPipeline.isOK()) {
        error() << "Aggregation command did not round trip through parsing and serialization "
                   "correctly. Input pipeline: "
                << Value(request.getPipeline()) << ", serialized pipeline: " << Value(serialized);
        fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus());
    }

    reparsedPipeline.getValue()->optimizePipeline();
    return std::move(reparsedPipeline.getValue());
}

/**
 * Returns Status::OK if each view namespace in 'pipeline' has a default collator equivalent to
 * 'collator'. Otherwise, returns ErrorCodes::OptionNotSupportedOnView.
 */
Status collatorCompatibleWithPipeline(OperationContext* opCtx,
                                      Database* db,
                                      const CollatorInterface* collator,
                                      const Pipeline* pipeline) {
    if (!db || !pipeline) {
        return Status::OK();
    }
    for (auto&& potentialViewNs : pipeline->getInvolvedCollections()) {
        if (db->getCollection(opCtx, potentialViewNs)) {
            continue;
        }

        auto view = db->getViewCatalog()->lookup(opCtx, potentialViewNs.ns());
        if (!view) {
            continue;
        }
        if (!CollatorInterface::collatorsMatch(view->defaultCollator(), collator)) {
            return {ErrorCodes::OptionNotSupportedOnView,
                    str::stream() << "Cannot override default collation of view "
                                  << potentialViewNs.ns()};
        }
    }
    return Status::OK();
}

}  // namespace
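
/**
 * Executes the aggregation described by 'request' and appends either the explain output or the
 * first batch of results (plus a cursor id) to 'result'. If 'request' targets a view, the view is
 * resolved and runAggregate() is invoked recursively on the underlying collection with the
 * expanded pipeline.
 */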
Status runAggregate(OperationContext* opCtx,
                    const NamespaceString& origNss,
                    const AggregationRequest& request,
                    const BSONObj& cmdObj,
                    BSONObjBuilder& result) {
    // For operations on views, this will be the underlying namespace.
    const NamespaceString& nss = request.getNamespaceString();

    // Parse the user-specified collation, if any.
    std::unique_ptr<CollatorInterface> userSpecifiedCollator = request.getCollation().isEmpty()
        ? nullptr
        : uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
                              ->makeFromBSON(request.getCollation()));

    unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
    boost::intrusive_ptr<ExpressionContext> expCtx;
    Pipeline* unownedPipeline;
    auto curOp = CurOp::get(opCtx);
    {
        // This will throw if the sharding version for this connection is out of date. If the
        // namespace is a view, the lock will be released before re-running the aggregation.
        AutoGetCollectionOrViewForReadCommand ctx(opCtx, nss);
        Collection* collection = ctx.getCollection();

        // If this is a view, resolve it by finding the underlying collection and stitching view
        // pipelines and this request's pipeline together. We then release our locks before
        // recursively calling runAggregate(), which will re-acquire locks on the underlying
        // collection. (The lock must be released because recursively acquiring locks on the
        // database will prohibit yielding.)
        const LiteParsedPipeline liteParsedPipeline(request);
        if (ctx.getView() && !liteParsedPipeline.startsWithCollStats()) {
            // Check that the default collation of 'view' is compatible with the operation's
            // collation. The check is skipped if the 'request' has the empty collation, which
            // means that no collation was specified.
            if (!request.getCollation().isEmpty()) {
                if (!CollatorInterface::collatorsMatch(ctx.getView()->defaultCollator(),
                                                       userSpecifiedCollator.get())) {
                    return {ErrorCodes::OptionNotSupportedOnView,
                            "Cannot override a view's default collation"};
                }
            }

            auto viewDefinition =
                ViewShardingCheck::getResolvedViewIfSharded(opCtx, ctx.getDb(), ctx.getView());
            if (!viewDefinition.isOK()) {
                return viewDefinition.getStatus();
            }

            if (!viewDefinition.getValue().isEmpty()) {
                return ViewShardingCheck::appendShardedViewResponse(viewDefinition.getValue(),
                                                                    &result);
            }

            auto resolvedView = ctx.getDb()->getViewCatalog()->resolveView(opCtx, nss);
            if (!resolvedView.isOK()) {
                return resolvedView.getStatus();
            }

            auto collationSpec = ctx.getView()->defaultCollator()
                ? ctx.getView()->defaultCollator()->getSpec().toBSON().getOwned()
                : CollationSpec::kSimpleSpec;

            // With the view & collation resolved, we can relinquish locks.
            ctx.releaseLocksForView();

            // Parse the resolved view into a new aggregation request.
            auto newRequest = resolvedView.getValue().asExpandedViewAggregation(request);
            newRequest.setCollation(collationSpec);
            auto newCmd = newRequest.serializeToCommandObj().toBson();

            auto status = runAggregate(opCtx, origNss, newRequest, newCmd, result);
            {
                // Set the namespace of the curop back to the view namespace so ctx records
                // stats on this view namespace on destruction.
                stdx::lock_guard<Client> lk(*opCtx->getClient());
                curOp->setNS_inlock(nss.ns());
            }
            return status;
        }

        // Determine the appropriate collation to make the ExpressionContext.
        //
        // If the pipeline does not have a user-specified collation, set it from the collection
        // default. Be careful to consult the original request BSON to check if a collation was
        // specified, since a specification of {locale: "simple"} will result in a null collator.
        auto collatorToUse = std::move(userSpecifiedCollator);
        if (request.getCollation().isEmpty() && collection && collection->getDefaultCollator()) {
            invariant(!collatorToUse);
            collatorToUse = collection->getDefaultCollator()->clone();
        }

        expCtx.reset(
            new ExpressionContext(opCtx,
                                  request,
                                  std::move(collatorToUse),
                                  uassertStatusOK(resolveInvolvedNamespaces(opCtx, request))));
        expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";

        // Parse the pipeline.
        auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
        if (!statusWithPipeline.isOK()) {
            return statusWithPipeline.getStatus();
        }
        auto pipeline = std::move(statusWithPipeline.getValue());

        // Check that the view's collation matches the collation of any views involved
        // in the pipeline.
        auto pipelineCollationStatus = collatorCompatibleWithPipeline(
            opCtx, ctx.getDb(), expCtx->getCollator(), pipeline.get());
        if (!pipelineCollationStatus.isOK()) {
            return pipelineCollationStatus;
        }

        pipeline->optimizePipeline();

        if (kDebugBuild && !expCtx->explain && !expCtx->inShard) {
            // Make sure all operations round-trip through Pipeline::serialize() correctly by
            // re-parsing every command in debug builds. This is important because sharded
            // aggregations rely on this ability. Skipping when inShard because this has
            // already been through the transformation (and this un-sets expCtx->inShard).
            pipeline = reparsePipeline(pipeline.get(), request, expCtx);
        }

        // This does mongod-specific stuff like creating the input PlanExecutor and adding
        // it to the front of the pipeline if needed.
        PipelineD::prepareCursorSource(collection, &request, pipeline.get());

        // Transfer ownership of the Pipeline to the PipelineProxyStage.
        unownedPipeline = pipeline.get();
        auto ws = make_unique<WorkingSet>();
        auto proxy = make_unique<PipelineProxyStage>(opCtx, std::move(pipeline), ws.get());

        // This PlanExecutor will simply forward requests to the Pipeline, so does not need to
        // yield or to be registered with any collection's CursorManager to receive invalidations.
        // The Pipeline may contain PlanExecutors which *are* yielding PlanExecutors and which
        // *are* registered with their respective collection's CursorManager.
        auto statusWithPlanExecutor =
            PlanExecutor::make(opCtx, std::move(ws), std::move(proxy), nss, PlanExecutor::NO_YIELD);
        invariant(statusWithPlanExecutor.isOK());
        exec = std::move(statusWithPlanExecutor.getValue());

        {
            auto planSummary = Explain::getPlanSummary(exec.get());
            stdx::lock_guard<Client> lk(*opCtx->getClient());
            curOp->setPlanSummary_inlock(std::move(planSummary));
        }
    }

    // Having released the collection lock, we can now create a cursor that returns results from
    // the pipeline. This cursor owns no collection state, and thus we register it with the global
    // cursor manager. The global cursor manager does not deliver invalidations or kill
    // notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
    // invalidations and kill notifications themselves, not the cursor we create here.
    auto pin = CursorManager::getGlobalCursorManager()->registerCursor(
        opCtx,
        {std::move(exec),
         origNss,
         AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
         opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
         cmdObj});
    ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);

    // If both explain and cursor are specified, explain wins.
    if (expCtx->explain) {
        result << "stages" << Value(unownedPipeline->writeExplainOps(*expCtx->explain));
    } else {
        // Cursor must be specified, if explain is not.
        const bool keepCursor =
            handleCursorCommand(opCtx, origNss, pin.getCursor(), request, result);
        if (keepCursor) {
            cursorFreer.Dismiss();
        }
    }

    if (!expCtx->explain) {
        PlanSummaryStats stats;
        Explain::getSummaryStats(*(pin.getCursor()->getExecutor()), &stats);
        curOp->debug().setPlanSummaryMetrics(stats);
        curOp->debug().nreturned = stats.nReturned;
    }

    // Any code that needs the cursor pinned must be above this point, while 'cursorFreer' has not
    // yet run.
    return Status::OK();
}

}  // namespace mongo