/** * Copyright (C) 2018-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include "mongo/db/db_raii.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_summary_stats.h" namespace mongo { /** * Constructs and returns Documents from the BSONObj objects produced by a supplied PlanExecutor. */ class DocumentSourceCursor : public DocumentSource { public: static constexpr StringData kStageName = "$cursor"_sd; // virtuals from DocumentSource const char* getSourceName() const override; Value serialize(boost::optional explain = boost::none) const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, LookupRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; } boost::optional distributedPlanLogic() final { return boost::none; } void detachFromOperationContext() final; void reattachToOperationContext(OperationContext* opCtx) final; /** * Create a document source based on a passed-in PlanExecutor. 'exec' must be a yielding * PlanExecutor, and must be registered with the associated collection's CursorManager. */ static boost::intrusive_ptr create( Collection* collection, std::unique_ptr exec, const boost::intrusive_ptr& pExpCtx, bool trackOplogTimestamp = false); /** * If subsequent sources need no information from the cursor, the cursor can simply output empty * documents, avoiding the overhead of converting BSONObjs to Documents. */ void shouldProduceEmptyDocs() { _shouldProduceEmptyDocs = true; } Timestamp getLatestOplogTimestamp() const { return _latestOplogTimestamp; } const std::string& getPlanSummaryStr() const { return _planSummary; } const PlanSummaryStats& getPlanSummaryStats() const { return _planSummaryStats; } bool usedDisk() final { return _planSummaryStats.usedDisk; } protected: DocumentSourceCursor(Collection* collection, std::unique_ptr exec, const boost::intrusive_ptr& pExpCtx, bool trackOplogTimestamp = false); GetNextResult doGetNext() final; ~DocumentSourceCursor(); /** * Disposes of '_exec' if it hasn't been disposed already. This involves taking a collection * lock. */ void doDispose() final; /** * If '_shouldProduceEmptyDocs' is false, this function hook is called on each 'obj' returned by * '_exec' when loading a batch and returns a Document to be added to '_currentBatch'. * * The default implementation is the identity function. */ virtual Document transformDoc(Document&& doc) const { return std::move(doc); } private: /** * Acquires the appropriate locks, then destroys and de-registers '_exec'. '_exec' must be * non-null. */ void cleanupExecutor(); /** * Reads a batch of data from '_exec'. Subclasses can specify custom behavior to be performed on * each document by overloading transformBSONObjToDocument(). */ void loadBatch(); void recordPlanSummaryStats(); /** * If we are tailing the oplog, this method updates the cached timestamp to that of the latest * document returned, or the latest timestamp observed in the oplog if we have no more results. */ void _updateOplogTimestamp(); // Batches results returned from the underlying PlanExecutor. std::deque _currentBatch; bool _shouldProduceEmptyDocs = false; // The underlying query plan which feeds this pipeline. Must be destroyed while holding the // collection lock. std::unique_ptr _exec; // Status of the underlying executor, _exec. Used for explain queries if _exec produces an // error. Since _exec may not finish running (if there is a limit, for example), we store OK as // the default. Status _execStatus = Status::OK(); std::string _planSummary; PlanSummaryStats _planSummaryStats; // Used only for explain() queries. Stores the stats of the winning plan when _exec's root // stage is a MultiPlanStage. When the query is executed (with exec->executePlan()), it will // wipe out its own copy of the winning plan's statistics, so they need to be saved here. std::unique_ptr _winningPlanTrialStats; // True if we are tracking the latest observed oplog timestamp, false otherwise. bool _trackOplogTS = false; // If we are tailing the oplog and tracking the latest observed oplog time, this is the latest // timestamp seen in the collection. Otherwise, this is a null timestamp. Timestamp _latestOplogTimestamp; }; } // namespace mongo