diff options
author | David Storch <david.storch@mongodb.com> | 2020-07-09 20:07:55 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-24 16:31:37 +0000 |
commit | b4b35f9cc69412611a198642333bf40daa5ba58c (patch) | |
tree | 909673b812a499a60692c46abb53853f7df42b48 /src/mongo/db/pipeline | |
parent | 5e53ee3ca0a90eb98cdab94b298dec810fb46804 (diff) | |
download | mongo-b4b35f9cc69412611a198642333bf40daa5ba58c.tar.gz |
SERVER-48478 Replace PipelineProxyStage with PlanExecutorPipeline
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/plan_executor_pipeline.cpp | 206 | ||||
-rw-r--r-- | src/mongo/db/pipeline/plan_executor_pipeline.h | 186 |
4 files changed, 402 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 9c8389594b9..7c530ef0663 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -195,7 +195,7 @@ void DocumentSourceCursor::_updateOplogTimestamp() { void DocumentSourceCursor::recordPlanSummaryStats() { invariant(_exec); - Explain::getSummaryStats(*_exec, &_planSummaryStats); + _exec->getSummaryStats(&_planSummaryStats); } Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> verbosity) const { @@ -300,7 +300,7 @@ DocumentSourceCursor::DocumentSourceCursor( // Later code in the DocumentSourceCursor lifecycle expects that '_exec' is in a saved state. _exec->saveState(); - _planSummary = Explain::getPlanSummary(_exec.get()); + _planSummary = _exec->getPlanSummary(); recordPlanSummaryStats(); if (pExpCtx->explain) { diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 8ef13836c62..93663a68abf 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -157,18 +157,14 @@ public: } /** - * Sets the OperationContext of 'pCtx' to nullptr. - * - * The PipelineProxyStage is responsible for detaching the OperationContext and releasing any - * storage-engine state of the DocumentSourceCursor that may be present in '_sources'. + * Sets the OperationContext of 'pCtx' to nullptr and calls 'detachFromOperationContext()' on + * all underlying DocumentSources. */ void detachFromOperationContext(); /** - * Sets the OperationContext of 'pCtx' to 'opCtx'. - * - * The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring - * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'. + * Sets the OperationContext of 'pCtx' to 'opCtx', and reattaches all underlying DocumentSources + * to 'opCtx'. 
*/ void reattachToOperationContext(OperationContext* opCtx); @@ -186,6 +182,10 @@ public: */ void dispose(OperationContext* opCtx); + bool isDisposed() const { + return _disposed; + } + /** * Checks to see if disk is ever used within the pipeline. */ diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp new file mode 100644 index 00000000000..7a9f0524446 --- /dev/null +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -0,0 +1,206 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
+
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/resume_token.h"
+#include "mongo/db/repl/speculative_majority_read_info.h"
+
+namespace mongo {
+
+PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
+                                           std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+                                           bool isChangeStream)
+    : _expCtx(std::move(expCtx)), _pipeline(std::move(pipeline)), _isChangeStream(isChangeStream) {
+    // Pipeline plan executors must always have an ExpressionContext.
+    invariant(_expCtx);
+
+    // The caller is responsible for disposing this plan executor before deleting it, which will in
+    // turn dispose the underlying pipeline. Therefore, there is no need to dispose the pipeline
+    // again when it is destroyed.
+    _pipeline.get_deleter().dismissDisposal();
+
+    if (_isChangeStream) {
+        // Set _postBatchResumeToken to the initial PBRT that was added to the expression context
+        // during pipeline construction, and use it to obtain the starting time for
+        // _latestOplogTimestamp.
+        invariant(!_expCtx->initialPostBatchResumeToken.isEmpty());
+        _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned();
+        _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+    }
+}
+
+PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId* recordIdOut) {
+    // The pipeline-based execution engine does not track the record ids associated with documents,
+    // so it is an error for the caller to ask for one. For the same reason, we expect the caller to
+    // provide a non-null BSONObj pointer for 'objOut'.
+    invariant(!recordIdOut);
+    invariant(objOut);
+
+    // Results stashed via 'enqueue()' must be drained before pulling any new documents from the
+    // underlying pipeline.
+    if (!_stash.empty()) {
+        *objOut = std::move(_stash.front());
+        _stash.pop();
+        ++_nReturned;
+        return PlanExecutor::ADVANCED;
+    }
+
+    Document docOut;
+    auto execState = getNextDocument(&docOut, nullptr);
+    if (execState == PlanExecutor::ADVANCED) {
+        // Include metadata if the output will be consumed by a merging node.
+        *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson();
+    }
+    return execState;
+}
+
+PlanExecutor::ExecState PlanExecutorPipeline::getNextDocument(Document* docOut,
+                                                              RecordId* recordIdOut) {
+    // The pipeline-based execution engine does not track the record ids associated with documents,
+    // so it is an error for the caller to ask for one. For the same reason, we expect the caller to
+    // provide a non-null pointer for 'docOut'.
+    invariant(!recordIdOut);
+    invariant(docOut);
+
+    // Callers which use 'enqueue()' are not allowed to use 'getNextDocument()', and must instead
+    // use 'getNext()'.
+    invariant(_stash.empty());
+
+    if (auto next = _getNext()) {
+        *docOut = std::move(*next);
+        ++_nReturned;
+        return PlanExecutor::ADVANCED;
+    }
+
+    return PlanExecutor::IS_EOF;
+}
+
+bool PlanExecutorPipeline::isEOF() {
+    // Stashed results must still be returned via 'getNext()', so the executor is not yet EOF even
+    // if the underlying pipeline is exhausted.
+    if (!_stash.empty()) {
+        return false;
+    }
+
+    return _pipelineIsEof;
+}
+
+boost::optional<Document> PlanExecutorPipeline::_getNext() {
+    auto nextDoc = _pipeline->getNext();
+    if (!nextDoc) {
+        _pipelineIsEof = true;
+    }
+
+    if (_isChangeStream) {
+        _performChangeStreamsAccounting(nextDoc);
+    }
+    return nextDoc;
+}
+
+void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document> doc) {
+    invariant(_isChangeStream);
+    if (doc) {
+        // While we have more results to return, we track both the timestamp and the resume token of
+        // the latest event observed in the oplog, the latter via its sort key metadata field.
+        _validateResumeToken(*doc);
+        _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+        _postBatchResumeToken = doc->metadata().getSortKey().getDocument().toBson();
+        _setSpeculativeReadTimestamp();
+    } else {
+        // We ran out of results to return. Check whether the oplog cursor has moved forward since
+        // the last recorded timestamp. Because we advance _latestOplogTimestamp for every event we
+        // return, if the new time is higher than the last then we are guaranteed not to have
+        // already returned any events at this timestamp. We can set _postBatchResumeToken to a new
+        // high-water-mark token at the current clusterTime.
+        auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+        if (highWaterMark > _latestOplogTimestamp) {
+            auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
+            _postBatchResumeToken = token.toDocument().toBson();
+            _latestOplogTimestamp = highWaterMark;
+            _setSpeculativeReadTimestamp();
+        }
+    }
+}
+
+void PlanExecutorPipeline::executePlan() {
+    while (_getNext()) {
+        // Run the pipeline to completion, but discard the results.
+    }
+}
+
+std::string PlanExecutorPipeline::getPlanSummary() const {
+    return PipelineD::getPlanSummaryStr(_pipeline.get());
+}
+
+void PlanExecutorPipeline::getSummaryStats(PlanSummaryStats* statsOut) const {
+    invariant(statsOut);
+    PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut);
+    // The executor, not the underlying pipeline, tracks how many results were returned.
+    statsOut->nReturned = _nReturned;
+}
+
+void PlanExecutorPipeline::_validateResumeToken(const Document& event) const {
+    // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
+    if (_expCtx->needsMerge) {
+        return;
+    }
+
+    // Confirm that the document _id field matches the original resume token in the sort key field.
+    auto eventBSON = event.toBson();
+    auto resumeToken = event.metadata().getSortKey();
+    auto idField = eventBSON.getObjectField("_id");
+    invariant(!resumeToken.missing());
+    uassert(ErrorCodes::ChangeStreamFatalError,
+            str::stream() << "Encountered an event whose _id field, which contains the resume "
+                             "token, was modified by the pipeline. Modifying the _id field of an "
+                             "event makes it impossible to resume the stream from that point. Only "
+                             "transformations that retain the unmodified _id field are allowed. "
+                             "Expected: "
+                          << BSON("_id" << resumeToken) << " but found: "
+                          << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
+            (resumeToken.getType() == BSONType::Object) &&
+                idField.binaryEqual(resumeToken.getDocument().toBson()));
+}
+
+void PlanExecutorPipeline::_setSpeculativeReadTimestamp() {
+    repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
+        repl::SpeculativeMajorityReadInfo::get(_expCtx->opCtx);
+    if (speculativeMajorityReadInfo.isSpeculativeRead() && !_latestOplogTimestamp.isNull()) {
+        speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp);
+    }
+}
+
+void PlanExecutorPipeline::markAsKilled(Status killStatus) {
+    invariant(!killStatus.isOK());
+    // If killed multiple times, only retain the first status.
+    if (_killStatus.isOK()) {
+        _killStatus = killStatus;
+    }
+}
+
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
new file mode 100644
index 00000000000..f6ad2a97341
--- /dev/null
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <queue>
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/query/plan_executor.h"
+
+namespace mongo {
+
+/**
+ * A plan executor which is used to execute a Pipeline of DocumentSources.
+ */
+class PlanExecutorPipeline final : public PlanExecutor {
+public:
+    PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
+                         std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+                         bool isChangeStream);
+
+    PlanStage* getRootStage() const override {
+        MONGO_UNREACHABLE;
+    }
+
+    CanonicalQuery* getCanonicalQuery() const override {
+        return nullptr;
+    }
+
+    const NamespaceString& nss() const override {
+        return _expCtx->ns;
+    }
+
+    OperationContext* getOpCtx() const override {
+        return _expCtx->opCtx;
+    }
+
+    // Pipeline execution does not support the saveState()/restoreState() interface. Instead, the
+    // underlying data access plan is saved/restored internally in between DocumentSourceCursor
+    // batches, or when the underlying PlanStage tree yields.
+    void saveState() override {}
+    void restoreState() override {}
+
+    void detachFromOperationContext() override {
+        _pipeline->detachFromOperationContext();
+    }
+
+    void reattachToOperationContext(OperationContext* opCtx) override {
+        _pipeline->reattachToOperationContext(opCtx);
+    }
+
+    ExecState getNext(BSONObj* objOut, RecordId* recordIdOut) override;
+    ExecState getNextDocument(Document* docOut, RecordId* recordIdOut) override;
+
+    bool isEOF() override;
+
+    void executePlan() override;
+
+    void dispose(OperationContext* opCtx) override {
+        _pipeline->dispose(opCtx);
+    }
+
+    void enqueue(const BSONObj& obj) override {
+        _stash.push(obj.getOwned());
+    }
+
+    void markAsKilled(Status killStatus) override;
+
+    bool isMarkedAsKilled() const override {
+        return !_killStatus.isOK();
+    }
+
+    Status getKillStatus() override {
+        invariant(isMarkedAsKilled());
+        return _killStatus;
+    }
+
+    bool isDisposed() const override {
+        return _pipeline->isDisposed();
+    }
+
+    Timestamp getLatestOplogTimestamp() const override {
+        return _latestOplogTimestamp;
+    }
+
+    BSONObj getPostBatchResumeToken() const override {
+        return _postBatchResumeToken;
+    }
+
+    LockPolicy lockPolicy() const override {
+        return LockPolicy::kLocksInternally;
+    }
+
+    std::string getPlanSummary() const override;
+
+    void getSummaryStats(PlanSummaryStats* statsOut) const override;
+
+    /**
+     * Writes the explain information about the underlying pipeline to a std::vector<Value>,
+     * providing the level of detail specified by 'verbosity'.
+     */
+    std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const {
+        return _pipeline->writeExplainOps(verbosity);
+    }
+
+    BSONObj getStats() const override {
+        // TODO SERVER-49808: Report execution stats for the pipeline.
+        return BSONObj{};
+    }
+
+private:
+    /**
+     * Obtains the next document from the underlying Pipeline, and does change streams-related
+     * accounting if needed.
+     */
+    boost::optional<Document> _getNext();
+
+    /**
+     * If this is a change stream, advance the cluster time and post batch resume token based on the
+     * latest document returned by the underlying pipeline.
+     */
+    void _performChangeStreamsAccounting(const boost::optional<Document>);
+
+    /**
+     * Verifies that the doc's resume token has not been modified.
+     */
+    void _validateResumeToken(const Document& event) const;
+
+    /**
+     * Set the speculative majority read timestamp if we have scanned up to a certain oplog
+     * timestamp.
+     */
+    void _setSpeculativeReadTimestamp();
+
+    boost::intrusive_ptr<ExpressionContext> _expCtx;
+
+    std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
+
+    const bool _isChangeStream;
+
+    // Results stashed by 'enqueue()', which must be returned by 'getNext()' before any further
+    // documents are pulled from '_pipeline'.
+    std::queue<BSONObj> _stash;
+
+    // If _killStatus has a non-OK value, then we have been killed and the value represents the
+    // reason for the kill.
+    Status _killStatus = Status::OK();
+
+    // Number of documents returned by this executor so far, reported in the summary stats.
+    size_t _nReturned = 0;
+
+    // Set to true once we have received all results from the underlying '_pipeline', and the
+    // pipeline has indicated end-of-stream.
+    bool _pipelineIsEof = false;
+
+    // If '_pipeline' is a change stream, these track the latest timestamp seen while scanning the
+    // oplog, as well as the most recent PBRT.
+    Timestamp _latestOplogTimestamp;
+    BSONObj _postBatchResumeToken;
+};
+
+}  // namespace mongo