summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2020-07-09 20:07:55 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-24 16:31:37 +0000
commitb4b35f9cc69412611a198642333bf40daa5ba58c (patch)
tree909673b812a499a60692c46abb53853f7df42b48 /src/mongo/db/exec
parent5e53ee3ca0a90eb98cdab94b298dec810fb46804 (diff)
downloadmongo-b4b35f9cc69412611a198642333bf40daa5ba58c.tar.gz
SERVER-48478 Replace PipelineProxyStage with PlanExecutorPipeline
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/change_stream_proxy.cpp116
-rw-r--r--src/mongo/db/exec/change_stream_proxy.h97
-rw-r--r--src/mongo/db/exec/pipeline_proxy.cpp145
-rw-r--r--src/mongo/db/exec/pipeline_proxy.h106
4 files changed, 0 insertions, 464 deletions
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
deleted file mode 100644
index c16255a897b..00000000000
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/exec/change_stream_proxy.h"
-
-#include "mongo/db/pipeline/pipeline_d.h"
-#include "mongo/db/pipeline/resume_token.h"
-#include "mongo/db/repl/speculative_majority_read_info.h"
-
-namespace mongo {
-
-const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY";
-
-ChangeStreamProxyStage::ChangeStreamProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws)
- : PipelineProxyStage(expCtx, std::move(pipeline), ws, kStageType) {
- // Set _postBatchResumeToken to the initial PBRT that was added to the expression context during
- // pipeline construction, and use it to obtain the starting time for _latestOplogTimestamp.
- invariant(!_pipeline->getContext()->initialPostBatchResumeToken.isEmpty());
- _postBatchResumeToken = _pipeline->getContext()->initialPostBatchResumeToken.getOwned();
- _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
-}
-
-boost::optional<Document> ChangeStreamProxyStage::getNext() {
- if (auto next = _pipeline->getNext()) {
- // While we have more results to return, we track both the timestamp and the resume token of
- // the latest event observed in the oplog, the latter via its sort key metadata field.
- _validateResumeToken(*next);
- _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
- _postBatchResumeToken = next->metadata().getSortKey().getDocument().toBson();
- _setSpeculativeReadTimestamp();
- return next;
- }
-
- // We ran out of results to return. Check whether the oplog cursor has moved forward since the
- // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return,
- // if the new time is higher than the last then we are guaranteed not to have already returned
- // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token
- // at the current clusterTime.
- auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
- if (highWaterMark > _latestOplogTimestamp) {
- auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
- _postBatchResumeToken = token.toDocument().toBson();
- _latestOplogTimestamp = highWaterMark;
- _setSpeculativeReadTimestamp();
- }
- return boost::none;
-}
-
-void ChangeStreamProxyStage::_validateResumeToken(const Document& event) const {
- // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
- if (_includeMetaData) {
- return;
- }
- // Confirm that the document _id field matches the original resume token in the sort key field.
- auto eventBSON = event.toBson();
- auto resumeToken = event.metadata().getSortKey();
- auto idField = eventBSON.getObjectField("_id");
- invariant(!resumeToken.missing());
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "Encountered an event whose _id field, which contains the resume "
- "token, was modified by the pipeline. Modifying the _id field of an "
- "event makes it impossible to resume the stream from that point. Only "
- "transformations that retain the unmodified _id field are allowed. "
- "Expected: "
- << BSON("_id" << resumeToken) << " but found: "
- << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
- (resumeToken.getType() == BSONType::Object) &&
- idField.binaryEqual(resumeToken.getDocument().toBson()));
-}
-
-void ChangeStreamProxyStage::_setSpeculativeReadTimestamp() {
- repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
- repl::SpeculativeMajorityReadInfo::get(_pipeline->getContext()->opCtx);
- if (speculativeMajorityReadInfo.isSpeculativeRead() && !_latestOplogTimestamp.isNull()) {
- speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp);
- }
-}
-
-std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() {
- std::unique_ptr<PlanStageStats> ret =
- std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY);
- ret->specific = std::make_unique<CollectionScanStats>();
- return ret;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
deleted file mode 100644
index 6d115b78885..00000000000
--- a/src/mongo/db/exec/change_stream_proxy.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/exec/pipeline_proxy.h"
-
-namespace mongo {
-
-/**
- * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the
- * serialization of change stream pipeline output from Document to BSON. In particular, it is
- * additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens that
- * are necessary for correct merging on mongoS and, in the latter case, must also be provided to
- * mongoD clients.
- */
-class ChangeStreamProxyStage final : public PipelineProxyStage {
-public:
- static const char* kStageType;
-
- /**
- * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream into
- * the constructor will cause an invariant() to fail.
- */
- ChangeStreamProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws);
-
- /**
- * Returns an empty PlanStageStats object.
- */
- std::unique_ptr<PlanStageStats> getStats() final;
-
- /**
- * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog
- * timestamp in the event that we need to merge on mongoS.
- */
- Timestamp getLatestOplogTimestamp() const {
- return _includeMetaData ? _latestOplogTimestamp : Timestamp();
- }
-
- /**
- * Passes through the most recent resume token from the proxied pipeline.
- */
- BSONObj getPostBatchResumeToken() const {
- return _postBatchResumeToken;
- }
-
- StageType stageType() const final {
- return STAGE_CHANGE_STREAM_PROXY;
- }
-
-protected:
- boost::optional<Document> getNext() final;
-
-private:
- /**
- * Verifies that the docs's resume token has not been modified.
- */
- void _validateResumeToken(const Document& event) const;
-
- /**
- * Set the speculative majority read timestamp if we have scanned up to a certain oplog
- * timestamp.
- */
- void _setSpeculativeReadTimestamp();
-
- Timestamp _latestOplogTimestamp;
- BSONObj _postBatchResumeToken;
-};
-} // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
deleted file mode 100644
index c3014ce34a5..00000000000
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/exec/pipeline_proxy.h"
-
-#include <memory>
-
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/db/pipeline/pipeline_d.h"
-
-namespace mongo {
-
-using boost::intrusive_ptr;
-using std::shared_ptr;
-using std::unique_ptr;
-using std::vector;
-
-const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
-
-PipelineProxyStage::PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws)
- : PipelineProxyStage(expCtx, std::move(pipeline), ws, kStageType) {}
-
-PipelineProxyStage::PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws,
- const char* stageTypeName)
- : PlanStage(stageTypeName, expCtx),
- _pipeline(std::move(pipeline)),
- _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger
- _ws(ws) {
- // We take over responsibility for disposing of the Pipeline, since it is required that
- // doDispose() will be called before destruction of this PipelineProxyStage.
- _pipeline.get_deleter().dismissDisposal();
-}
-
-PlanStage::StageState PipelineProxyStage::doWork(WorkingSetID* out) {
- invariant(out);
-
- if (!_stash.empty()) {
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- if (_includeMetaData && _stash.back().metadata()) {
- member->metadata() = _stash.back().metadata();
- }
- member->doc = {SnapshotId(), std::move(_stash.back())};
- _stash.pop_back();
- member->transitionToOwnedObj();
- return PlanStage::ADVANCED;
- }
-
- if (auto next = getNext()) {
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- if (_includeMetaData && next->metadata()) {
- member->metadata() = next->metadata();
- }
- member->doc = {SnapshotId(), std::move(*next)};
- member->transitionToOwnedObj();
- return PlanStage::ADVANCED;
- }
-
- return PlanStage::IS_EOF;
-}
-
-bool PipelineProxyStage::isEOF() {
- if (!_stash.empty())
- return false;
-
- if (auto next = getNext()) {
- _stash.emplace_back(*next);
- return false;
- }
-
- return true;
-}
-
-void PipelineProxyStage::doDetachFromOperationContext() {
- _pipeline->detachFromOperationContext();
-}
-
-void PipelineProxyStage::doReattachToOperationContext() {
- _pipeline->reattachToOperationContext(opCtx());
-}
-
-void PipelineProxyStage::doDispose() {
- _pipeline->dispose(opCtx());
-}
-
-unique_ptr<PlanStageStats> PipelineProxyStage::getStats() {
- unique_ptr<PlanStageStats> ret =
- std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_PIPELINE_PROXY);
- ret->specific = std::make_unique<CollectionScanStats>();
- return ret;
-}
-
-boost::optional<Document> PipelineProxyStage::getNext() {
- return _pipeline->getNext();
-}
-
-std::string PipelineProxyStage::getPlanSummaryStr() const {
- return PipelineD::getPlanSummaryStr(_pipeline.get());
-}
-
-void PipelineProxyStage::getPlanSummaryStats(PlanSummaryStats* statsOut) const {
- invariant(statsOut);
- PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut);
- statsOut->nReturned = getCommonStats()->advanced;
-}
-
-vector<Value> PipelineProxyStage::writeExplainOps(ExplainOptions::Verbosity verbosity) const {
- return _pipeline->writeExplainOps(verbosity);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
deleted file mode 100644
index 99f8583f3c0..00000000000
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/intrusive_ptr.hpp>
-#include <boost/optional/optional.hpp>
-
-#include "mongo/db/exec/plan_stage.h"
-#include "mongo/db/exec/plan_stats.h"
-#include "mongo/db/pipeline/pipeline.h"
-#include "mongo/db/query/plan_summary_stats.h"
-#include "mongo/db/record_id.h"
-#include "mongo/util/assert_util.h"
-
-namespace mongo {
-
-/**
- * Stage for pulling results out from an aggregation pipeline.
- */
-class PipelineProxyStage : public PlanStage {
-public:
- PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws);
-
- virtual ~PipelineProxyStage() = default;
-
- PlanStage::StageState doWork(WorkingSetID* out) final;
-
- bool isEOF() final;
-
- //
- // Manage our OperationContext.
- //
- void doDetachFromOperationContext() final;
- void doReattachToOperationContext() final;
-
- // Returns empty PlanStageStats object
- std::unique_ptr<PlanStageStats> getStats() override;
-
- // Not used.
- SpecificStats* getSpecificStats() const final {
- MONGO_UNREACHABLE;
- }
-
- std::string getPlanSummaryStr() const;
- void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
-
- StageType stageType() const override {
- return STAGE_PIPELINE_PROXY;
- }
-
- /**
- * Writes the pipelineProxyStage's operators to a std::vector<Value>, providing the level of
- * detail specified by 'verbosity'.
- */
- std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const;
-
- static const char* kStageType;
-
-protected:
- PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws,
- const char* stageTypeName);
-
- virtual boost::optional<Document> getNext();
- void doDispose() final;
-
- // Items in the _stash should be returned before pulling items from _pipeline.
- std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
- const bool _includeMetaData;
-
-private:
- std::vector<Document> _stash;
- WorkingSet* _ws;
-};
-
-} // namespace mongo