summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source.h212
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h2
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp43
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h4
-rw-r--r--src/mongo/db/pipeline/document_source_current_op_test.cpp24
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h12
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h3
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h6
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp43
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h2
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_cursors.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_cursors.h6
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp21
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h15
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp43
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp52
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_out.h3
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h16
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp15
-rw-r--r--src/mongo/db/pipeline/expression_context.h15
-rw-r--r--src/mongo/db/pipeline/expression_context_for_test.h7
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h210
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp26
-rw-r--r--src/mongo/db/pipeline/pipeline.h101
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp701
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h71
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp4
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h50
43 files changed, 941 insertions, 901 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 7ea9d967c4a..17622862af1 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -699,216 +699,4 @@ protected:
virtual ~SplittableDocumentSource() {}
};
-
-/**
- * This class marks DocumentSources which need functionality specific to a mongos or a mongod. It
- * causes a MongodProcessInterface to be injected when in a mongod and a MongosProcessInterface when
- * in a mongos.
- */
-class DocumentSourceNeedsMongoProcessInterface : public DocumentSource {
-public:
- /**
- * Any functionality needed by an aggregation stage that is either context specific to a mongod
- * or mongos process, or is only compiled in to one of those two binaries must be accessed via
- * this interface. This allows all DocumentSources to be parsed on either mongos or mongod, but
- * only executable where it makes sense.
- */
- class MongoProcessInterface {
- public:
- enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
- enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
- enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps };
-
- struct MakePipelineOptions {
- MakePipelineOptions(){};
-
- bool optimize = true;
- bool attachCursorSource = true;
-
- // Ordinarily, a MongoProcessInterface is injected into the pipeline at the point
- // when the cursor source is added. If true, 'forceInjectMongoProcessInterface' will
- // inject MongoProcessInterfaces into the pipeline even if 'attachCursorSource' is
- // false. If 'attachCursorSource' is true, then the value of
- // 'forceInjectMongoProcessInterface' is irrelevant.
- bool forceInjectMongoProcessInterface = false;
- };
-
- virtual ~MongoProcessInterface(){};
-
- /**
- * Sets the OperationContext of the DBDirectClient returned by directClient(). This method
- * must be called after updating the 'opCtx' member of the ExpressionContext associated with
- * the document source.
- */
- virtual void setOperationContext(OperationContext* opCtx) = 0;
-
- /**
- * Always returns a DBDirectClient. The return type in the function signature is a
- * DBClientBase* because DBDirectClient isn't linked into mongos.
- */
- virtual DBClientBase* directClient() = 0;
-
- // Note that in some rare cases this could return a false negative but will never return
- // a false positive. This method will be fixed in the future once it becomes possible to
- // avoid false negatives.
- virtual bool isSharded(const NamespaceString& ns) = 0;
-
- /**
- * Inserts 'objs' into 'ns' and returns the "detailed" last error object.
- */
- virtual BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) = 0;
-
- virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) = 0;
-
- /**
- * Appends operation latency statistics for collection "nss" to "builder"
- */
- virtual void appendLatencyStats(const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const = 0;
-
- /**
- * Appends storage statistics for collection "nss" to "builder"
- */
- virtual Status appendStorageStats(const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const = 0;
-
- /**
- * Appends the record count for collection "nss" to "builder".
- */
- virtual Status appendRecordCount(const NamespaceString& nss,
- BSONObjBuilder* builder) const = 0;
-
- /**
- * Gets the collection options for the collection given by 'nss'.
- */
- virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0;
-
- /**
- * Performs the given rename command if the collection given by 'targetNs' has the same
- * options as specified in 'originalCollectionOptions', and has the same indexes as
- * 'originalIndexes'.
- */
- virtual Status renameIfOptionsAndIndexesHaveNotChanged(
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) = 0;
-
- /**
- * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of
- * the returned pipeline will depend upon the supplied MakePipelineOptions:
- * - The boolean opts.optimize determines whether the pipeline will be optimized.
- * - If opts.attachCursorSource is false, the pipeline will be returned without attempting
- * to add an initial cursor source.
- * - If opts.forceInjectMongoProcessInterface is true, then a MongoProcessInterface will be
- * provided to each stage which requires one, regardless of whether a cursor source is
- * attached to the pipeline.
- *
- * This function returns a non-OK status if parsing the pipeline failed.
- */
- virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
-
- /**
- * Attaches a cursor source to the start of a pipeline. Performs no further optimization.
- * This function asserts if the collection to be aggregated is sharded. NamespaceNotFound
- * will be returned if ExpressionContext has a UUID and that UUID doesn't exist anymore.
- * That should be the only case where NamespaceNotFound is returned.
- */
- virtual Status attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
-
- /**
- * Returns a vector of owned BSONObjs, each of which contains details of an in-progress
- * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report
- * operations for all authenticated users; otherwise, report only the current user's
- * operations.
- */
- virtual std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
- CurrentOpUserMode userMode,
- CurrentOpTruncateMode) const = 0;
-
- /**
- * Returns the name of the local shard if sharding is enabled, or an empty string.
- */
- virtual std::string getShardName(OperationContext* opCtx) const = 0;
-
- /**
- * Returns the fields of the document key (in order) for the current collection, including
- * the shard key and _id. If _id is not in the shard key, it is added last.
- */
- virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0;
-
- /**
- * Returns zero or one documents with the document key 'documentKey'. 'documentKey' is
- * treated as a unique identifier of a document, and may include an _id or all fields from
- * the shard key and an _id. Throws if more than one match was found. Returns boost::none if
- * no matching documents were found, including cases where the given namespace does not
- * exist.
- */
- virtual boost::optional<Document> lookupSingleDocument(
- const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) = 0;
-
- /**
- * Returns a vector of all local cursors.
- */
- virtual std::vector<GenericCursor> getCursors(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0;
-
- // Add new methods as needed.
- };
-
- DocumentSourceNeedsMongoProcessInterface(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx) {}
-
- void injectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {
- _mongoProcessInterface = mongoProcessInterface;
- doInjectMongoProcessInterface(mongoProcessInterface);
- }
-
- /**
- * Derived classes may override this method to register custom inject functionality.
- */
- virtual void doInjectMongoProcessInterface(
- std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {}
-
- void detachFromOperationContext() override {
- invariant(_mongoProcessInterface);
- _mongoProcessInterface->setOperationContext(nullptr);
- doDetachFromOperationContext();
- }
-
- /**
- * Derived classes may override this method to register custom detach functionality.
- */
- virtual void doDetachFromOperationContext() {}
-
- void reattachToOperationContext(OperationContext* opCtx) final {
- invariant(_mongoProcessInterface);
- _mongoProcessInterface->setOperationContext(opCtx);
- doReattachToOperationContext(opCtx);
- }
-
- /**
- * Derived classes may override this method to register custom reattach functionality.
- */
- virtual void doReattachToOperationContext(OperationContext* opCtx) {}
-
-protected:
- // It is invalid to delete through a DocumentSourceNeedsMongoProcessInterface-typed pointer.
- virtual ~DocumentSourceNeedsMongoProcessInterface() {}
-
- // Gives subclasses access to a MongoProcessInterface implementation
- std::shared_ptr<MongoProcessInterface> _mongoProcessInterface;
-};
-
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 537b32737da..95032ec3cae 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -433,7 +433,7 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
// the documentKey fields to include the shard key. We only need to re-check the documentKey
// while the collection is unsharded; if the collection is or becomes sharded, then the
// documentKey is final and will not change.
- if (_mongoProcess && !_documentKeyFieldsSharded) {
+ if (!_documentKeyFieldsSharded) {
// If this is not a shard server, 'catalogCache' will be nullptr and we will skip the
// routing table check.
auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache();
@@ -443,7 +443,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
return routingInfo.isOK() && routingInfo.getValue().cm();
}();
if (_documentKeyFields.empty() || collectionIsSharded) {
- _documentKeyFields = _mongoProcess->collectDocumentKeyFields(uuid.getUuid());
+ _documentKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields(
+ _expCtx->opCtx, _expCtx->ns, uuid.getUuid());
_documentKeyFieldsSharded = collectionIsSharded;
}
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 5a6e9d0dcf7..44c28591d7b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/uuid.h"
@@ -110,7 +111,9 @@ struct MockMongoProcessInterface final : public StubMongoProcessInterface {
MockMongoProcessInterface(std::vector<FieldPath> fields) : _fields(std::move(fields)) {}
- std::vector<FieldPath> collectDocumentKeyFields(UUID) const final {
+ std::vector<FieldPath> collectDocumentKeyFields(OperationContext*,
+ const NamespaceString&,
+ UUID) const final {
return _fields;
}
@@ -131,9 +134,8 @@ public:
vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry);
auto transform = stages[2].get();
- auto mongoProcess = std::make_shared<MockMongoProcessInterface>(docKeyFields);
- using NeedyDS = DocumentSourceNeedsMongoProcessInterface;
- dynamic_cast<NeedyDS&>(*transform).injectMongoProcessInterface(std::move(mongoProcess));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(docKeyFields);
auto next = transform->getNext();
// Match stage should pass the doc down if expectedDoc is given.
@@ -152,6 +154,8 @@ public:
list<intrusive_ptr<DocumentSource>> result =
DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(std::vector<FieldPath>{});
// This match stage is a DocumentSourceOplogMatch, which we explicitly disallow from
// executing as a safety mechanism, since it needs to use the collection-default collation,
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 0ef2c383990..6fc95e0cec0 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -91,9 +91,7 @@ intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResu
DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token)
- : DocumentSourceNeedsMongoProcessInterface(expCtx),
- _token(std::move(token)),
- _verifiedResumability(false) {}
+ : DocumentSource(expCtx), _token(std::move(token)), _verifiedResumability(false) {}
DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
pExpCtx->checkForInterrupt();
@@ -118,8 +116,8 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
// with the resume token.
auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline =
- uassertStatusOK(_mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
+ auto pipeline = uassertStatusOK(
+ pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
if (auto first = pipeline->getNext()) {
auto firstOplogEntry = Value(*first);
uassert(40576,
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 956135f9d7a..b06bae41da1 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -54,7 +54,7 @@ namespace mongo {
* This source need only run on a sharded collection. For unsharded collections,
* DocumentSourceEnsureResumeTokenPresent is sufficient.
*/
-class DocumentSourceShardCheckResumability final : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceShardCheckResumability final : public DocumentSource {
public:
GetNextResult getNext() final;
const char* getSourceName() const final;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index acf4f02e9e7..b4dce391492 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -218,11 +218,11 @@ public:
MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
@@ -259,8 +259,7 @@ TEST_F(ShardCheckResumabilityTest,
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
addDocument(resumeTimestamp, "ID");
// We should see the resume token.
auto result = shardCheckResumability->getNext();
@@ -276,8 +275,7 @@ TEST_F(ShardCheckResumabilityTest,
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
addDocument(resumeTimestamp, "ID");
// We should see the resume token.
auto result = shardCheckResumability->getNext();
@@ -291,8 +289,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIs
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID");
deque<DocumentSource::GetNextResult> mockOplog;
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
addDocument(resumeTimestamp, "ID");
// We should see the resume token.
auto result = shardCheckResumability->getNext();
@@ -308,8 +305,7 @@ TEST_F(ShardCheckResumabilityTest,
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result = shardCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
@@ -321,8 +317,7 @@ TEST_F(ShardCheckResumabilityTest,
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576);
}
@@ -331,8 +326,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplo
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0");
deque<DocumentSource::GetNextResult> mockOplog;
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result = shardCheckResumability->getNext();
ASSERT_TRUE(result.isEOF());
}
@@ -346,8 +340,7 @@ TEST_F(ShardCheckResumabilityTest,
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0");
addDocument(docTimestamp, "ID");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result = shardCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
@@ -363,8 +356,7 @@ TEST_F(ShardCheckResumabilityTest,
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0");
addDocument(docTimestamp, "ID");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576);
}
@@ -377,16 +369,14 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0");
addDocument(docTimestamp, "ID");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result1 = shardCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result2 = shardCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
@@ -401,8 +391,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAf
addDocument(docTimestamp, "ID");
deque<DocumentSource::GetNextResult> mockOplog(
{{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result1 = shardCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
@@ -418,14 +407,12 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0");
deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}});
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result1 = shardCheckResumability->getNext();
ASSERT_TRUE(result1.isEOF());
mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
- shardCheckResumability->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockOplog));
+ getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog);
auto result2 = shardCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 3a6e5fc9591..7d05a806e98 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -106,7 +106,7 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
builder.append("ns", pExpCtx->ns.ns());
- auto shardName = _mongoProcessInterface->getShardName(pExpCtx->opCtx);
+ auto shardName = pExpCtx->mongoProcessInterface->getShardName(pExpCtx->opCtx);
if (!shardName.empty()) {
builder.append("shard", shardName);
@@ -121,14 +121,15 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
if (_collStatsSpec["latencyStats"].type() == BSONType::Object) {
includeHistograms = _collStatsSpec["latencyStats"]["histograms"].boolean();
}
- _mongoProcessInterface->appendLatencyStats(pExpCtx->ns, includeHistograms, &builder);
+ pExpCtx->mongoProcessInterface->appendLatencyStats(
+ pExpCtx->opCtx, pExpCtx->ns, includeHistograms, &builder);
}
if (_collStatsSpec.hasField("storageStats")) {
// If the storageStats field exists, it must have been validated as an object when parsing.
BSONObjBuilder storageBuilder(builder.subobjStart("storageStats"));
- Status status = _mongoProcessInterface->appendStorageStats(
- pExpCtx->ns, _collStatsSpec["storageStats"].Obj(), &storageBuilder);
+ Status status = pExpCtx->mongoProcessInterface->appendStorageStats(
+ pExpCtx->opCtx, pExpCtx->ns, _collStatsSpec["storageStats"].Obj(), &storageBuilder);
storageBuilder.doneFast();
if (!status.isOK()) {
uasserted(40280,
@@ -138,7 +139,8 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
}
if (_collStatsSpec.hasField("count")) {
- Status status = _mongoProcessInterface->appendRecordCount(pExpCtx->ns, &builder);
+ Status status = pExpCtx->mongoProcessInterface->appendRecordCount(
+ pExpCtx->opCtx, pExpCtx->ns, &builder);
if (!status.isOK()) {
uasserted(40481,
str::stream() << "Unable to retrieve count in $collStats stage: "
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index baae1fcb945..5b7bf719a14 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -36,7 +36,7 @@ namespace mongo {
* Provides a document source interface to retrieve collection-level statistics for a given
* collection.
*/
-class DocumentSourceCollStats : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceCollStats : public DocumentSource {
public:
class LiteParsed final : public LiteParsedDocumentSource {
public:
@@ -68,7 +68,7 @@ public:
};
DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx) {}
+ : DocumentSource(pExpCtx) {}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index 029f754eeae..c650f9ae0b9 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -90,13 +90,13 @@ DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
pExpCtx->checkForInterrupt();
if (_ops.empty()) {
- _ops = _mongoProcessInterface->getCurrentOps(
- _includeIdleConnections, _includeOpsFromAllUsers, _truncateOps);
+ _ops = pExpCtx->mongoProcessInterface->getCurrentOps(
+ pExpCtx->opCtx, _includeIdleConnections, _includeOpsFromAllUsers, _truncateOps);
_opsIter = _ops.begin();
if (pExpCtx->fromMongos) {
- _shardName = _mongoProcessInterface->getShardName(pExpCtx->opCtx);
+ _shardName = pExpCtx->mongoProcessInterface->getShardName(pExpCtx->opCtx);
uassert(40465,
"Aggregation request specified 'fromMongos' but unable to retrieve shard name "
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index 2e15403e52f..14e95983f70 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -32,7 +32,7 @@
namespace mongo {
-class DocumentSourceCurrentOp final : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceCurrentOp final : public DocumentSource {
public:
class LiteParsed final : public LiteParsedDocumentSource {
public:
@@ -100,7 +100,7 @@ private:
ConnMode includeIdleConnections = ConnMode::kExcludeIdle,
UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers,
TruncationMode truncateOps = TruncationMode::kNoTruncation)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
+ : DocumentSource(pExpCtx),
_includeIdleConnections(includeIdleConnections),
_includeOpsFromAllUsers(includeOpsFromAllUsers),
_truncateOps(truncateOps) {}
diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp
index 82853b2f007..cbafc412e22 100644
--- a/src/mongo/db/pipeline/document_source_current_op_test.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp
@@ -65,7 +65,8 @@ public:
MockMongoProcessInterfaceImplementation(bool hasShardName = true)
: _hasShardName(hasShardName) {}
- std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
CurrentOpUserMode userMode,
CurrentOpTruncateMode truncateMode) const {
return _ops;
@@ -183,9 +184,10 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDef
}
TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) {
+ getExpCtx()->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterfaceImplementation>();
+
const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
- const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>();
- currentOp->injectMongoProcessInterface(mongod);
ASSERT(currentOp->getNext().isEOF());
}
@@ -195,10 +197,10 @@ TEST_F(DocumentSourceCurrentOpTest,
getExpCtx()->fromMongos = true;
std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
- const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops);
+ getExpCtx()->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterfaceImplementation>(ops);
const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
- currentOp->injectMongoProcessInterface(mongod);
const auto expectedOutput =
Document{{"shard", kMockShardName},
@@ -213,10 +215,10 @@ TEST_F(DocumentSourceCurrentOpTest,
getExpCtx()->fromMongos = false;
std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")};
- const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops);
+ getExpCtx()->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterfaceImplementation>(ops);
const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
- currentOp->injectMongoProcessInterface(mongod);
const auto expectedOutput =
Document{{"client", std::string("192.168.1.10:50844")}, {"opid", 430}};
@@ -227,10 +229,10 @@ TEST_F(DocumentSourceCurrentOpTest,
TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) {
getExpCtx()->fromMongos = true;
- const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(false);
+ getExpCtx()->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterfaceImplementation>(false);
const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
- currentOp->injectMongoProcessInterface(mongod);
ASSERT_THROWS_CODE(currentOp->getNext(), AssertionException, 40465);
}
@@ -239,10 +241,10 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInS
getExpCtx()->fromMongos = true;
std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")};
- const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops);
+ getExpCtx()->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterfaceImplementation>(ops);
const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx());
- currentOp->injectMongoProcessInterface(mongod);
ASSERT_THROWS_CODE(currentOp->getNext(), AssertionException, ErrorCodes::TypeMismatch);
}
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 1fa0ae1fc2a..050ffa9f331 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -57,7 +57,7 @@ using std::vector;
DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,
const intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceNeedsMongoProcessInterface(expCtx),
+ : DocumentSource(expCtx),
_teeBuffer(TeeBuffer::create(facetPipelines.size())),
_facets(std::move(facetPipelines)) {
for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
@@ -215,25 +215,13 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() {
return this;
}
-void DocumentSourceFacet::doInjectMongoProcessInterface(
- std::shared_ptr<MongoProcessInterface> pipelineContext) {
- for (auto&& facet : _facets) {
- for (auto&& stage : facet.pipeline->getSources()) {
- if (auto stageNeedingMongoProcessInterface =
- dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(stage.get())) {
- stageNeedingMongoProcessInterface->injectMongoProcessInterface(pipelineContext);
- }
- }
- }
-}
-
-void DocumentSourceFacet::doDetachFromOperationContext() {
+void DocumentSourceFacet::detachFromOperationContext() {
for (auto&& facet : _facets) {
facet.pipeline->detachFromOperationContext();
}
}
-void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) {
+void DocumentSourceFacet::reattachToOperationContext(OperationContext* opCtx) {
for (auto&& facet : _facets) {
facet.pipeline->reattachToOperationContext(opCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index ff7c12f4ae1..32248772188 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -55,15 +55,14 @@ class NamespaceString;
* stage which will produce a document like the following:
* {facetA: [<all input documents except the first one>], facetB: [<the first document>]}.
*/
-class DocumentSourceFacet final : public DocumentSourceNeedsMongoProcessInterface,
- public SplittableDocumentSource {
+class DocumentSourceFacet final : public DocumentSource, public SplittableDocumentSource {
public:
struct FacetPipeline {
- FacetPipeline(std::string name, std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline)
+ FacetPipeline(std::string name, std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
: name(std::move(name)), pipeline(std::move(pipeline)) {}
std::string name;
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline;
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
};
class LiteParsed : public LiteParsedDocumentSource {
@@ -136,9 +135,8 @@ public:
// The following are overridden just to forward calls to sub-pipelines.
void addInvolvedCollections(std::vector<NamespaceString>* collections) const final;
- void doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface>) final;
- void doDetachFromOperationContext() final;
- void doReattachToOperationContext(OperationContext* opCtx) final;
+ void detachFromOperationContext() final;
+ void reattachToOperationContext(OperationContext* opCtx) final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
protected:
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 1ed1f4a779c..c706a5ca31e 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -463,8 +463,22 @@ TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) {
ASSERT_TRUE(dummy->isOptimized);
}
-TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) {
+/**
+ * An implementation of the MongoProcessInterface that is okay with changing the OperationContext,
+ * but has no other parts of the interface implemented.
+ */
+class StubMongoProcessOkWithOpCtxChanges : public StubMongoProcessInterface {
+public:
+ void setOperationContext(OperationContext* opCtx) final {
+ return;
+ }
+};
+
+TEST_F(DocumentSourceFacetTest, ShouldPropagateDetachingAndReattachingOfOpCtx) {
auto ctx = getExpCtx();
+ // We're going to be changing the OperationContext, so we need to use a MongoProcessInterface
+ // that won't throw when we do so.
+ ctx->mongoProcessInterface = stdx::make_unique<StubMongoProcessOkWithOpCtxChanges>();
auto firstDummy = DocumentSourcePassthrough::create();
auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({firstDummy}, ctx));
@@ -480,12 +494,12 @@ TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) {
// Test detaching.
ASSERT_FALSE(firstDummy->isDetachedFromOpCtx);
ASSERT_FALSE(secondDummy->isDetachedFromOpCtx);
- facetStage->doDetachFromOperationContext();
+ facetStage->detachFromOperationContext();
ASSERT_TRUE(firstDummy->isDetachedFromOpCtx);
ASSERT_TRUE(secondDummy->isDetachedFromOpCtx);
// Test reattaching.
- facetStage->doReattachToOperationContext(ctx->opCtx);
+ facetStage->reattachToOperationContext(ctx->opCtx);
ASSERT_FALSE(firstDummy->isDetachedFromOpCtx);
ASSERT_FALSE(secondDummy->isDetachedFromOpCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index 4b81ff86bdf..39a31253573 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -185,7 +185,7 @@ BSONObj DocumentSourceGeoNear::buildGeoNearCmd() const {
void DocumentSourceGeoNear::runCommand() {
massert(16603, "Already ran geoNearCommand", !resultsIterator);
- bool ok = _mongoProcessInterface->directClient()->runCommand(
+ bool ok = pExpCtx->mongoProcessInterface->directClient()->runCommand(
pExpCtx->ns.db().toString(), buildGeoNearCmd(), cmdOutput);
if (!ok) {
uassertStatusOK(getStatusFromCommandResult(cmdOutput));
@@ -275,7 +275,7 @@ void DocumentSourceGeoNear::parseOptions(BSONObj options) {
}
DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
+ : DocumentSource(pExpCtx),
coordsIsArray(false),
limit(DocumentSourceGeoNear::kDefaultLimit),
maxDistance(-1.0),
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 45eda260dba..376e4a03ea0 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -33,8 +33,7 @@
namespace mongo {
-class DocumentSourceGeoNear : public DocumentSourceNeedsMongoProcessInterface,
- public SplittableDocumentSource {
+class DocumentSourceGeoNear : public DocumentSource, public SplittableDocumentSource {
public:
static const long long kDefaultLimit;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index a600097578c..07fac74d963 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -205,8 +205,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
// We've already allocated space for the trailing $match stage in '_fromPipeline'.
_fromPipeline.back() = *matchStage;
- auto pipeline =
- uassertStatusOK(_mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx));
+ auto pipeline = uassertStatusOK(
+ pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx));
while (auto next = pipeline->getNext()) {
uassert(40271,
str::stream()
@@ -436,11 +436,11 @@ void DocumentSourceGraphLookUp::serializeToArray(
}
}
-void DocumentSourceGraphLookUp::doDetachFromOperationContext() {
+void DocumentSourceGraphLookUp::detachFromOperationContext() {
_fromExpCtx->opCtx = nullptr;
}
-void DocumentSourceGraphLookUp::doReattachToOperationContext(OperationContext* opCtx) {
+void DocumentSourceGraphLookUp::reattachToOperationContext(OperationContext* opCtx) {
_fromExpCtx->opCtx = opCtx;
}
@@ -455,7 +455,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
boost::optional<FieldPath> depthField,
boost::optional<long long> maxDepth,
boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc)
- : DocumentSourceNeedsMongoProcessInterface(expCtx),
+ : DocumentSource(expCtx),
_from(std::move(from)),
_as(std::move(as)),
_connectFromField(std::move(connectFromField)),
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index d91c1675a81..9b5882e3f52 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -36,7 +36,7 @@
namespace mongo {
-class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceGraphLookUp final : public DocumentSource {
public:
static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse(
const AggregationRequest& request, const BSONElement& spec);
@@ -73,9 +73,9 @@ public:
collections->push_back(_from);
}
- void doDetachFromOperationContext() final;
+ void detachFromOperationContext() final;
- void doReattachToOperationContext(OperationContext* opCtx) final;
+ void reattachToOperationContext(OperationContext* opCtx) final;
static boost::intrusive_ptr<DocumentSourceGraphLookUp> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index c0037a3caaf..6dc9849baff 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -56,12 +56,12 @@ using DocumentSourceGraphLookUpTest = AggregationContextFixture;
* A MongoProcessInterface use for testing that supports making pipelines with an initial
* DocumentSourceMock source.
*/
-class MockMongoProcessInterfaceImplementation final : public StubMongoProcessInterface {
+class MockMongoProcess final : public StubMongoProcessInterface {
public:
- MockMongoProcessInterfaceImplementation(std::deque<DocumentSource::GetNextResult> results)
+ MockMongoProcess(std::deque<DocumentSource::GetNextResult> results)
: _results(std::move(results)) {}
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
@@ -82,7 +82,7 @@ public:
}
Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) override {
+ Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_results));
return Status::OK();
}
@@ -94,7 +94,6 @@ private:
TEST_F(DocumentSourceGraphLookUpTest,
ShouldErrorWhenDoingInitialMatchIfDocumentInFromCollectionIsMissingId) {
auto expCtx = getExpCtx();
-
std::deque<DocumentSource::GetNextResult> inputs{Document{{"_id", 0}}};
auto inputMock = DocumentSourceMock::create(std::move(inputs));
@@ -102,6 +101,7 @@ TEST_F(DocumentSourceGraphLookUpTest,
NamespaceString fromNs("test", "graph_lookup");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
fromNs,
@@ -114,9 +114,6 @@ TEST_F(DocumentSourceGraphLookUpTest,
boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
-
ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271);
}
@@ -132,6 +129,7 @@ TEST_F(DocumentSourceGraphLookUpTest,
NamespaceString fromNs("test", "graph_lookup");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
fromNs,
@@ -144,8 +142,6 @@ TEST_F(DocumentSourceGraphLookUpTest,
boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271);
}
@@ -161,6 +157,7 @@ TEST_F(DocumentSourceGraphLookUpTest,
NamespaceString fromNs("test", "graph_lookup");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none);
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
@@ -173,8 +170,6 @@ TEST_F(DocumentSourceGraphLookUpTest,
boost::none,
boost::none,
unwindStage);
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
graphLookupStage->setSource(inputMock.get());
ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271);
@@ -205,6 +200,7 @@ TEST_F(DocumentSourceGraphLookUpTest,
NamespaceString fromNs("test", "graph_lookup");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
fromNs,
@@ -217,8 +213,6 @@ TEST_F(DocumentSourceGraphLookUpTest,
boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
graphLookupStage->setSource(inputMock.get());
auto next = graphLookupStage->getNext();
@@ -270,6 +264,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePauses) {
NamespaceString fromNs("test", "foreign");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
fromNs,
@@ -284,9 +279,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePauses) {
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
-
auto next = graphLookupStage->getNext();
ASSERT_TRUE(next.isAdvanced());
@@ -338,6 +330,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePausesWhileUnwinding) {
NamespaceString fromNs("test", "foreign");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
const bool preserveNullAndEmptyArrays = false;
const boost::optional<std::string> includeArrayIndex = boost::none;
@@ -358,9 +351,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePausesWhileUnwinding) {
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
-
// Assert it has the expected results. Note the results can be in either order.
auto expectedA =
Document{{"startPoint", 0}, {"results", Document{{"_id", "a"_sd}, {"to", 0}, {"from", 1}}}};
@@ -403,6 +393,8 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupShouldReportAsFieldIsModified)
auto expCtx = getExpCtx();
NamespaceString fromNs("test", "foreign");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcess>(std::deque<DocumentSource::GetNextResult>{});
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
fromNs,
@@ -425,6 +417,8 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupShouldReportFieldsModifiedByAbs
auto expCtx = getExpCtx();
NamespaceString fromNs("test", "foreign");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcess>(std::deque<DocumentSource::GetNextResult>{});
auto unwindStage =
DocumentSourceUnwind::create(expCtx, "results", false, std::string("arrIndex"));
auto graphLookupStage =
@@ -455,6 +449,7 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupWithComparisonExpressionForStar
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
std::deque<DocumentSource::GetNextResult> fromContents{Document{{"_id", 0}, {"to", true}},
Document{{"_id", 1}, {"to", false}}};
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto graphLookupStage = DocumentSourceGraphLookUp::create(
expCtx,
@@ -472,8 +467,6 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupWithComparisonExpressionForStar
boost::none);
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
auto next = graphLookupStage->getNext();
ASSERT_TRUE(next.isAdvanced());
@@ -519,6 +512,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldExpandArraysAtEndOfConnectFromField)
NamespaceString fromNs("test", "graph_lookup");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
fromNs,
@@ -531,8 +525,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldExpandArraysAtEndOfConnectFromField)
boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
graphLookupStage->setSource(inputMock.get());
auto next = graphLookupStage->getNext();
@@ -592,6 +584,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo
NamespaceString fromNs("test", "graph_lookup");
expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents));
auto graphLookupStage =
DocumentSourceGraphLookUp::create(expCtx,
fromNs,
@@ -604,8 +597,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo
boost::none,
boost::none);
graphLookupStage->setSource(inputMock.get());
- graphLookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents)));
graphLookupStage->setSource(inputMock.get());
auto next = graphLookupStage->getNext();
diff --git a/src/mongo/db/pipeline/document_source_index_stats.cpp b/src/mongo/db/pipeline/document_source_index_stats.cpp
index 5e12f7c72e8..dfb69786d18 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_index_stats.cpp
@@ -50,7 +50,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() {
pExpCtx->checkForInterrupt();
if (_indexStatsMap.empty()) {
- _indexStatsMap = _mongoProcessInterface->getIndexStats(pExpCtx->opCtx, pExpCtx->ns);
+ _indexStatsMap = pExpCtx->mongoProcessInterface->getIndexStats(pExpCtx->opCtx, pExpCtx->ns);
_indexStatsIter = _indexStatsMap.begin();
}
@@ -70,7 +70,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() {
}
DocumentSourceIndexStats::DocumentSourceIndexStats(const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx), _processName(getHostNameCachedAndPort()) {}
+ : DocumentSource(pExpCtx), _processName(getHostNameCachedAndPort()) {}
intrusive_ptr<DocumentSource> DocumentSourceIndexStats::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 592f7f02a0d..b485db6d748 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -37,7 +37,7 @@ namespace mongo {
* Provides a document source interface to retrieve index statistics for a given namespace.
* Each document returned represents a single index and mongod instance.
*/
-class DocumentSourceIndexStats final : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceIndexStats final : public DocumentSource {
public:
class LiteParsed final : public LiteParsedDocumentSource {
public:
diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp
index 084cf03a264..0bafd966e71 100644
--- a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp
@@ -73,5 +73,5 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalCursors::createFromB
DocumentSourceListLocalCursors::DocumentSourceListLocalCursors(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx) {}
+ : DocumentSource(pExpCtx), _cursors(pExpCtx->mongoProcessInterface->getCursors(pExpCtx)) {}
}
diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.h b/src/mongo/db/pipeline/document_source_list_local_cursors.h
index a349057fd86..4746c4933e7 100644
--- a/src/mongo/db/pipeline/document_source_list_local_cursors.h
+++ b/src/mongo/db/pipeline/document_source_list_local_cursors.h
@@ -44,7 +44,7 @@ namespace mongo {
* as true, and returns just sessions for the currently logged in user if
* 'allUsers' is specified as false, or not specified at all.
*/
-class DocumentSourceListLocalCursors final : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceListLocalCursors final : public DocumentSource {
public:
static const char* kStageName;
@@ -94,10 +94,6 @@ public:
return constraints;
}
- void doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongo) override {
- _cursors = mongo->getCursors(pExpCtx);
- }
-
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index bf8a890924f..e4fd70920f6 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -70,7 +70,7 @@ constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth;
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
+ : DocumentSource(pExpCtx),
_fromNs(std::move(fromNs)),
_as(std::move(as)),
_variables(pExpCtx->variables),
@@ -249,7 +249,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
return output.freeze();
}
-std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline(
+std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
const Document& inputDoc) {
// Copy all 'let' variables into the foreign pipeline's expression context.
copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
@@ -260,21 +260,18 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline
// If we don't have a cache, build and return the pipeline immediately.
if (!_cache || _cache->isAbandoned()) {
return uassertStatusOK(
- _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
+ pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
}
// Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
- // cursor source. If the cache is present and serving, then we will not be adding a cursor
- // source later, so inject a MongoProcessInterface into all stages that need one.
+ // cursor source.
MongoProcessInterface::MakePipelineOptions pipelineOpts;
-
pipelineOpts.optimize = false;
pipelineOpts.attachCursorSource = false;
- pipelineOpts.forceInjectMongoProcessInterface = _cache->isServing();
// Construct the basic pipeline without a cache stage.
auto pipeline = uassertStatusOK(
- _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
+ pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
// Add the cache stage at the end and optimize. During the optimization process, the cache will
// either move itself to the correct position in the pipeline, or will abandon itself if no
@@ -286,8 +283,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
- uassertStatusOK(
- _mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get()));
+ uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ _fromExpCtx, pipeline.get()));
}
// If the cache has been abandoned, release it.
@@ -712,7 +709,7 @@ DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker*
return SEE_NEXT;
}
-void DocumentSourceLookUp::doDetachFromOperationContext() {
+void DocumentSourceLookUp::detachFromOperationContext() {
if (_pipeline) {
// We have a pipeline we're going to be executing across multiple calls to getNext(), so we
// use Pipeline::detachFromOperationContext() to take care of updating '_fromExpCtx->opCtx'.
@@ -723,7 +720,7 @@ void DocumentSourceLookUp::doDetachFromOperationContext() {
}
}
-void DocumentSourceLookUp::doReattachToOperationContext(OperationContext* opCtx) {
+void DocumentSourceLookUp::reattachToOperationContext(OperationContext* opCtx) {
if (_pipeline) {
// We have a pipeline we're going to be executing across multiple calls to getNext(), so we
// use Pipeline::reattachToOperationContext() to take care of updating '_fromExpCtx->opCtx'.
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 4662b606aed..530c62f985c 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -45,8 +45,7 @@ namespace mongo {
* Queries separate collection for equality matches with documents in the pipeline collection.
* Adds matching documents to a new array field in the input document.
*/
-class DocumentSourceLookUp final : public DocumentSourceNeedsMongoProcessInterface,
- public SplittableDocumentSource {
+class DocumentSourceLookUp final : public DocumentSource, public SplittableDocumentSource {
public:
static constexpr size_t kMaxSubPipelineDepth = 20;
@@ -135,9 +134,9 @@ public:
collections->push_back(_fromNs);
}
- void doDetachFromOperationContext() final;
+ void detachFromOperationContext() final;
- void doReattachToOperationContext(OperationContext* opCtx) final;
+ void reattachToOperationContext(OperationContext* opCtx) final;
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
@@ -183,7 +182,7 @@ public:
return _variablesParseState;
}
- std::unique_ptr<Pipeline, Pipeline::Deleter> getSubPipeline_forTest(const Document& inputDoc) {
+ std::unique_ptr<Pipeline, PipelineDeleter> getSubPipeline_forTest(const Document& inputDoc) {
return buildPipeline(inputDoc);
}
@@ -267,7 +266,7 @@ private:
* Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
* cursor and/or cache source as appropriate.
*/
- std::unique_ptr<Pipeline, Pipeline::Deleter> buildPipeline(const Document& inputDoc);
+ std::unique_ptr<Pipeline, PipelineDeleter> buildPipeline(const Document& inputDoc);
/**
* The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that
@@ -319,7 +318,7 @@ private:
std::vector<BSONObj> _userPipeline;
// A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective
// functions. If sub-$lookup stages are present, their pipelines are constructed recursively.
- std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline;
+ std::unique_ptr<Pipeline, PipelineDeleter> _parsedIntrospectionPipeline;
std::vector<LetVariable> _letVariables;
@@ -329,7 +328,7 @@ private:
// The following members are used to hold onto state across getNext() calls when '_unwindSrc' is
// not null.
long long _cursorIndex = 0;
- std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline;
+ std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
boost::optional<Document> _input;
boost::optional<Document> _nextValue;
};
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index b616b5ec659..89af387cf42 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -112,8 +112,8 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat
<< resumeToken.getData().clusterTime))
: boost::none;
invariant(resumeToken.getData().uuid);
- auto lookedUpDoc = _mongoProcessInterface->lookupSingleDocument(
- nss, *resumeToken.getData().uuid, documentKey, readConcern);
+ auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument(
+ pExpCtx, nss, *resumeToken.getData().uuid, documentKey, readConcern);
// Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may
// not have returned any results if the document was deleted in the time since the update op.
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 2a4ccc972fb..416088fb742 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -40,7 +40,7 @@ namespace mongo {
* Uses the ExpressionContext to determine what collection to look up into.
* TODO SERVER-29134 When we allow change streams on multiple collections, this will need to change.
*/
-class DocumentSourceLookupChangePostImage final : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceLookupChangePostImage final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalLookupChangePostImage"_sd;
static constexpr StringData kFullDocumentFieldName =
@@ -106,7 +106,7 @@ public:
private:
DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceNeedsMongoProcessInterface(expCtx) {}
+ : DocumentSource(expCtx) {}
/**
* Uses the "documentKey" field from 'updateOp' to look up the current version of the document.
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 1d003c9c09e..ffcfb6fb904 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -83,11 +83,11 @@ public:
MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final {
@@ -113,11 +113,12 @@ public:
return Status::OK();
}
- boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) {
- boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
auto swPipeline = makePipeline({BSON("$match" << documentKey)}, expCtx);
if (swPipeline == ErrorCodes::NamespaceNotFound) {
return boost::none;
@@ -158,8 +159,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO
lookupChangeStage->setSource(mockLocalSource.get());
// Mock out the foreign collection.
- lookupChangeStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
@@ -180,8 +181,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp
lookupChangeStage->setSource(mockLocalSource.get());
// Mock out the foreign collection.
- lookupChangeStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
@@ -202,8 +203,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) {
lookupChangeStage->setSource(mockLocalSource.get());
// Mock out the foreign collection.
- lookupChangeStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
@@ -224,8 +225,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType
lookupChangeStage->setSource(mockLocalSource.get());
// Mock out the foreign collection.
- lookupChangeStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
@@ -246,8 +247,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch
lookupChangeStage->setSource(mockLocalSource.get());
// Mock out the foreign collection.
- lookupChangeStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{});
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579);
}
@@ -270,8 +271,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni
// Mock out the foreign collection to have two documents with the same document key.
deque<DocumentSource::GetNextResult> foreignCollection = {Document{{"_id", 0}},
Document{{"_id", 0}}};
- lookupChangeStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(std::move(foreignCollection)));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(std::move(foreignCollection));
ASSERT_THROWS_CODE(
lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments);
@@ -302,8 +303,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
// Mock out the foreign collection.
deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
Document{{"_id", 1}}};
- lookupChangeStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents)));
+ getExpCtx()->mongoProcessInterface =
+ stdx::make_unique<MockMongoProcessInterface>(std::move(mockForeignContents));
auto next = lookupChangeStage->getNext();
ASSERT_TRUE(next.isAdvanced());
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 587411e15f2..ea96b7f9353 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -493,11 +493,11 @@ public:
: _mockResults(std::move(mockResults)),
_removeLeadingQueryStages(removeLeadingQueryStages) {}
- bool isSharded(const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
@@ -564,8 +564,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) {
// Mock out the foreign collection.
deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
Document{{"_id", 1}}};
- lookup->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents)));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents));
auto next = lookup->getNext();
ASSERT_TRUE(next.isAdvanced());
@@ -619,8 +619,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) {
// Mock out the foreign collection.
deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
Document{{"_id", 1}}};
- lookup->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents)));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents));
auto next = lookup->getNext();
ASSERT_TRUE(next.isAdvanced());
@@ -710,8 +710,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) {
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- lookupStage->injectMongoProcessInterface(
- std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({})));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{});
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -744,8 +744,8 @@ TEST_F(DocumentSourceLookUpTest,
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- lookupStage->injectMongoProcessInterface(
- std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({})));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{});
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -774,8 +774,8 @@ TEST_F(DocumentSourceLookUpTest, ExprEmbeddedInMatchExpressionShouldBeOptimized)
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- lookupStage->injectMongoProcessInterface(
- std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({})));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{});
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -815,8 +815,8 @@ TEST_F(DocumentSourceLookUpTest,
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- lookupStage->injectMongoProcessInterface(
- std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({})));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{});
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -850,8 +850,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup)
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- lookupStage->injectMongoProcessInterface(
- std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({})));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{});
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -886,8 +886,8 @@ TEST_F(DocumentSourceLookUpTest,
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- lookupStage->injectMongoProcessInterface(
- std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({})));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{});
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -917,8 +917,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheEntirePipelineIfNonCorrelated) {
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- lookupStage->injectMongoProcessInterface(
- std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({})));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{});
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -959,8 +959,8 @@ TEST_F(DocumentSourceLookUpTest,
deque<DocumentSource::GetNextResult> mockForeignContents{
Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}};
- lookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockForeignContents));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(mockForeignContents);
// Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields.
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
@@ -1036,8 +1036,8 @@ TEST_F(DocumentSourceLookUpTest,
deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}},
Document{{"x", 1}}};
- lookupStage->injectMongoProcessInterface(
- std::make_shared<MockMongoProcessInterface>(mockForeignContents));
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoProcessInterface>(mockForeignContents);
// Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields.
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
@@ -1093,8 +1093,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl
const bool removeLeadingQueryStages = true;
- lookupStage->injectMongoProcessInterface(std::shared_ptr<MockMongoProcessInterface>(
- new MockMongoProcessInterface({}, removeLeadingQueryStages)));
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(
+ std::deque<DocumentSource::GetNextResult>{}, removeLeadingQueryStages);
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
ASSERT(subPipeline);
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 08dde0b4dc0..50678f82143 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -44,8 +44,9 @@ DocumentSourceOut::~DocumentSourceOut() {
// Make sure we drop the temp collection if anything goes wrong. Errors are ignored
// here because nothing can be done about them. Additionally, if this fails and the
// collection is left behind, it will be cleaned up next time the server is started.
- if (_mongoProcessInterface && _tempNs.size()) _mongoProcessInterface->directClient()
- ->dropCollection(_tempNs.ns());)
+ if (_tempNs.size()) {
+ pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns());
+ });
}
std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceOut::liteParse(
@@ -80,12 +81,11 @@ const char* DocumentSourceOut::getSourceName() const {
static AtomicUInt32 aggOutCounter;
void DocumentSourceOut::initialize() {
- invariant(_mongoProcessInterface);
- DBClientBase* conn = _mongoProcessInterface->directClient();
+ DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient();
// Save the original collection options and index specs so we can check they didn't change
// during computation.
- _originalOutOptions = _mongoProcessInterface->getCollectionOptions(_outputNs);
+ _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(_outputNs);
_originalIndexes = conn->getIndexSpecs(_outputNs.ns());
// Check if it's sharded or capped to make sure we have a chance of succeeding before we do all
@@ -95,7 +95,7 @@ void DocumentSourceOut::initialize() {
uassert(17017,
str::stream() << "namespace '" << _outputNs.ns()
<< "' is sharded so it can't be used for $out'",
- !_mongoProcessInterface->isSharded(_outputNs));
+ !pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs));
uassert(17152,
str::stream() << "namespace '" << _outputNs.ns()
<< "' is capped so it can't be used for $out",
@@ -145,7 +145,7 @@ void DocumentSourceOut::initialize() {
}
void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) {
- BSONObj err = _mongoProcessInterface->insert(_tempNs, toInsert);
+ BSONObj err = pExpCtx->mongoProcessInterface->insert(pExpCtx, _tempNs, toInsert);
uassert(16996,
str::stream() << "insert for $out failed: " << err,
DBClientBase::getLastErrorString(err).empty());
@@ -195,8 +195,8 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
BSON("renameCollection" << _tempNs.ns() << "to" << _outputNs.ns() << "dropTarget"
<< true);
- auto status = _mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged(
- renameCommandObj, _outputNs, _originalOutOptions, _originalIndexes);
+ auto status = pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged(
+ pExpCtx->opCtx, renameCommandObj, _outputNs, _originalOutOptions, _originalIndexes);
uassert(16997, str::stream() << "$out failed: " << status.reason(), status.isOK());
// We don't need to drop the temp collection in our destructor if the rename succeeded.
@@ -212,7 +212,7 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
+ : DocumentSource(pExpCtx),
_done(false),
_tempNs(""), // Filled in during getNext().
_outputNs(outputNs) {}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index a9d824a1731..26e49e9f54c 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -32,8 +32,7 @@
namespace mongo {
-class DocumentSourceOut final : public DocumentSourceNeedsMongoProcessInterface,
- public SplittableDocumentSource {
+class DocumentSourceOut final : public DocumentSource, public SplittableDocumentSource {
public:
static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse(
const AggregationRequest& request, const BSONElement& spec);
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index f48f5897add..136147c2d31 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -46,7 +46,7 @@ DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransfor
const intrusive_ptr<ExpressionContext>& pExpCtx,
std::unique_ptr<TransformerInterface> parsedTransform,
std::string name)
- : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
+ : DocumentSource(pExpCtx),
_parsedTransform(std::move(parsedTransform)),
_name(std::move(name)) {}
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index d2f280f41d5..6b18e8cd74d 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -38,8 +38,7 @@ namespace mongo {
* a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be
* created from BSON.
*/
-class DocumentSourceSingleDocumentTransformation final
- : public DocumentSourceNeedsMongoProcessInterface {
+class DocumentSourceSingleDocumentTransformation final : public DocumentSource {
public:
/**
* This class defines the minimal interface that every parser wishing to take advantage of
@@ -88,14 +87,7 @@ public:
return false;
}
- protected:
- MongoProcessInterface* _mongoProcess{nullptr};
-
private:
- void injectMongoProcess(MongoProcessInterface* p) {
- _mongoProcess = p;
- }
-
friend class DocumentSourceSingleDocumentTransformation;
};
@@ -112,12 +104,6 @@ public:
DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
GetModPathsReturn getModifiedPaths() const final;
- void doInjectMongoProcessInterface(
- std::shared_ptr<MongoProcessInterface> mongoProcessInterface) override {
-
- _parsedTransform->injectMongoProcess(mongoProcessInterface.get());
- }
-
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
StageConstraints constraints(
StreamType::kStreaming,
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 799a8b14fcb..361542fd2db 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/stub_mongo_process_interface.h"
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
@@ -43,6 +44,7 @@ ExpressionContext::ResolvedNamespace::ResolvedNamespace(NamespaceString ns,
ExpressionContext::ExpressionContext(OperationContext* opCtx,
const AggregationRequest& request,
std::unique_ptr<CollatorInterface> collator,
+ std::shared_ptr<MongoProcessInterface> processInterface,
StringMap<ResolvedNamespace> resolvedNamespaces)
: ExpressionContext(opCtx, collator.get()) {
explain = request.getExplain();
@@ -52,12 +54,15 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
allowDiskUse = request.shouldAllowDiskUse();
bypassDocumentValidation = request.shouldBypassDocumentValidation();
ns = request.getNamespaceString();
+ mongoProcessInterface = std::move(processInterface);
collation = request.getCollation();
_ownedCollator = std::move(collator);
_resolvedNamespaces = std::move(resolvedNamespaces);
}
+
ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInterface* collator)
: opCtx(opCtx),
+ mongoProcessInterface(std::make_shared<StubMongoProcessInterface>()),
timeZoneDatabase(opCtx && opCtx->getServiceContext()
? TimeZoneDatabase::get(opCtx->getServiceContext())
: nullptr),
@@ -66,6 +71,14 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInte
_documentComparator(_collator),
_valueComparator(_collator) {}
+ExpressionContext::ExpressionContext(NamespaceString nss,
+ std::shared_ptr<MongoProcessInterface> processInterface,
+ const TimeZoneDatabase* tzDb)
+ : ns(std::move(nss)),
+ mongoProcessInterface(std::move(processInterface)),
+ timeZoneDatabase(tzDb),
+ variablesParseState(variables.useIdGenerator()) {}
+
void ExpressionContext::checkForInterrupt() {
// This check could be expensive, at least in relative terms, so don't check every time.
if (--_interruptCounter == 0) {
@@ -127,7 +140,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
boost::optional<UUID> uuid,
boost::optional<std::unique_ptr<CollatorInterface>> collator) const {
intrusive_ptr<ExpressionContext> expCtx =
- new ExpressionContext(std::move(ns), timeZoneDatabase);
+ new ExpressionContext(std::move(ns), mongoProcessInterface, timeZoneDatabase);
expCtx->uuid = std::move(uuid);
expCtx->explain = explain;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 3df31c97ddc..2bafa8253db 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -39,6 +39,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_comparator.h"
+#include "mongo/db/pipeline/mongo_process_interface.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/pipeline/variables.h"
#include "mongo/db/query/collation/collator_interface.h"
@@ -100,6 +101,7 @@ public:
ExpressionContext(OperationContext* opCtx,
const AggregationRequest& request,
std::unique_ptr<CollatorInterface> collator,
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface,
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces);
/**
@@ -180,6 +182,12 @@ public:
OperationContext* opCtx;
+ // An interface for accessing information or performing operations that have different
+ // implementations on mongod and mongos, or that only make sense on one of the two.
+ // Additionally, putting some of this functionality behind an interface prevents aggregation
+ // libraries from having large numbers of dependencies. This pointer is always non-null.
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface;
+
const TimeZoneDatabase* timeZoneDatabase;
// Collation requested by the user for this pipeline. Empty if the user did not request a
@@ -197,10 +205,9 @@ public:
protected:
static const int kInterruptCheckPeriod = 128;
- ExpressionContext(NamespaceString nss, const TimeZoneDatabase* tzDb)
- : ns(std::move(nss)),
- timeZoneDatabase(tzDb),
- variablesParseState(variables.useIdGenerator()) {}
+ ExpressionContext(NamespaceString nss,
+ std::shared_ptr<MongoProcessInterface>,
+ const TimeZoneDatabase* tzDb);
/**
* Sets '_ownedCollator' and resets '_collator', 'documentComparator' and 'valueComparator'.
diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h
index 2747862f52b..db7bbed9599 100644
--- a/src/mongo/db/pipeline/expression_context_for_test.h
+++ b/src/mongo/db/pipeline/expression_context_for_test.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/stub_mongo_process_interface.h"
#include "mongo/db/query/datetime/date_time_support.h"
#include "mongo/db/query/query_test_service_context.h"
@@ -48,7 +49,8 @@ public:
: ExpressionContextForTest(NamespaceString{"test"_sd, "namespace"_sd}) {}
ExpressionContextForTest(NamespaceString nss)
- : ExpressionContext(std::move(nss), kNullTimeZoneDatabase),
+ : ExpressionContext(
+ std::move(nss), std::make_shared<StubMongoProcessInterface>(), kNullTimeZoneDatabase),
_testOpCtx(_serviceContext.makeOperationContext()) {
TimeZoneDatabase::set(_serviceContext.getServiceContext(),
stdx::make_unique<TimeZoneDatabase>());
@@ -60,7 +62,8 @@ public:
}
ExpressionContextForTest(OperationContext* opCtx, const AggregationRequest& request)
- : ExpressionContext(opCtx, request, nullptr, {}) {}
+ : ExpressionContext(
+ opCtx, request, nullptr, std::make_shared<StubMongoProcessInterface>(), {}) {}
/**
* Sets the resolved definition for an involved namespace.
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
new file mode 100644
index 00000000000..d32e8276c63
--- /dev/null
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -0,0 +1,210 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/optional.hpp>
+#include <list>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/collection_index_usage_tracker.h"
+#include "mongo/db/generic_cursor.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/field_path.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/explain_options.h"
+
+namespace mongo {
+
+class ExpressionContext;
+class Pipeline;
+class PipelineDeleter;
+
+/**
+ * Any functionality needed by an aggregation stage that is either context specific to a mongod or
+ * mongos process, or is only compiled in to one of those two binaries must be accessed via this
+ * interface. This allows all DocumentSources to be parsed on either mongos or mongod, but only
+ * executable where it makes sense.
+ */
+class MongoProcessInterface {
+public:
+ enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle };
+ enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
+ enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps };
+
+ struct MakePipelineOptions {
+ MakePipelineOptions(){};
+
+ bool optimize = true;
+ bool attachCursorSource = true;
+ };
+
+ virtual ~MongoProcessInterface(){};
+
+ /**
+ * Sets the OperationContext of the DBDirectClient returned by directClient(). This method must
+ * be called after updating the 'opCtx' member of the ExpressionContext associated with the
+ * document source.
+ */
+ virtual void setOperationContext(OperationContext* opCtx) = 0;
+
+ /**
+ * Always returns a DBDirectClient. The return type in the function signature is a DBClientBase*
+ * because DBDirectClient isn't linked into mongos.
+ */
+ virtual DBClientBase* directClient() = 0;
+
+ /**
+ * Note that in some rare cases this could return a false negative but will never return a false
+ * positive. This method will be fixed in the future once it becomes possible to avoid false
+ * negatives.
+ */
+ virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0;
+
+ /**
+ * Inserts 'objs' into 'ns' and returns the "detailed" last error object.
+ */
+ virtual BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) = 0;
+
+ virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) = 0;
+
+ /**
+ * Appends operation latency statistics for collection "nss" to "builder"
+ */
+ virtual void appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const = 0;
+
+ /**
+ * Appends storage statistics for collection "nss" to "builder"
+ */
+ virtual Status appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const = 0;
+
+ /**
+ * Appends the record count for collection "nss" to "builder".
+ */
+ virtual Status appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const = 0;
+
+ /**
+ * Gets the collection options for the collection given by 'nss'.
+ */
+ virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0;
+
+ /**
+ * Performs the given rename command if the collection given by 'targetNs' has the same options
+ * as specified in 'originalCollectionOptions', and has the same indexes as 'originalIndexes'.
+ */
+ virtual Status renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) = 0;
+
+ /**
+ * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the
+ * returned pipeline will depend upon the supplied MakePipelineOptions:
+ * - The boolean opts.optimize determines whether the pipeline will be optimized.
+ * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to
+ * add an initial cursor source.
+ *
+ * This function returns a non-OK status if parsing the pipeline failed.
+ */
+ virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
+
+ /**
+ * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
+ * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
+ * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
+ * the only case where NamespaceNotFound is returned.
+ */
+ virtual Status attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
+
+ /**
+ * Returns a vector of owned BSONObjs, each of which contains details of an in-progress
+ * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report
+ * operations for all authenticated users; otherwise, report only the current user's operations.
+ */
+ virtual std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode) const = 0;
+
+ /**
+ * Returns the name of the local shard if sharding is enabled, or an empty string.
+ */
+ virtual std::string getShardName(OperationContext* opCtx) const = 0;
+
+ /**
+ * Returns the fields of the document key (in order) for the collection given by 'nss' and
+ * 'UUID', including the shard key and _id. If _id is not in the shard key, it is added last.
+ */
+ virtual std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx,
+ const NamespaceString& nss,
+ UUID uuid) const = 0;
+
+ /**
+ * Returns zero or one documents with the document key 'documentKey'. 'documentKey' is treated
+ * as a unique identifier of a document, and may include an _id or all fields from the shard key
+ * and an _id. Throws if more than one match was found. Returns boost::none if no matching
+ * documents were found, including cases where the given namespace does not exist.
+ */
+ virtual boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) = 0;
+
+ /**
+ * Returns a vector of all local cursors.
+ */
+ virtual std::vector<GenericCursor> getCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 6f71c129707..4e12ba4358d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -78,17 +78,17 @@ Pipeline::~Pipeline() {
invariant(_disposed);
}
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parse(
const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false);
}
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseFacetPipeline(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseFacetPipeline(
const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true);
}
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseTopLevelOrFacetPipeline(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOrFacetPipeline(
const std::vector<BSONObj>& rawPipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
const bool isFacetPipeline) {
@@ -103,22 +103,22 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseTopLevel
return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline);
}
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::create(
SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false);
}
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createFacetPipeline(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createFacetPipeline(
SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true);
}
-StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createTopLevelOrFacetPipeline(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelOrFacetPipeline(
SourceContainer stages,
const intrusive_ptr<ExpressionContext>& expCtx,
const bool isFacetPipeline) {
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(std::move(stages), expCtx),
- Pipeline::Deleter(expCtx->opCtx));
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx),
+ PipelineDeleter(expCtx->opCtx));
try {
if (isFacetPipeline) {
pipeline->validateFacetPipeline();
@@ -276,6 +276,7 @@ bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) {
void Pipeline::detachFromOperationContext() {
pCtx->opCtx = nullptr;
+ pCtx->mongoProcessInterface->setOperationContext(nullptr);
for (auto&& source : _sources) {
source->detachFromOperationContext();
@@ -284,6 +285,7 @@ void Pipeline::detachFromOperationContext() {
void Pipeline::reattachToOperationContext(OperationContext* opCtx) {
pCtx->opCtx = opCtx;
+ pCtx->mongoProcessInterface->setOperationContext(opCtx);
for (auto&& source : _sources) {
source->reattachToOperationContext(opCtx);
@@ -307,7 +309,7 @@ void Pipeline::dispose(OperationContext* opCtx) {
}
}
-std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
invariant(!isSplitForShards());
invariant(!isSplitForMerge());
invariant(!_unsplitSources);
@@ -315,8 +317,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
// Create and initialize the shard spec we'll return. We start with an empty pipeline on the
// shards and all work being done in the merger. Optimizations can move operations between
// the pipelines to be more efficient.
- std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipeline(new Pipeline(pCtx),
- Pipeline::Deleter(pCtx->opCtx));
+ std::unique_ptr<Pipeline, PipelineDeleter> shardPipeline(new Pipeline(pCtx),
+ PipelineDeleter(pCtx->opCtx));
// Keep a copy of the original source list in case we need to reset the pipeline from split to
// unsplit later.
@@ -337,7 +339,7 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
}
void Pipeline::unsplitFromSharded(
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard) {
invariant(isSplitForShards());
invariant(!isSplitForMerge());
invariant(pipelineForMergingShard);
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 51cf4c34da0..6aba6d38d66 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -50,6 +50,7 @@ class ExpressionContext;
class DocumentSource;
class CollatorInterface;
class OperationContext;
+class PipelineDeleter;
/**
* A Pipeline object represents a list of DocumentSources and is responsible for optimizing the
@@ -66,46 +67,6 @@ public:
enum class SplitState { kUnsplit, kSplitForShards, kSplitForMerge };
/**
- * This class will ensure a Pipeline is disposed before it is deleted.
- */
- class Deleter {
- public:
- /**
- * Constructs an empty deleter. Useful for creating a
- * unique_ptr<Pipeline, Pipeline::Deleter> without populating it.
- */
- Deleter() {}
-
- explicit Deleter(OperationContext* opCtx) : _opCtx(opCtx) {}
-
- /**
- * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume
- * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If
- * dismissed, a Deleter will not call dispose() when deleting the PlanExecutor.
- */
- void dismissDisposal() {
- _dismissed = true;
- }
-
- /**
- * Calls dispose() on 'pipeline', unless this Deleter has been dismissed.
- */
- void operator()(Pipeline* pipeline) {
- // It is illegal to call this method on a default-constructed Deleter.
- invariant(_opCtx);
- if (!_dismissed) {
- pipeline->dispose(_opCtx);
- }
- delete pipeline;
- }
-
- private:
- OperationContext* _opCtx = nullptr;
-
- bool _dismissed = false;
- };
-
- /**
* List of supported match expression features in a pipeline.
*/
static constexpr MatchExpressionParser::AllowedFeatureSet kAllowedMatcherFeatures =
@@ -122,7 +83,7 @@ public:
* will not be used during execution of the pipeline. Doing so may cause comparisons made during
* parse-time to return the wrong results.
*/
- static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parse(
+ static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parse(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -133,7 +94,7 @@ public:
* optimized, but the caller may convert it to an optimized pipeline by calling
* optimizePipeline().
*/
- static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseFacetPipeline(
+ static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseFacetPipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -143,7 +104,7 @@ public:
* Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage
* is present but is not the last stage.
*/
- static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> create(
+ static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> create(
SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
@@ -152,7 +113,7 @@ public:
* Returns a non-OK status if any stage is invalid. For example, if the pipeline is empty or if
* any stage is an initial source.
*/
- static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createFacetPipeline(
+ static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createFacetPipeline(
SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
@@ -185,7 +146,7 @@ public:
* Must be called before deleting a Pipeline.
*
* There are multiple cleanup scenarios:
- * - This Pipeline will only ever use one OperationContext. In this case the Pipeline::Deleter
+ * - This Pipeline will only ever use one OperationContext. In this case the PipelineDeleter
* will automatically call dispose() before deleting the Pipeline, and the owner need not
* call dispose().
* - This Pipeline may use multiple OperationContexts over its lifetime. In this case it
@@ -199,7 +160,7 @@ public:
* results within mongos. This permanently alters this pipeline for the merging operation, and
* returns a Pipeline object that should be executed on each targeted shard.
*/
- std::unique_ptr<Pipeline, Pipeline::Deleter> splitForSharded();
+ std::unique_ptr<Pipeline, PipelineDeleter> splitForSharded();
/**
* Reassemble a split shard pipeline into its original form. Upon return, this pipeline will
@@ -207,7 +168,7 @@ public:
* returned by a call to splitForSharded(). It is an error to call this on the merge part of the
* pipeline, or on a pipeline that has not been split.
*/
- void unsplitFromSharded(std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard);
+ void unsplitFromSharded(std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard);
/**
* Returns true if this pipeline has not been split.
@@ -334,12 +295,13 @@ private:
};
friend class Optimizations::Sharded;
+ friend class PipelineDeleter;
/**
* Used by both Pipeline::parse() and Pipeline::parseFacetPipeline() to build and validate the
* pipeline.
*/
- static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseTopLevelOrFacetPipeline(
+ static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseTopLevelOrFacetPipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const bool isFacetPipeline);
@@ -348,7 +310,7 @@ private:
* Used by both Pipeline::create() and Pipeline::createFacetPipeline() to build and validate the
* pipeline.
*/
- static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createTopLevelOrFacetPipeline(
+ static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createTopLevelOrFacetPipeline(
SourceContainer sources,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const bool isSubPipeline);
@@ -410,4 +372,45 @@ private:
boost::intrusive_ptr<ExpressionContext> pCtx;
bool _disposed = false;
};
+
+/**
+ * This class will ensure a Pipeline is disposed before it is deleted.
+ */
+class PipelineDeleter {
+public:
+ /**
+ * Constructs an empty deleter. Useful for creating a
+ * unique_ptr<Pipeline, PipelineDeleter> without populating it.
+ */
+ PipelineDeleter() {}
+
+ explicit PipelineDeleter(OperationContext* opCtx) : _opCtx(opCtx) {}
+
+ /**
+ * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume
+ * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If
+ * dismissed, a PipelineDeleter will not call dispose() when deleting the PlanExecutor.
+ */
+ void dismissDisposal() {
+ _dismissed = true;
+ }
+
+ /**
+ * Calls dispose() on 'pipeline', unless this PipelineDeleter has been dismissed.
+ */
+ void operator()(Pipeline* pipeline) {
+ // It is illegal to call this method on a default-constructed PipelineDeleter.
+ invariant(_opCtx);
+ if (!_dismissed) {
+ pipeline->dispose(_opCtx);
+ }
+ delete pipeline;
+ }
+
+private:
+ OperationContext* _opCtx = nullptr;
+
+ bool _dismissed = false;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f19c9aa3abf..2638e7c9a82 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -95,360 +95,6 @@ using std::unique_ptr;
namespace {
-class MongodProcessInterface final
- : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface {
-public:
- MongodProcessInterface(const intrusive_ptr<ExpressionContext>& ctx)
- : _ctx(ctx), _client(ctx->opCtx) {}
-
- void setOperationContext(OperationContext* opCtx) {
- invariant(_ctx->opCtx == opCtx);
- _client.setOpCtx(opCtx);
- }
-
- DBClientBase* directClient() final {
- return &_client;
- }
-
- bool isSharded(const NamespaceString& nss) final {
- AutoGetCollectionForReadCommand autoColl(_ctx->opCtx, nss);
- // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
- // state.
- auto css = CollectionShardingState::get(_ctx->opCtx, nss);
- return bool(css->getMetadata());
- }
-
- BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (_ctx->bypassDocumentValidation)
- maybeDisableValidation.emplace(_ctx->opCtx);
-
- _client.insert(ns.ns(), objs);
- return _client.getLastErrorDetailed();
- }
-
- CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) final {
- AutoGetCollectionForReadCommand autoColl(opCtx, ns);
-
- Collection* collection = autoColl.getCollection();
- if (!collection) {
- LOG(2) << "Collection not found on index stats retrieval: " << ns.ns();
- return CollectionIndexUsageMap();
- }
-
- return collection->infoCache()->getIndexUsageStats();
- }
-
- void appendLatencyStats(const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const final {
- Top::get(_ctx->opCtx->getServiceContext())
- .appendLatencyStats(nss.ns(), includeHistograms, builder);
- }
-
- Status appendStorageStats(const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const final {
- return appendCollectionStorageStats(_ctx->opCtx, nss, param, builder);
- }
-
- Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const final {
- return appendCollectionRecordCount(_ctx->opCtx, nss, builder);
- }
-
- BSONObj getCollectionOptions(const NamespaceString& nss) final {
- const auto infos =
- _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
- }
-
- Status renameIfOptionsAndIndexesHaveNotChanged(
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) final {
- Lock::GlobalWrite globalLock(_ctx->opCtx);
-
- if (SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions !=
- getCollectionOptions(targetNs))) {
- return {ErrorCodes::CommandFailed,
- str::stream() << "collection options of target collection " << targetNs.ns()
- << " changed during processing. Original options: "
- << originalCollectionOptions
- << ", new options: "
- << getCollectionOptions(targetNs)};
- }
-
- auto currentIndexes = _client.getIndexSpecs(targetNs.ns());
- if (originalIndexes.size() != currentIndexes.size() ||
- !std::equal(originalIndexes.begin(),
- originalIndexes.end(),
- currentIndexes.begin(),
- SimpleBSONObjComparator::kInstance.makeEqualTo())) {
- return {ErrorCodes::CommandFailed,
- str::stream() << "indexes of target collection " << targetNs.ns()
- << " changed during processing."};
- }
-
- BSONObj info;
- bool ok = _client.runCommand("admin", renameCommandObj, info);
- return ok ? Status::OK() : Status{ErrorCodes::CommandFailed,
- str::stream() << "renameCollection failed: " << info};
- }
-
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts = MakePipelineOptions{}) final {
- // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
- // than the DocumentSource this MongodProcessInterface is injected into, but both
- // ExpressionContext instances should still have the same OperationContext.
- invariant(_ctx->opCtx == expCtx->opCtx);
-
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
-
- if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
- }
-
- Status cursorStatus = Status::OK();
-
- if (opts.attachCursorSource) {
- cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
- } else if (opts.forceInjectMongoProcessInterface) {
- PipelineD::injectMongodInterface(pipeline.getValue().get());
- }
-
- return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
- }
-
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- invariant(_ctx->opCtx == expCtx->opCtx);
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
-
- boost::optional<AutoGetCollectionForReadCommand> autoColl;
- if (expCtx->uuid) {
- autoColl.emplace(expCtx->opCtx, expCtx->ns.db(), *expCtx->uuid);
- if (autoColl->getCollection() == nullptr) {
- // The UUID doesn't exist anymore.
- return {ErrorCodes::NamespaceNotFound,
- "No namespace with UUID " + expCtx->uuid->toString()};
- }
- } else {
- autoColl.emplace(expCtx->opCtx, expCtx->ns);
- }
-
- // makePipeline() is only called to perform secondary aggregation requests and expects the
- // collection representing the document source to be not-sharded. We confirm sharding state
- // here to avoid taking a collection lock elsewhere for this purpose alone.
- // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor
- // until after we release the lock, leaving room for a collection to be sharded inbetween.
- // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
- // state.
- auto css = CollectionShardingState::get(_ctx->opCtx, expCtx->ns);
- uassert(4567,
- str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded",
- !bool(css->getMetadata()));
-
- PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
- // Optimize again, since there may be additional optimizations that can be done after adding
- // the initial cursor stage.
- pipeline->optimizePipeline();
-
- return Status::OK();
- }
-
- std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
- CurrentOpUserMode userMode,
- CurrentOpTruncateMode truncateMode) const {
- AuthorizationSession* ctxAuth = AuthorizationSession::get(_ctx->opCtx->getClient());
-
- const std::string hostName = getHostNameCachedAndPort();
-
- std::vector<BSONObj> ops;
-
- for (ServiceContext::LockedClientsCursor cursor(
- _ctx->opCtx->getClient()->getServiceContext());
- Client* client = cursor.next();) {
- invariant(client);
-
- stdx::lock_guard<Client> lk(*client);
-
- // If auth is disabled, ignore the allUsers parameter.
- if (ctxAuth->getAuthorizationManager().isAuthEnabled() &&
- userMode == CurrentOpUserMode::kExcludeOthers &&
- !ctxAuth->isCoauthorizedWithClient(client)) {
- continue;
- }
-
- const OperationContext* clientOpCtx = client->getOperationContext();
-
- if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) {
- continue;
- }
-
- BSONObjBuilder infoBuilder;
-
- infoBuilder.append("host", hostName);
-
- client->reportState(infoBuilder);
-
- const auto& clientMetadata =
- ClientMetadataIsMasterState::get(client).getClientMetadata();
-
- if (clientMetadata) {
- auto appName = clientMetadata.get().getApplicationName();
- if (!appName.empty()) {
- infoBuilder.append("appName", appName);
- }
-
- auto clientMetadataDocument = clientMetadata.get().getDocument();
- infoBuilder.append("clientMetadata", clientMetadataDocument);
- }
-
- // Fill out the rest of the BSONObj with opCtx specific details.
- infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx));
- infoBuilder.append(
- "currentOpTime",
- _ctx->opCtx->getServiceContext()->getPreciseClockSource()->now().toString());
-
- if (clientOpCtx) {
- infoBuilder.append("opid", clientOpCtx->getOpID());
- if (clientOpCtx->isKillPending()) {
- infoBuilder.append("killPending", true);
- }
-
- if (clientOpCtx->getLogicalSessionId()) {
- BSONObjBuilder bob(infoBuilder.subobjStart("lsid"));
- clientOpCtx->getLogicalSessionId()->serialize(&bob);
- }
-
- CurOp::get(clientOpCtx)
- ->reportState(&infoBuilder,
- (truncateMode == CurrentOpTruncateMode::kTruncateOps));
-
- Locker::LockerInfo lockerInfo;
- clientOpCtx->lockState()->getLockerInfo(&lockerInfo);
- fillLockerInfo(lockerInfo, infoBuilder);
- }
-
- ops.emplace_back(infoBuilder.obj());
- }
-
- return ops;
- }
-
- std::string getShardName(OperationContext* opCtx) const {
- if (ShardingState::get(opCtx)->enabled()) {
- return ShardingState::get(opCtx)->getShardName();
- }
-
- return std::string();
- }
-
- std::vector<FieldPath> collectDocumentKeyFields(UUID uuid) const final {
- if (!ShardingState::get(_ctx->opCtx)->enabled()) {
- return {"_id"}; // Nothing is sharded.
- }
-
- auto scm = [this]() -> ScopedCollectionMetadata {
- AutoGetCollection autoColl(_ctx->opCtx, _ctx->ns, MODE_IS);
- return CollectionShardingState::get(_ctx->opCtx, _ctx->ns)->getMetadata();
- }();
-
- if (!scm) {
- return {"_id"}; // Collection is not sharded.
- }
-
- uassert(ErrorCodes::InvalidUUID,
- str::stream() << "Collection " << _ctx->ns.ns()
- << " UUID differs from UUID on change stream operations",
- scm->uuidMatches(uuid));
-
- // Unpack the shard key.
- std::vector<FieldPath> result;
- bool gotId = false;
- for (auto& field : scm->getKeyPatternFields()) {
- result.emplace_back(field->dottedField());
- gotId |= (result.back().fullPath() == "_id");
- }
- if (!gotId) { // If not part of the shard key, "_id" comes last.
- result.emplace_back("_id");
- }
- return result;
- }
-
- boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) final {
- invariant(!readConcern); // We don't currently support a read concern on mongod - it's only
- // expected to be necessary on mongos.
- //
- // Be sure to do the lookup using the collection default collation.
- auto foreignExpCtx =
- _ctx->copyWith(nss, collectionUUID, _getCollectionDefaultCollator(nss, collectionUUID));
- auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
- if (swPipeline == ErrorCodes::NamespaceNotFound) {
- return boost::none;
- }
- auto pipeline = uassertStatusOK(std::move(swPipeline));
-
- auto lookedUpDocument = pipeline->getNext();
- if (auto next = pipeline->getNext()) {
- uasserted(ErrorCodes::TooManyMatchingDocuments,
- str::stream() << "found more than one document with document key "
- << documentKey.toString()
- << " ["
- << lookedUpDocument->toString()
- << ", "
- << next->toString()
- << "]");
- }
- return lookedUpDocument;
- }
-
- std::vector<GenericCursor> getCursors(const intrusive_ptr<ExpressionContext>& expCtx) const {
- return CursorManager::getAllCursors(expCtx->opCtx);
- }
-
-private:
- /**
- * Looks up the collection default collator for the collection given by 'collectionUUID'. A
- * collection's default collation is not allowed to change, so we cache the result to allow for
- * quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr' if the
- * collection does not exist or if the collection's default collation is the simple collation.
- */
- std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(const NamespaceString& nss,
- UUID collectionUUID) {
- if (_collatorCache.find(collectionUUID) == _collatorCache.end()) {
- AutoGetCollection autoColl(_ctx->opCtx, nss, collectionUUID, MODE_IS);
- if (!autoColl.getCollection()) {
- // This collection doesn't exist - since we looked up by UUID, it will never exist
- // in the future, so we cache a null pointer as the default collation.
- _collatorCache[collectionUUID] = nullptr;
- } else {
- auto defaultCollator = autoColl.getCollection()->getDefaultCollator();
- // Clone the collator so that we can safely use the pointer if the collection
- // disappears right after we release the lock.
- _collatorCache[collectionUUID] =
- defaultCollator ? defaultCollator->clone() : nullptr;
- }
- }
- return _collatorCache[collectionUUID] ? _collatorCache[collectionUUID]->clone() : nullptr;
- }
-
- intrusive_ptr<ExpressionContext> _ctx;
- DBDirectClient _client;
- std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache;
-};
-
/**
* Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {}
* if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough
@@ -580,16 +226,6 @@ BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) {
}
} // namespace
-void PipelineD::injectMongodInterface(Pipeline* pipeline) {
- for (auto&& source : pipeline->_sources) {
- if (auto needsMongod =
- dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get())) {
- needsMongod->injectMongoProcessInterface(
- std::make_shared<MongodProcessInterface>(pipeline->getContext()));
- }
- }
-}
-
void PipelineD::prepareCursorSource(Collection* collection,
const NamespaceString& nss,
const AggregationRequest* aggRequest,
@@ -599,9 +235,6 @@ void PipelineD::prepareCursorSource(Collection* collection,
// We will be modifying the source vector as we go.
Pipeline::SourceContainer& sources = pipeline->_sources;
- // Inject a MongodProcessInterface to sources that need them.
- injectMongodInterface(pipeline);
-
if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) {
return;
}
@@ -941,4 +574,338 @@ void PipelineD::getPlanSummaryStats(const Pipeline* pPipeline, PlanSummaryStats*
statsOut->hasSortStage = hasSortStage;
}
+PipelineD::MongoDProcessInterface::MongoDProcessInterface(OperationContext* opCtx)
+ : _client(opCtx) {}
+
+void PipelineD::MongoDProcessInterface::setOperationContext(OperationContext* opCtx) {
+ _client.setOpCtx(opCtx);
+}
+
+DBClientBase* PipelineD::MongoDProcessInterface::directClient() {
+ return &_client;
+}
+
+bool PipelineD::MongoDProcessInterface::isSharded(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ AutoGetCollectionForReadCommand autoColl(opCtx, nss);
+ // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
+ // state.
+ auto css = CollectionShardingState::get(opCtx, nss);
+ return bool(css->getMetadata());
+}
+
+BSONObj PipelineD::MongoDProcessInterface::insert(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) {
+ boost::optional<DisableDocumentValidation> maybeDisableValidation;
+ if (expCtx->bypassDocumentValidation)
+ maybeDisableValidation.emplace(expCtx->opCtx);
+
+ _client.insert(ns.ns(), objs);
+ return _client.getLastErrorDetailed();
+}
+
+CollectionIndexUsageMap PipelineD::MongoDProcessInterface::getIndexStats(
+ OperationContext* opCtx, const NamespaceString& ns) {
+ AutoGetCollectionForReadCommand autoColl(opCtx, ns);
+
+ Collection* collection = autoColl.getCollection();
+ if (!collection) {
+ LOG(2) << "Collection not found on index stats retrieval: " << ns.ns();
+ return CollectionIndexUsageMap();
+ }
+
+ return collection->infoCache()->getIndexUsageStats();
+}
+
+void PipelineD::MongoDProcessInterface::appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const {
+ Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder);
+}
+
+Status PipelineD::MongoDProcessInterface::appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const {
+ return appendCollectionStorageStats(opCtx, nss, param, builder);
+}
+
+Status PipelineD::MongoDProcessInterface::appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const {
+ return appendCollectionRecordCount(opCtx, nss, builder);
+}
+
+BSONObj PipelineD::MongoDProcessInterface::getCollectionOptions(const NamespaceString& nss) {
+ const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
+}
+
+Status PipelineD::MongoDProcessInterface::renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) {
+ Lock::GlobalWrite globalLock(opCtx);
+
+ if (SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions !=
+ getCollectionOptions(targetNs))) {
+ return {ErrorCodes::CommandFailed,
+ str::stream() << "collection options of target collection " << targetNs.ns()
+ << " changed during processing. Original options: "
+ << originalCollectionOptions
+ << ", new options: "
+ << getCollectionOptions(targetNs)};
+ }
+
+ auto currentIndexes = _client.getIndexSpecs(targetNs.ns());
+ if (originalIndexes.size() != currentIndexes.size() ||
+ !std::equal(originalIndexes.begin(),
+ originalIndexes.end(),
+ currentIndexes.begin(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo())) {
+ return {ErrorCodes::CommandFailed,
+ str::stream() << "indexes of target collection " << targetNs.ns()
+ << " changed during processing."};
+ }
+
+ BSONObj info;
+ bool ok = _client.runCommand("admin", renameCommandObj, info);
+ return ok ? Status::OK() : Status{ErrorCodes::CommandFailed,
+ str::stream() << "renameCollection failed: " << info};
+}
+
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>>
+PipelineD::MongoDProcessInterface::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) {
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ if (opts.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+
+ Status cursorStatus = Status::OK();
+
+ if (opts.attachCursorSource) {
+ cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+ }
+
+ return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+}
+
+Status PipelineD::MongoDProcessInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
+
+ boost::optional<AutoGetCollectionForReadCommand> autoColl;
+ if (expCtx->uuid) {
+ autoColl.emplace(expCtx->opCtx, expCtx->ns.db(), *expCtx->uuid);
+ if (autoColl->getCollection() == nullptr) {
+ // The UUID doesn't exist anymore.
+ return {ErrorCodes::NamespaceNotFound,
+ "No namespace with UUID " + expCtx->uuid->toString()};
+ }
+ } else {
+ autoColl.emplace(expCtx->opCtx, expCtx->ns);
+ }
+
+ // makePipeline() is only called to perform secondary aggregation requests and expects the
+ // collection representing the document source to be not-sharded. We confirm sharding state
+ // here to avoid taking a collection lock elsewhere for this purpose alone.
+ // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor
+ // until after we release the lock, leaving room for a collection to be sharded inbetween.
+ // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
+ // state.
+ auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns);
+ uassert(4567,
+ str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded",
+ !bool(css->getMetadata()));
+
+ PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
+
+ return Status::OK();
+}
+
+std::vector<BSONObj> PipelineD::MongoDProcessInterface::getCurrentOps(
+ OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode truncateMode) const {
+ AuthorizationSession* ctxAuth = AuthorizationSession::get(opCtx->getClient());
+
+ const std::string hostName = getHostNameCachedAndPort();
+
+ std::vector<BSONObj> ops;
+
+ for (ServiceContext::LockedClientsCursor cursor(opCtx->getClient()->getServiceContext());
+ Client* client = cursor.next();) {
+ invariant(client);
+
+ stdx::lock_guard<Client> lk(*client);
+
+ // If auth is disabled, ignore the allUsers parameter.
+ if (ctxAuth->getAuthorizationManager().isAuthEnabled() &&
+ userMode == CurrentOpUserMode::kExcludeOthers &&
+ !ctxAuth->isCoauthorizedWithClient(client)) {
+ continue;
+ }
+
+ const OperationContext* clientOpCtx = client->getOperationContext();
+
+ if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) {
+ continue;
+ }
+
+ BSONObjBuilder infoBuilder;
+
+ infoBuilder.append("host", hostName);
+
+ client->reportState(infoBuilder);
+ const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata();
+
+ if (clientMetadata) {
+ auto appName = clientMetadata.get().getApplicationName();
+ if (!appName.empty()) {
+ infoBuilder.append("appName", appName);
+ }
+
+ auto clientMetadataDocument = clientMetadata.get().getDocument();
+ infoBuilder.append("clientMetadata", clientMetadataDocument);
+ }
+
+ // Fill out the rest of the BSONObj with opCtx specific details.
+ infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx));
+ infoBuilder.append("currentOpTime",
+ opCtx->getServiceContext()->getPreciseClockSource()->now().toString());
+
+ if (clientOpCtx) {
+ infoBuilder.append("opid", clientOpCtx->getOpID());
+ if (clientOpCtx->isKillPending()) {
+ infoBuilder.append("killPending", true);
+ }
+
+ if (clientOpCtx->getLogicalSessionId()) {
+ BSONObjBuilder bob(infoBuilder.subobjStart("lsid"));
+ clientOpCtx->getLogicalSessionId()->serialize(&bob);
+ }
+
+ CurOp::get(clientOpCtx)
+ ->reportState(&infoBuilder, (truncateMode == CurrentOpTruncateMode::kTruncateOps));
+
+ Locker::LockerInfo lockerInfo;
+ clientOpCtx->lockState()->getLockerInfo(&lockerInfo);
+ fillLockerInfo(lockerInfo, infoBuilder);
+ }
+
+ ops.emplace_back(infoBuilder.obj());
+ }
+
+ return ops;
+}
+
+std::string PipelineD::MongoDProcessInterface::getShardName(OperationContext* opCtx) const {
+ if (ShardingState::get(opCtx)->enabled()) {
+ return ShardingState::get(opCtx)->getShardName();
+ }
+
+ return std::string();
+}
+
+std::vector<FieldPath> PipelineD::MongoDProcessInterface::collectDocumentKeyFields(
+ OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const {
+ if (!ShardingState::get(opCtx)->enabled()) {
+ return {"_id"}; // Nothing is sharded.
+ }
+
+ auto scm = [this, opCtx, &nss]() -> ScopedCollectionMetadata {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ return CollectionShardingState::get(opCtx, nss)->getMetadata();
+ }();
+
+ if (!scm) {
+ return {"_id"}; // Collection is not sharded.
+ }
+
+ uassert(ErrorCodes::StaleConfig,
+ str::stream() << "Collection " << nss.ns()
+ << " UUID differs from UUID on change stream operations",
+ scm->uuidMatches(uuid));
+
+ // Unpack the shard key.
+ std::vector<FieldPath> result;
+ bool gotId = false;
+ for (auto& field : scm->getKeyPatternFields()) {
+ result.emplace_back(field->dottedField());
+ gotId |= (result.back().fullPath() == "_id");
+ }
+ if (!gotId) { // If not part of the shard key, "_id" comes last.
+ result.emplace_back("_id");
+ }
+ return result;
+}
+
+std::vector<GenericCursor> PipelineD::MongoDProcessInterface::getCursors(
+ const intrusive_ptr<ExpressionContext>& expCtx) const {
+ return CursorManager::getAllCursors(expCtx->opCtx);
+}
+
+boost::optional<Document> PipelineD::MongoDProcessInterface::lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
+ invariant(!readConcern); // We don't currently support a read concern on mongod - it's only
+ // expected to be necessary on mongos.
+
+ // Be sure to do the lookup using the collection default collation.
+ auto foreignExpCtx = expCtx->copyWith(
+ nss, collectionUUID, _getCollectionDefaultCollator(expCtx->opCtx, nss, collectionUUID));
+ auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
+ if (swPipeline == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
+ }
+ auto pipeline = uassertStatusOK(std::move(swPipeline));
+
+ auto lookedUpDocument = pipeline->getNext();
+ if (auto next = pipeline->getNext()) {
+ uasserted(ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "found more than one document with document key "
+ << documentKey.toString()
+ << " ["
+ << lookedUpDocument->toString()
+ << ", "
+ << next->toString()
+ << "]");
+ }
+ return lookedUpDocument;
+}
+
+std::unique_ptr<CollatorInterface> PipelineD::MongoDProcessInterface::_getCollectionDefaultCollator(
+ OperationContext* opCtx, const NamespaceString& nss, UUID collectionUUID) {
+ if (_collatorCache.find(collectionUUID) == _collatorCache.end()) {
+ AutoGetCollection autoColl(opCtx, nss, collectionUUID, MODE_IS);
+ if (!autoColl.getCollection()) {
+ // This collection doesn't exist - since we looked up by UUID, it will never exist in
+ // the future, so we cache a null pointer as the default collation.
+ _collatorCache[collectionUUID] = nullptr;
+ } else {
+ auto defaultCollator = autoColl.getCollection()->getDefaultCollator();
+ // Clone the collator so that we can safely use the pointer if the collection
+ // disappears right after we release the lock.
+ _collatorCache[collectionUUID] = defaultCollator ? defaultCollator->clone() : nullptr;
+ }
+ }
+ return _collatorCache[collectionUUID] ? _collatorCache[collectionUUID]->clone() : nullptr;
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 467aa9a8d1c..1551a9d035f 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -32,8 +32,10 @@
#include <memory>
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/mongo_process_interface.h"
#include "mongo/db/query/plan_executor.h"
namespace mongo {
@@ -59,6 +61,75 @@ struct DepsTracker;
*/
class PipelineD {
public:
+ class MongoDProcessInterface final : public MongoProcessInterface {
+ public:
+ MongoDProcessInterface(OperationContext* opCtx);
+
+ void setOperationContext(OperationContext* opCtx) final;
+ DBClientBase* directClient() final;
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
+ BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) final;
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) final;
+ void appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const final;
+ Status appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const final;
+ Status appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const final;
+ BSONObj getCollectionOptions(const NamespaceString& nss) final;
+ Status renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) final;
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts = MakePipelineOptions{}) final;
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final;
+ std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode truncateMode) const final;
+ std::string getShardName(OperationContext* opCtx) const final;
+ std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx,
+ const NamespaceString& nss,
+ UUID uuid) const final;
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final;
+ std::vector<GenericCursor> getCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+
+ private:
+ /**
+ * Looks up the collection default collator for the collection given by 'collectionUUID'. A
+ * collection's default collation is not allowed to change, so we cache the result to allow
+ * for quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr'
+ * if the collection does not exist or if the collection's default collation is the simple
+ * collation.
+ */
+ std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(OperationContext* opCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID);
+
+ DBDirectClient _client;
+ std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache;
+ };
+
/**
* If the first stage in the pipeline does not generate its own output documents, attaches a
* DocumentSourceCursor to the front of the pipeline which will output documents from the
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 30875726968..5ec188a2ef6 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1783,8 +1783,8 @@ public:
virtual ~Base() {}
protected:
- std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipe;
- std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipe;
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipe;
+ std::unique_ptr<Pipeline, PipelineDeleter> shardPipe;
private:
OperationContextNoop _opCtx;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 6b0032b1fd3..0ec37b15eb4 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -26,20 +26,22 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
+#pragma once
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/mongo_process_interface.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/util/assert_util.h"
namespace mongo {
/**
- * A stub MongoProcessInterface that can be used for testing. Create a subclass and override
- * methods as needed.
+ * A stub MongoProcessInterface that provides default implementations of all methods, which can then
+ * be individually overridden for testing. This class may also be used in scenarios where a
+ * placeholder MongoProcessInterface is required by an interface but will not be called. To
+ * guarantee the latter, method implementations in this class are marked MONGO_UNREACHABLE.
*/
-class StubMongoProcessInterface
- : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface {
+class StubMongoProcessInterface : public MongoProcessInterface {
public:
virtual ~StubMongoProcessInterface() = default;
@@ -51,11 +53,13 @@ public:
MONGO_UNREACHABLE;
}
- bool isSharded(const NamespaceString& ns) override {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override {
MONGO_UNREACHABLE;
}
- BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override {
+ BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) override {
MONGO_UNREACHABLE;
}
@@ -64,19 +68,23 @@ public:
MONGO_UNREACHABLE;
}
- void appendLatencyStats(const NamespaceString& nss,
+ void appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
bool includeHistograms,
BSONObjBuilder* builder) const override {
MONGO_UNREACHABLE;
}
- Status appendStorageStats(const NamespaceString& nss,
+ Status appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
const BSONObj& param,
BSONObjBuilder* builder) const override {
MONGO_UNREACHABLE;
}
- Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const override {
+ Status appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const override {
MONGO_UNREACHABLE;
}
@@ -85,6 +93,7 @@ public:
}
Status renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
const BSONObj& renameCommandObj,
const NamespaceString& targetNs,
const BSONObj& originalCollectionOptions,
@@ -92,7 +101,7 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) override {
@@ -104,7 +113,8 @@ public:
MONGO_UNREACHABLE;
}
- std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
+ std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
CurrentOpUserMode userMode,
CurrentOpTruncateMode truncateMode) const override {
MONGO_UNREACHABLE;
@@ -114,14 +124,18 @@ public:
MONGO_UNREACHABLE;
}
- std::vector<FieldPath> collectDocumentKeyFields(UUID) const override {
+ std::vector<FieldPath> collectDocumentKeyFields(OperationContext*,
+ const NamespaceString&,
+ UUID) const override {
MONGO_UNREACHABLE;
}
- boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) {
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
MONGO_UNREACHABLE;
}