summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2020-07-09 20:07:55 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-24 16:31:37 +0000
commitb4b35f9cc69412611a198642333bf40daa5ba58c (patch)
tree909673b812a499a60692c46abb53853f7df42b48 /src/mongo/db/pipeline
parent5e53ee3ca0a90eb98cdab94b298dec810fb46804 (diff)
downloadmongo-b4b35f9cc69412611a198642333bf40daa5ba58c.tar.gz
SERVER-48478 Replace PipelineProxyStage with PlanExecutorPipeline
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp4
-rw-r--r--src/mongo/db/pipeline/pipeline.h16
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp206
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.h186
4 files changed, 402 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9c8389594b9..7c530ef0663 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -195,7 +195,7 @@ void DocumentSourceCursor::_updateOplogTimestamp() {
void DocumentSourceCursor::recordPlanSummaryStats() {
invariant(_exec);
- Explain::getSummaryStats(*_exec, &_planSummaryStats);
+ _exec->getSummaryStats(&_planSummaryStats);
}
Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> verbosity) const {
@@ -300,7 +300,7 @@ DocumentSourceCursor::DocumentSourceCursor(
// Later code in the DocumentSourceCursor lifecycle expects that '_exec' is in a saved state.
_exec->saveState();
- _planSummary = Explain::getPlanSummary(_exec.get());
+ _planSummary = _exec->getPlanSummary();
recordPlanSummaryStats();
if (pExpCtx->explain) {
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 8ef13836c62..93663a68abf 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -157,18 +157,14 @@ public:
}
/**
- * Sets the OperationContext of 'pCtx' to nullptr.
- *
- * The PipelineProxyStage is responsible for detaching the OperationContext and releasing any
- * storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
+ * Sets the OperationContext of 'pCtx' to nullptr and calls 'detachFromOperationContext()' on
+ * all underlying DocumentSources.
*/
void detachFromOperationContext();
/**
- * Sets the OperationContext of 'pCtx' to 'opCtx'.
- *
- * The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring
- * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
+ * Sets the OperationContext of 'pCtx' to 'opCtx', and reattaches all underlying DocumentSources
+ * to 'opCtx'.
*/
void reattachToOperationContext(OperationContext* opCtx);
@@ -186,6 +182,10 @@ public:
*/
void dispose(OperationContext* opCtx);
+ bool isDisposed() const {
+ return _disposed;
+ }
+
/**
* Checks to see if disk is ever used within the pipeline.
*/
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
new file mode 100644
index 00000000000..7a9f0524446
--- /dev/null
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -0,0 +1,206 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
+
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/resume_token.h"
+#include "mongo/db/repl/speculative_majority_read_info.h"
+
+namespace mongo {
+
/**
 * Constructs a plan executor that drives the given 'pipeline' to produce results.
 *
 * Ownership of 'pipeline' transfers to this executor; disposal of the pipeline is expected to
 * happen via this executor's dispose() rather than via the PipelineDeleter.
 */
PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
                                           std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
                                           bool isChangeStream)
    : _expCtx(std::move(expCtx)), _pipeline(std::move(pipeline)), _isChangeStream(isChangeStream) {
    // Pipeline plan executors must always have an ExpressionContext.
    invariant(_expCtx);

    // The caller is responsible for disposing this plan executor before deleting it, which will in
    // turn dispose the underlying pipeline. Therefore, there is no need to dispose the pipeline
    // again when it is destroyed.
    _pipeline.get_deleter().dismissDisposal();

    if (_isChangeStream) {
        // Set _postBatchResumeToken to the initial PBRT that was added to the expression context
        // during pipeline construction, and use it to obtain the starting time for
        // _latestOplogTimestamp. getOwned() copies the token so it does not reference a buffer
        // owned by the expression context.
        invariant(!_expCtx->initialPostBatchResumeToken.isEmpty());
        _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned();
        _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
    }
}
+
+PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId* recordIdOut) {
+ // The pipeline-based execution engine does not track the record ids associated with documents,
+ // so it is an error for the caller to ask for one. For the same reason, we expect the caller to
+ // provide a non-null BSONObj pointer for 'objOut'.
+ invariant(!recordIdOut);
+ invariant(objOut);
+
+ if (!_stash.empty()) {
+ *objOut = std::move(_stash.front());
+ _stash.pop();
+ ++_nReturned;
+ return PlanExecutor::ADVANCED;
+ }
+
+ Document docOut;
+ auto execState = getNextDocument(&docOut, nullptr);
+ if (execState == PlanExecutor::ADVANCED) {
+ // Include metadata if the output will be consumed by a merging node.
+ *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson();
+ }
+ return execState;
+}
+
+PlanExecutor::ExecState PlanExecutorPipeline::getNextDocument(Document* docOut,
+ RecordId* recordIdOut) {
+ // The pipeline-based execution engine does not track the record ids associated with documents,
+ // so it is an error for the caller to ask for one. For the same reason, we expect the caller to
+ // provide a non-null pointer for 'docOut'.
+ invariant(!recordIdOut);
+ invariant(docOut);
+
+ // Callers which use 'enqueue()' are not allowed to use 'getNextDocument()', and must instead
+ // use 'getNext()'.
+ invariant(_stash.empty());
+
+ if (auto next = _getNext()) {
+ *docOut = std::move(*next);
+ ++_nReturned;
+ return PlanExecutor::ADVANCED;
+ }
+
+ return PlanExecutor::IS_EOF;
+}
+
+bool PlanExecutorPipeline::isEOF() {
+ if (!_stash.empty()) {
+ return false;
+ }
+
+ return _pipelineIsEof;
+}
+
+boost::optional<Document> PlanExecutorPipeline::_getNext() {
+ auto nextDoc = _pipeline->getNext();
+ if (!nextDoc) {
+ _pipelineIsEof = true;
+ }
+
+ if (_isChangeStream) {
+ _performChangeStreamsAccounting(nextDoc);
+ }
+ return nextDoc;
+}
+
// Updates '_latestOplogTimestamp' and '_postBatchResumeToken' after each call into the pipeline.
// NOTE(review): 'doc' is taken by const value, which copies the optional and its Document;
// consider 'const boost::optional<Document>&' (would also require updating the header
// declaration) — confirm Document copy cost before changing.
void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document> doc) {
    // Only meaningful for change stream pipelines.
    invariant(_isChangeStream);
    if (doc) {
        // While we have more results to return, we track both the timestamp and the resume token of
        // the latest event observed in the oplog, the latter via its sort key metadata field.
        _validateResumeToken(*doc);
        _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
        _postBatchResumeToken = doc->metadata().getSortKey().getDocument().toBson();
        _setSpeculativeReadTimestamp();
    } else {
        // We ran out of results to return. Check whether the oplog cursor has moved forward since
        // the last recorded timestamp. Because we advance _latestOplogTimestamp for every event we
        // return, if the new time is higher than the last then we are guaranteed not to have
        // already returned any events at this timestamp. We can set _postBatchResumeToken to a new
        // high-water-mark token at the current clusterTime.
        auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
        if (highWaterMark > _latestOplogTimestamp) {
            auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
            _postBatchResumeToken = token.toDocument().toBson();
            _latestOplogTimestamp = highWaterMark;
            _setSpeculativeReadTimestamp();
        }
    }
}
+
+void PlanExecutorPipeline::executePlan() {
+ while (_getNext()) {
+ // Run the pipeline to completion, but discard the results.
+ }
+}
+
+std::string PlanExecutorPipeline::getPlanSummary() const {
+ return PipelineD::getPlanSummaryStr(_pipeline.get());
+}
+
+void PlanExecutorPipeline::getSummaryStats(PlanSummaryStats* statsOut) const {
+ invariant(statsOut);
+ PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut);
+ statsOut->nReturned = _nReturned;
+}
+
// Throws ChangeStreamFatalError if a pipeline stage has rewritten the event's '_id' so that it no
// longer matches the resume token carried in the sort key metadata.
void PlanExecutorPipeline::_validateResumeToken(const Document& event) const {
    // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
    if (_expCtx->needsMerge) {
        return;
    }

    // Confirm that the document _id field matches the original resume token in the sort key field.
    auto eventBSON = event.toBson();
    auto resumeToken = event.metadata().getSortKey();
    auto idField = eventBSON.getObjectField("_id");
    // The sort key must always be present on a change stream event at this point.
    invariant(!resumeToken.missing());
    uassert(ErrorCodes::ChangeStreamFatalError,
            str::stream() << "Encountered an event whose _id field, which contains the resume "
                             "token, was modified by the pipeline. Modifying the _id field of an "
                             "event makes it impossible to resume the stream from that point. Only "
                             "transformations that retain the unmodified _id field are allowed. "
                             "Expected: "
                          << BSON("_id" << resumeToken) << " but found: "
                          << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
            // The token must be an object AND byte-for-byte identical to the event's '_id'.
            (resumeToken.getType() == BSONType::Object) &&
                idField.binaryEqual(resumeToken.getDocument().toBson()));
}
+
+void PlanExecutorPipeline::_setSpeculativeReadTimestamp() {
+ repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
+ repl::SpeculativeMajorityReadInfo::get(_expCtx->opCtx);
+ if (speculativeMajorityReadInfo.isSpeculativeRead() && !_latestOplogTimestamp.isNull()) {
+ speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp);
+ }
+}
+
+void PlanExecutorPipeline::markAsKilled(Status killStatus) {
+ invariant(!killStatus.isOK());
+ // If killed multiple times, only retain the first status.
+ if (_killStatus.isOK()) {
+ _killStatus = killStatus;
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
new file mode 100644
index 00000000000..f6ad2a97341
--- /dev/null
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <queue>
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/query/plan_executor.h"
+
+namespace mongo {
+
/**
 * A plan executor which is used to execute a Pipeline of DocumentSources.
 */
class PlanExecutorPipeline final : public PlanExecutor {
public:
    PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
                         std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
                         bool isChangeStream);

    // There is no PlanStage tree when executing via a Pipeline, so this accessor is never valid.
    PlanStage* getRootStage() const override {
        MONGO_UNREACHABLE;
    }

    // Pipelines are not built from a CanonicalQuery.
    CanonicalQuery* getCanonicalQuery() const override {
        return nullptr;
    }

    const NamespaceString& nss() const override {
        return _expCtx->ns;
    }

    OperationContext* getOpCtx() const override {
        return _expCtx->opCtx;
    }

    // Pipeline execution does not support the saveState()/restoreState() interface. Instead, the
    // underlying data access plan is saved/restored internally in between DocumentSourceCursor
    // batches, or when the underlying PlanStage tree yields.
    void saveState() override {}
    void restoreState() override {}

    void detachFromOperationContext() override {
        _pipeline->detachFromOperationContext();
    }

    void reattachToOperationContext(OperationContext* opCtx) override {
        _pipeline->reattachToOperationContext(opCtx);
    }

    ExecState getNext(BSONObj* objOut, RecordId* recordIdOut) override;
    ExecState getNextDocument(Document* docOut, RecordId* recordIdOut) override;

    bool isEOF() override;

    void executePlan() override;

    void dispose(OperationContext* opCtx) override {
        _pipeline->dispose(opCtx);
    }

    // Stashes an owned copy of 'obj' to be returned by a subsequent getNext() call, ahead of any
    // new results from the pipeline. Callers using enqueue() must consume results via getNext(),
    // not getNextDocument().
    void enqueue(const BSONObj& obj) override {
        _stash.push(obj.getOwned());
    }

    void markAsKilled(Status killStatus) override;

    bool isMarkedAsKilled() const override {
        return !_killStatus.isOK();
    }

    // Only valid to call after the executor has been marked as killed.
    Status getKillStatus() override {
        invariant(isMarkedAsKilled());
        return _killStatus;
    }

    bool isDisposed() const override {
        return _pipeline->isDisposed();
    }

    Timestamp getLatestOplogTimestamp() const override {
        return _latestOplogTimestamp;
    }

    BSONObj getPostBatchResumeToken() const override {
        return _postBatchResumeToken;
    }

    // The pipeline acquires and releases any locks it needs internally (e.g. within
    // DocumentSourceCursor), so callers must not hold locks on its behalf.
    LockPolicy lockPolicy() const override {
        return LockPolicy::kLocksInternally;
    }

    std::string getPlanSummary() const override;

    void getSummaryStats(PlanSummaryStats* statsOut) const override;

    /**
     * Writes the explain information about the underlying pipeline to a std::vector<Value>,
     * providing the level of detail specified by 'verbosity'.
     */
    std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const {
        return _pipeline->writeExplainOps(verbosity);
    }

    BSONObj getStats() const override {
        // TODO SERVER-49808: Report execution stats for the pipeline.
        return BSONObj{};
    }

private:
    /**
     * Obtains the next document from the underlying Pipeline, and does change streams-related
     * accounting if needed.
     */
    boost::optional<Document> _getNext();

    /**
     * If this is a change stream, advance the cluster time and post batch resume token based on the
     * latest document returned by the underlying pipeline.
     */
    void _performChangeStreamsAccounting(const boost::optional<Document>);

    /**
     * Verifies that the document's resume token has not been modified.
     */
    void _validateResumeToken(const Document& event) const;

    /**
     * Set the speculative majority read timestamp if we have scanned up to a certain oplog
     * timestamp.
     */
    void _setSpeculativeReadTimestamp();

    // The expression context under which the pipeline runs; provides the OperationContext and
    // namespace.
    boost::intrusive_ptr<ExpressionContext> _expCtx;

    // The pipeline being executed. Disposal is dismissed from the deleter in the constructor;
    // this executor's dispose() is responsible for disposing it.
    std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;

    const bool _isChangeStream;

    // Results added via enqueue(), returned by getNext() ahead of new pipeline results.
    std::queue<BSONObj> _stash;

    // If _killStatus has a non-OK value, then we have been killed and the value represents the
    // reason for the kill.
    Status _killStatus = Status::OK();

    // Number of results this executor has handed out via getNext()/getNextDocument().
    size_t _nReturned = 0;

    // Set to true once we have received all results from the underlying '_pipeline', and the
    // pipeline has indicated end-of-stream.
    bool _pipelineIsEof = false;

    // If '_pipeline' is a change stream, these track the latest timestamp seen while scanning the
    // oplog, as well as the most recent PBRT.
    Timestamp _latestOplogTimestamp;
    BSONObj _postBatchResumeToken;
};
+
+} // namespace mongo