summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2019-05-08 09:45:50 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2019-05-21 11:20:22 -0400
commit1397d1398b3b9b1723cd9b93de6b345f940a17e8 (patch)
tree8c087879396b984caa369d2c4d107a03538e95bf /src/mongo
parenta3638ccc4889436984eb385cb7717eecd6af83cf (diff)
downloadmongo-1397d1398b3b9b1723cd9b93de6b345f940a17e8.tar.gz
SERVER-40539 Add DocumentSourceQueue
This stage is distinguished from the Mock stage in that it doesn't bother tracking state about which methods have been called. The queue version is simpler and is used in production code (namely the update system), whereas the mock is still preferred in testing environments.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_exchange_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_mock.cpp36
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h59
-rw-r--r--src/mongo/db/pipeline/document_source_queue.cpp58
-rw-r--r--src/mongo/db/pipeline/document_source_queue.h107
-rw-r--r--src/mongo/db/pipeline/document_source_sample_test.cpp42
-rw-r--r--src/mongo/db/update/SConscript1
-rw-r--r--src/mongo/db/update/pipeline_executor.cpp26
-rw-r--r--src/mongo/db/update/pipeline_executor.h16
11 files changed, 238 insertions, 118 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 85001ac9c5b..a778d314936 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -401,6 +401,7 @@ pipelineeEnv.Library(
'document_source_out_replace_coll.cpp',
'document_source_plan_cache_stats.cpp',
'document_source_project.cpp',
+ 'document_source_queue.cpp',
'document_source_redact.cpp',
'document_source_replace_root.cpp',
'document_source_sample.cpp',
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 230ad31c8f3..ff7c7674425 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -67,7 +67,7 @@ protected:
*/
void addDocument(
Timestamp ts, int version, std::size_t applyOpsIndex, Document docKey, UUID uuid) {
- _mock->queue.push_back(
+ _mock->push_back(
Document{{"_id",
ResumeToken(ResumeTokenData(ts, version, applyOpsIndex, uuid, Value(docKey)))
.toDocument()}});
@@ -89,7 +89,7 @@ protected:
}
void addPause() {
- _mock->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+ _mock->push_back(DocumentSource::GetNextResult::makePauseExecution());
}
/**
diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp
index 4cc3cf60e8c..cd66171a246 100644
--- a/src/mongo/db/pipeline/document_source_exchange_test.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp
@@ -81,7 +81,7 @@ protected:
auto source = DocumentSourceMock::createForTest();
for (int i = 0; i < cnt; ++i)
- source->queue.emplace_back(Document{{"a", i}, {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
+ source->emplace_back(Document{{"a", i}, {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
return source;
}
@@ -99,8 +99,8 @@ protected:
auto source = DocumentSourceMock::createForTest();
for (size_t i = 0; i < cnt; ++i)
- source->queue.emplace_back(Document{{"a", static_cast<int>(prng.nextInt32() % cnt)},
- {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
+ source->emplace_back(Document{{"a", static_cast<int>(prng.nextInt32() % cnt)},
+ {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
return source;
}
diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index 0f15b96a038..f4efb3e731e 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -41,28 +41,12 @@ using boost::intrusive_ptr;
using std::deque;
DocumentSourceMock::DocumentSourceMock(deque<GetNextResult> results)
- : DocumentSource(new ExpressionContextForTest()),
- queue(std::move(results)),
- sorts(SimpleBSONObjComparator::kInstance.makeBSONObjSet()) {}
-
-DocumentSourceMock::DocumentSourceMock(deque<GetNextResult> results,
- const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx),
- queue(std::move(results)),
- sorts(SimpleBSONObjComparator::kInstance.makeBSONObjSet()) {}
+ : DocumentSourceQueue(std::move(results), new ExpressionContextForTest()) {}
const char* DocumentSourceMock::getSourceName() const {
return "mock";
}
-Value DocumentSourceMock::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- return Value(Document{{getSourceName(), Document()}});
-}
-
-void DocumentSourceMock::doDispose() {
- isDisposed = true;
-}
-
intrusive_ptr<DocumentSourceMock> DocumentSourceMock::createForTest(Document doc) {
return new DocumentSourceMock({std::move(doc)});
}
@@ -92,22 +76,4 @@ intrusive_ptr<DocumentSourceMock> DocumentSourceMock::createForTest(
}
return new DocumentSourceMock(std::move(results));
}
-
-boost::intrusive_ptr<DocumentSourceMock> DocumentSourceMock::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceMock({}, expCtx);
-}
-
-DocumentSource::GetNextResult DocumentSourceMock::getNext() {
- invariant(!isDisposed);
- invariant(!isDetachedFromOpCtx);
-
- if (queue.empty()) {
- return GetNextResult::makeEOF();
- }
-
- auto next = std::move(queue.front());
- queue.pop_front();
- return next;
-}
}
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index dc6d501a210..11156ef4422 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -31,38 +31,16 @@
#include <deque>
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_queue.h"
namespace mongo {
/**
- * Used in testing to store documents without using the storage layer. Methods are not marked as
- * final in order to allow tests to intercept calls if needed.
+ * A mock DocumentSource which is useful for testing. In addition to re-spooling documents like
+ * DocumentSourceQueue, it tracks some state about which methods have been called.
*/
-class DocumentSourceMock : public DocumentSource {
+class DocumentSourceMock : public DocumentSourceQueue {
public:
- DocumentSourceMock(std::deque<GetNextResult> results);
- DocumentSourceMock(std::deque<GetNextResult> results,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- GetNextResult getNext() override;
- const char* getSourceName() const override;
- Value serialize(
- boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const override {
- StageConstraints constraints(StreamType::kStreaming,
- PositionRequirement::kFirst,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kAllowed,
- LookupRequirement::kAllowed);
-
- constraints.requiresInputDocSource = false;
- return constraints;
- }
-
// Constructors which create their own ExpressionContextForTest. Do _not_ use these outside of
// tests, as they will spin up ServiceContexts (TODO SERVER-41060).
static boost::intrusive_ptr<DocumentSourceMock> createForTest();
@@ -77,10 +55,21 @@ public:
static boost::intrusive_ptr<DocumentSourceMock> createForTest(
const std::initializer_list<const char*>& jsons);
- // Use these constructors outside of tests.
- // TODO: SERVER-40539 this should no longer be necessary once there's a DocumentSourceQueue.
- static boost::intrusive_ptr<DocumentSourceMock> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ using DocumentSourceQueue::DocumentSourceQueue;
+ DocumentSourceMock(std::deque<GetNextResult>);
+
+ GetNextResult getNext() override {
+ invariant(!isDisposed);
+ invariant(!isDetachedFromOpCtx);
+ return DocumentSourceQueue::getNext();
+ }
+ Value serialize(
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override {
+ // Unlike the queue, it's okay to serialize this stage for testing purposes.
+ return Value(Document{{getSourceName(), Document()}});
+ }
+
+ const char* getSourceName() const override;
void reattachToOperationContext(OperationContext* opCtx) {
isDetachedFromOpCtx = false;
@@ -106,18 +95,14 @@ public:
return boost::none;
}
- // Return documents from front of queue.
- std::deque<GetNextResult> queue;
-
bool isDisposed = false;
bool isDetachedFromOpCtx = false;
bool isOptimized = false;
- bool isExpCtxInjected = false;
-
- BSONObjSet sorts;
protected:
- void doDispose() override;
+ void doDispose() override {
+ isDisposed = true;
+ }
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp
new file mode 100644
index 00000000000..80559de1a71
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_queue.cpp
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_queue.h"
+
+namespace mongo {
+
+boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceQueue({}, expCtx);
+}
+
+DocumentSourceQueue::DocumentSourceQueue(std::deque<GetNextResult> results,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx), _queue(std::move(results)) {}
+
+const char* DocumentSourceQueue::getSourceName() const {
+ return kStageName.rawData();
+}
+
+DocumentSource::GetNextResult DocumentSourceQueue::getNext() {
+ if (_queue.empty()) {
+ return GetNextResult::makeEOF();
+ }
+
+ auto next = std::move(_queue.front());
+ _queue.pop_front();
+ return next;
+}
+}
diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h
new file mode 100644
index 00000000000..b38c992248e
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_queue.h
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * A DocumentSource which re-spools a queue of documents loaded into it. This stage does not
+ * retrieve any input from an earlier stage. It can be useful to adapt the usual pull-based model of
+ * a pipeline to more of a push-based model by pushing documents to feed through the pipeline into
+ * this queue stage.
+ */
+class DocumentSourceQueue : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "queue"_sd;
+
+ static boost::intrusive_ptr<DocumentSourceQueue> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ DocumentSourceQueue(std::deque<GetNextResult> results,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ virtual ~DocumentSourceQueue() {}
+
+ GetNextResult getNext() override;
+ const char* getSourceName() const override;
+ Value serialize(
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override {
+ // This stage is not intended to be serialized. Supporting a fully-general serialization is
+ // not trivial since we'd have to invent a serialization format for each of the
+ // GetNextResult states.
+ MONGO_UNREACHABLE;
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed,
+ LookupRequirement::kAllowed);
+
+ constraints.requiresInputDocSource = false;
+ return constraints;
+ }
+
+ /**
+ * This stage does not modify anything.
+ */
+ GetModPathsReturn getModifiedPaths() const override {
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
+ }
+
+ boost::optional<MergingLogic> mergingLogic() override {
+ return boost::none;
+ }
+
+ template <class... Args>
+ GetNextResult& emplace_back(Args&&... args) {
+ return _queue.emplace_back(std::forward<Args>(args)...);
+ }
+
+ void push_back(GetNextResult&& result) {
+ _queue.push_back(std::move(result));
+ }
+
+ void push_back(const GetNextResult& result) {
+ _queue.push_back(result);
+ }
+
+protected:
+ // Return documents from front of queue.
+ std::deque<GetNextResult> _queue;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sample_test.cpp b/src/mongo/db/pipeline/document_source_sample_test.cpp
index f177edd6acc..0cd406abbcb 100644
--- a/src/mongo/db/pipeline/document_source_sample_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sample_test.cpp
@@ -102,7 +102,7 @@ protected:
*/
void loadDocuments(int nDocs) {
for (int i = 0; i < nDocs; i++) {
- _mock->queue.push_back(DOC("_id" << i));
+ _mock->push_back(DOC("_id" << i));
}
}
@@ -160,7 +160,7 @@ TEST_F(SampleBasics, SampleEOFBeforeSource) {
*/
TEST_F(SampleBasics, DocsUnmodified) {
createSample(1);
- source()->queue.push_back(DOC("a" << 1 << "b" << DOC("c" << 2)));
+ source()->push_back(DOC("a" << 1 << "b" << DOC("c" << 2)));
auto next = sample()->getNext();
ASSERT_TRUE(next.isAdvanced());
auto doc = next.releaseDocument();
@@ -172,12 +172,12 @@ TEST_F(SampleBasics, DocsUnmodified) {
TEST_F(SampleBasics, ShouldPropagatePauses) {
createSample(2);
- source()->queue.push_back(Document());
- source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
- source()->queue.push_back(Document());
- source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
- source()->queue.push_back(Document());
- source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+ source()->push_back(Document());
+ source()->push_back(DocumentSource::GetNextResult::makePauseExecution());
+ source()->push_back(Document());
+ source()->push_back(DocumentSource::GetNextResult::makePauseExecution());
+ source()->push_back(Document());
+ source()->push_back(DocumentSource::GetNextResult::makePauseExecution());
// The $sample stage needs to populate itself, so should propagate all three pauses before
// returning any results.
@@ -276,7 +276,7 @@ TEST_F(SampleFromRandomCursorBasics, SampleEOFBeforeSource) {
*/
TEST_F(SampleFromRandomCursorBasics, DocsUnmodified) {
createSample(1);
- source()->queue.push_back(DOC("_id" << 1 << "b" << DOC("c" << 2)));
+ source()->push_back(DOC("_id" << 1 << "b" << DOC("c" << 2)));
auto next = sample()->getNext();
ASSERT_TRUE(next.isAdvanced());
auto doc = next.releaseDocument();
@@ -291,9 +291,9 @@ TEST_F(SampleFromRandomCursorBasics, DocsUnmodified) {
*/
TEST_F(SampleFromRandomCursorBasics, IgnoreDuplicates) {
createSample(2);
- source()->queue.push_back(DOC("_id" << 1));
- source()->queue.push_back(DOC("_id" << 1)); // Duplicate, should ignore.
- source()->queue.push_back(DOC("_id" << 2));
+ source()->push_back(DOC("_id" << 1));
+ source()->push_back(DOC("_id" << 1)); // Duplicate, should ignore.
+ source()->push_back(DOC("_id" << 2));
auto next = sample()->getNext();
ASSERT_TRUE(next.isAdvanced());
@@ -322,7 +322,7 @@ TEST_F(SampleFromRandomCursorBasics, IgnoreDuplicates) {
TEST_F(SampleFromRandomCursorBasics, TooManyDups) {
createSample(2);
for (int i = 0; i < 1000; i++) {
- source()->queue.push_back(DOC("_id" << 1));
+ source()->push_back(DOC("_id" << 1));
}
// First should be successful, it's not a duplicate.
@@ -338,14 +338,14 @@ TEST_F(SampleFromRandomCursorBasics, TooManyDups) {
TEST_F(SampleFromRandomCursorBasics, MissingIdField) {
// Once with only a bad document.
createSample(2); // _idField is '_id'.
- source()->queue.push_back(DOC("non_id" << 2));
+ source()->push_back(DOC("non_id" << 2));
ASSERT_THROWS_CODE(sample()->getNext(), AssertionException, 28793);
// Again, with some regular documents before a bad one.
createSample(2); // _idField is '_id'.
- source()->queue.push_back(DOC("_id" << 1));
- source()->queue.push_back(DOC("_id" << 1));
- source()->queue.push_back(DOC("non_id" << 2));
+ source()->push_back(DOC("_id" << 1));
+ source()->push_back(DOC("_id" << 1));
+ source()->push_back(DOC("non_id" << 2));
// First should be successful.
ASSERT_TRUE(sample()->getNext().isAdvanced());
@@ -367,8 +367,8 @@ TEST_F(SampleFromRandomCursorBasics, MimicNonOptimized) {
_sample = DocumentSourceSampleFromRandomCursor::create(getExpCtx(), 2, "_id", 3);
sample()->setSource(_mock.get());
- source()->queue.push_back(DOC("_id" << 1));
- source()->queue.push_back(DOC("_id" << 2));
+ source()->push_back(DOC("_id" << 1));
+ source()->push_back(DOC("_id" << 2));
auto doc = sample()->getNext();
ASSERT_TRUE(doc.isAdvanced());
@@ -395,8 +395,8 @@ DEATH_TEST_F(SampleFromRandomCursorBasics,
ShouldFailIfGivenPausedInput,
"Invariant failure Hit a MONGO_UNREACHABLE!") {
createSample(2);
- source()->queue.push_back(Document{{"_id", 1}});
- source()->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+ source()->push_back(Document{{"_id", 1}});
+ source()->push_back(DocumentSource::GetNextResult::makePauseExecution());
// Should see the first result, then see a pause and fail.
ASSERT_TRUE(sample()->getNext().isAdvanced());
diff --git a/src/mongo/db/update/SConscript b/src/mongo/db/update/SConscript
index 29a37605244..06aecc03122 100644
--- a/src/mongo/db/update/SConscript
+++ b/src/mongo/db/update/SConscript
@@ -92,7 +92,6 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/logical_clock',
- '$BUILD_DIR/mongo/db/pipeline/document_source_mock',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/update_index_data',
'update_common',
diff --git a/src/mongo/db/update/pipeline_executor.cpp b/src/mongo/db/update/pipeline_executor.cpp
index c058e8fb43f..ff2b581220c 100644
--- a/src/mongo/db/update/pipeline_executor.cpp
+++ b/src/mongo/db/update/pipeline_executor.cpp
@@ -32,7 +32,7 @@
#include "mongo/db/update/pipeline_executor.h"
#include "mongo/db/bson/dotted_path_support.h"
-#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_queue.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/update/object_replace_executor.h"
#include "mongo/db/update/storage_validation.h"
@@ -71,12 +71,12 @@ PipelineExecutor::PipelineExecutor(const boost::intrusive_ptr<ExpressionContext>
invariant(!stageConstraints.isIndependentOfAnyCollection);
}
- _pipeline->addInitialSource(DocumentSourceMock::create(expCtx));
+ _pipeline->addInitialSource(DocumentSourceQueue::create(expCtx));
}
UpdateExecutor::ApplyResult PipelineExecutor::applyUpdate(ApplyParams applyParams) const {
- DocumentSourceMock* mockStage = static_cast<DocumentSourceMock*>(_pipeline->peekFront());
- mockStage->queue.emplace_back(Document{applyParams.element.getDocument().getObject()});
+ DocumentSourceQueue* queueStage = static_cast<DocumentSourceQueue*>(_pipeline->peekFront());
+ queueStage->emplace_back(Document{applyParams.element.getDocument().getObject()});
auto transformedDoc = _pipeline->getNext()->toBson();
auto transformedDocHasIdField = transformedDoc.hasField(kIdFieldName);
@@ -84,4 +84,22 @@ UpdateExecutor::ApplyResult PipelineExecutor::applyUpdate(ApplyParams applyParam
applyParams, transformedDoc, transformedDocHasIdField);
}
+Value PipelineExecutor::serialize() const {
+ std::vector<Value> valueArray;
+ for (const auto& stage : _pipeline->getSources()) {
+ // The queue stage we add to adapt the pull-based '_pipeline' to our use case should not
+ // be serialized out. Firstly, this was not part of the user's pipeline and is just an
+ // implementation detail. It wouldn't have much value in exposing. Secondly, supporting
+ // a serialization that we can later re-parse is non trivial. See the comment in
+ // DocumentSourceQueue for more details.
+ if (typeid(*stage) == typeid(DocumentSourceQueue)) {
+ continue;
+ }
+
+ stage->serializeToArray(valueArray);
+ }
+
+ return Value(valueArray);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/update/pipeline_executor.h b/src/mongo/db/update/pipeline_executor.h
index 0fba04ad24b..2f10539c7e2 100644
--- a/src/mongo/db/update/pipeline_executor.h
+++ b/src/mongo/db/update/pipeline_executor.h
@@ -61,21 +61,7 @@ public:
*/
ApplyResult applyUpdate(ApplyParams applyParams) const final;
- Value serialize() const final {
- std::vector<Value> valueArray;
- for (const auto& stage : _pipeline->getSources()) {
- // TODO SERVER-40539: Consider subclassing DocumentSourceQueue with a class that is
- // explicitly skipped when serializing. With that change call Pipeline::serialize()
- // directly.
- if (stage->getSourceName() == "mock"_sd) {
- continue;
- }
-
- stage->serializeToArray(valueArray);
- }
-
- return Value(valueArray);
- }
+ Value serialize() const final;
private:
boost::intrusive_ptr<ExpressionContext> _expCtx;