diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-28 15:47:34 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-10-03 17:09:06 -0400 |
commit | 53d700710e609c3624367ea7487c030c1fbf6840 (patch) | |
tree | 3829e9b80f8a452df524eddc44cdbd6dd8b3cc6e /src/mongo/db/pipeline | |
parent | f23002e965d29158505575d09432faf403efcb56 (diff) | |
download | mongo-53d700710e609c3624367ea7487c030c1fbf6840.tar.gz |
SERVER-29609 Rename MongodInterface to MongoProcessInterface.
Diffstat (limited to 'src/mongo/db/pipeline')
28 files changed, 232 insertions, 196 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 934d9d29da2..e811d24518e 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -606,14 +606,20 @@ protected: }; -/** This class marks DocumentSources which need mongod-specific functionality. - * It causes a MongodInterface to be injected when in a mongod and prevents mongos from - * merging pipelines containing this stage. +/** + * This class marks DocumentSources which need functionality specific to a mongos or a mongod. It + * causes a MongodProcessInterface to be injected when in a mongod and a MongosProcessInterface when + * in a mongos. */ -class DocumentSourceNeedsMongod : public DocumentSource { +class DocumentSourceNeedsMongoProcessInterface : public DocumentSource { public: - // Wraps mongod-specific functions to allow linking into mongos. - class MongodInterface { + /** + * Any functionality needed by an aggregation stage that is either context specific to a mongod + * or mongos process, or is only compiled in to one of those two binaries must be accessed via + * this interface. This allows all DocumentSources to be parsed on either mongos or mongod, but + * only executable where it makes sense. + */ + class MongoProcessInterface { public: enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle }; enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; @@ -625,14 +631,15 @@ public: bool optimize = true; bool attachCursorSource = true; - // Ordinarily, a MongodInterface is injected into the pipeline at the point when the - // cursor source is added. If true, 'forceInjectMongod' will inject MongodInterfaces - // into the pipeline even if 'attachCursorSource' is false. If 'attachCursorSource' is - // true, then the value of 'forceInjectMongod' is irrelevant. - bool forceInjectMongod = false; + // Ordinarily, a MongoProcessInterface is injected into the pipeline at the point + // when the cursor source is added. If true, 'forceInjectMongoProcessInterface' will + // inject MongoProcessInterfaces into the pipeline even if 'attachCursorSource' is + // false. If 'attachCursorSource' is true, then the value of + // 'forceInjectMongoProcessInterface' is irrelevant. + bool forceInjectMongoProcessInterface = false; }; - virtual ~MongodInterface(){}; + virtual ~MongoProcessInterface(){}; /** * Sets the OperationContext of the DBDirectClient returned by directClient(). This method @@ -701,10 +708,10 @@ public: * the returned pipeline will depend upon the supplied MakePipelineOptions: * - The boolean opts.optimize determines whether the pipeline will be optimized. * - If opts.attachCursorSource is false, the pipeline will be returned without attempting - * to add an initial cursor source. - * - If opts.forceInjectMongod is true, then a MongodInterface will be provided to each - * stage which requires one, regardless of whether a cursor source is attached to the - * pipeline. + * to add an initial cursor source. + * - If opts.forceInjectMongoProcessInterface is true, then a MongoProcessInterface will be + * provided to each stage which requires one, regardless of whether a cursor source is + * attached to the pipeline. * * This function returns a non-OK status if parsing the pipeline failed. */ @@ -740,22 +747,23 @@ public: // Add new methods as needed. }; - DocumentSourceNeedsMongod(const boost::intrusive_ptr<ExpressionContext>& expCtx) + DocumentSourceNeedsMongoProcessInterface(const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(expCtx) {} - void injectMongodInterface(std::shared_ptr<MongodInterface> mongod) { - _mongod = mongod; - doInjectMongodInterface(mongod); + void injectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface) { + _mongoProcessInterface = mongoProcessInterface; + doInjectMongoProcessInterface(mongoProcessInterface); } /** * Derived classes may override this method to register custom inject functionality. */ - virtual void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) {} + virtual void doInjectMongoProcessInterface( + std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {} void detachFromOperationContext() override { - invariant(_mongod); - _mongod->setOperationContext(nullptr); + invariant(_mongoProcessInterface); + _mongoProcessInterface->setOperationContext(nullptr); doDetachFromOperationContext(); } @@ -765,8 +773,8 @@ public: virtual void doDetachFromOperationContext() {} void reattachToOperationContext(OperationContext* opCtx) final { - invariant(_mongod); - _mongod->setOperationContext(opCtx); + invariant(_mongoProcessInterface); + _mongoProcessInterface->setOperationContext(opCtx); doReattachToOperationContext(opCtx); } @@ -776,11 +784,11 @@ public: virtual void doReattachToOperationContext(OperationContext* opCtx) {} protected: - // It is invalid to delete through a DocumentSourceNeedsMongod-typed pointer. - virtual ~DocumentSourceNeedsMongod() {} + // It is invalid to delete through a DocumentSourceNeedsMongoProcessInterface-typed pointer. + virtual ~DocumentSourceNeedsMongoProcessInterface() {} - // Gives subclasses access to a MongodInterface implementation - std::shared_ptr<MongodInterface> _mongod; + // Gives subclasses access to a MongoProcessInterface implementation + std::shared_ptr<MongoProcessInterface> _mongoProcessInterface; }; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 3a8de3d06fc..435f464af8f 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -95,7 +95,7 @@ intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResu DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceShardCheckResumabilitySpec spec) - : DocumentSourceNeedsMongod(expCtx), + : DocumentSourceNeedsMongoProcessInterface(expCtx), _token(spec.getResumeToken()), _verifiedResumability(false) {} @@ -122,7 +122,8 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { // with the resume token. auto firstEntryExpCtx = pExpCtx->copyWith(pExpCtx->ns); auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = uassertStatusOK(_mongod->makePipeline({matchSpec}, firstEntryExpCtx)); + auto pipeline = + uassertStatusOK(_mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); if (auto first = pipeline->getNext()) { auto firstOplogEntry = Value(*first); uassert(40576, diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index cfbef21088d..09a45d8277b 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -55,7 +55,7 @@ typedef DocumentSourceEnsureResumeTokenPresentSpec DocumentSourceShardCheckResum * This source need only run on a sharded collection. For unsharded collections, * DocumentSourceEnsureResumeTokenPresent is sufficient. */ -class DocumentSourceShardCheckResumability final : public DocumentSourceNeedsMongod { +class DocumentSourceShardCheckResumability final : public DocumentSourceNeedsMongoProcessInterface { public: GetNextResult getNext() final; const char* getSourceName() const final; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 72e236ba870..2ea54327516 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -40,7 +40,7 @@ #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/resume_token.h" -#include "mongo/db/pipeline/stub_mongod_interface.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" @@ -223,11 +223,11 @@ TEST_F(CheckResumeTokenTest, ShouldFailWithNoDocuments) { } /** - * A mock MongodInterface which allows mocking a foreign pipeline. + * A mock MongoProcessInterface which allows mocking a foreign pipeline. */ -class MockMongodInterface final : public StubMongodInterface { +class MockMongoProcessInterface final : public StubMongoProcessInterface { public: - MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults) + MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} bool isSharded(const NamespaceString& ns) final { @@ -271,7 +271,8 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); addDocument(resumeTimestamp, "ID"); // We should see the resume token. auto result = shardCheckResumability->getNext(); @@ -287,7 +288,8 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); addDocument(resumeTimestamp, "ID"); // We should see the resume token. auto result = shardCheckResumability->getNext(); @@ -301,7 +303,8 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIs auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog; - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); addDocument(resumeTimestamp, "ID"); // We should see the resume token. auto result = shardCheckResumability->getNext(); @@ -317,7 +320,8 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } @@ -329,7 +333,8 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576); } @@ -338,7 +343,8 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplo auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog; - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } @@ -352,7 +358,8 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); @@ -368,7 +375,8 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576); } @@ -381,14 +389,16 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_VALUE_EQ(Value(docTimestamp), doc1["_id"]["clusterTime"]["ts"]); mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result2 = shardCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } @@ -403,7 +413,8 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAf addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog( {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); @@ -419,12 +430,14 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isEOF()); mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - shardCheckResumability->injectMongodInterface(std::make_shared<MockMongodInterface>(mockOplog)); + shardCheckResumability->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockOplog)); auto result2 = shardCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 94c36847694..3a6e5fc9591 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -106,7 +106,7 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { builder.append("ns", pExpCtx->ns.ns()); - auto shardName = _mongod->getShardName(pExpCtx->opCtx); + auto shardName = _mongoProcessInterface->getShardName(pExpCtx->opCtx); if (!shardName.empty()) { builder.append("shard", shardName); @@ -121,13 +121,13 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { if (_collStatsSpec["latencyStats"].type() == BSONType::Object) { includeHistograms = _collStatsSpec["latencyStats"]["histograms"].boolean(); } - _mongod->appendLatencyStats(pExpCtx->ns, includeHistograms, &builder); + _mongoProcessInterface->appendLatencyStats(pExpCtx->ns, includeHistograms, &builder); } if (_collStatsSpec.hasField("storageStats")) { // If the storageStats field exists, it must have been validated as an object when parsing. BSONObjBuilder storageBuilder(builder.subobjStart("storageStats")); - Status status = _mongod->appendStorageStats( + Status status = _mongoProcessInterface->appendStorageStats( pExpCtx->ns, _collStatsSpec["storageStats"].Obj(), &storageBuilder); storageBuilder.doneFast(); if (!status.isOK()) { @@ -138,7 +138,7 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { } if (_collStatsSpec.hasField("count")) { - Status status = _mongod->appendRecordCount(pExpCtx->ns, &builder); + Status status = _mongoProcessInterface->appendRecordCount(pExpCtx->ns, &builder); if (!status.isOK()) { uasserted(40481, str::stream() << "Unable to retrieve count in $collStats stage: " diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 0c319729328..282af14cc00 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -36,7 +36,7 @@ namespace mongo { * Provides a document source interface to retrieve collection-level statistics for a given * collection. */ -class DocumentSourceCollStats : public DocumentSourceNeedsMongod { +class DocumentSourceCollStats : public DocumentSourceNeedsMongoProcessInterface { public: class LiteParsed final : public LiteParsedDocumentSource { public: @@ -68,7 +68,7 @@ public: }; DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongod(pExpCtx) {} + : DocumentSourceNeedsMongoProcessInterface(pExpCtx) {} GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index ce77e3da6ae..029f754eeae 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -90,13 +90,13 @@ DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { pExpCtx->checkForInterrupt(); if (_ops.empty()) { - _ops = - _mongod->getCurrentOps(_includeIdleConnections, _includeOpsFromAllUsers, _truncateOps); + _ops = _mongoProcessInterface->getCurrentOps( + _includeIdleConnections, _includeOpsFromAllUsers, _truncateOps); _opsIter = _ops.begin(); if (pExpCtx->fromMongos) { - _shardName = _mongod->getShardName(pExpCtx->opCtx); + _shardName = _mongoProcessInterface->getShardName(pExpCtx->opCtx); uassert(40465, "Aggregation request specified 'fromMongos' but unable to retrieve shard name " diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 6fa869712b1..78a54e6a0b5 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -32,7 +32,7 @@ namespace mongo { -class DocumentSourceCurrentOp final : public DocumentSourceNeedsMongod { +class DocumentSourceCurrentOp final : public DocumentSourceNeedsMongoProcessInterface { public: class LiteParsed final : public LiteParsedDocumentSource { public: @@ -64,9 +64,9 @@ public: const bool _allUsers; }; - using TruncationMode = MongodInterface::CurrentOpTruncateMode; - using ConnMode = MongodInterface::CurrentOpConnectionsMode; - using UserMode = MongodInterface::CurrentOpUserMode; + using TruncationMode = MongoProcessInterface::CurrentOpTruncateMode; + using ConnMode = MongoProcessInterface::CurrentOpConnectionsMode; + using UserMode = MongoProcessInterface::CurrentOpUserMode; static boost::intrusive_ptr<DocumentSourceCurrentOp> create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, @@ -100,7 +100,7 @@ private: ConnMode includeIdleConnections = ConnMode::kExcludeIdle, UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers, TruncationMode truncateOps = TruncationMode::kNoTruncation) - : DocumentSourceNeedsMongod(pExpCtx), + : DocumentSourceNeedsMongoProcessInterface(pExpCtx), _includeIdleConnections(includeIdleConnections), _includeOpsFromAllUsers(includeOpsFromAllUsers), _truncateOps(truncateOps) {} diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp index a52453f8aa1..82853b2f007 100644 --- a/src/mongo/db/pipeline/document_source_current_op_test.cpp +++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp @@ -32,7 +32,7 @@ #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source_current_op.h" #include "mongo/db/pipeline/document_value_test_util.h" -#include "mongo/db/pipeline/stub_mongod_interface.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -55,14 +55,15 @@ public: }; /** - * A MongodInterface used for testing which returns artificial currentOp entries. + * A MongoProcessInterface used for testing which returns artificial currentOp entries. */ -class MockMongodImplementation final : public StubMongodInterface { +class MockMongoProcessInterfaceImplementation final : public StubMongoProcessInterface { public: - MockMongodImplementation(std::vector<BSONObj> ops, bool hasShardName = true) + MockMongoProcessInterfaceImplementation(std::vector<BSONObj> ops, bool hasShardName = true) : _ops(std::move(ops)), _hasShardName(hasShardName) {} - MockMongodImplementation(bool hasShardName = true) : _hasShardName(hasShardName) {} + MockMongoProcessInterfaceImplementation(bool hasShardName = true) + : _hasShardName(hasShardName) {} std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, CurrentOpUserMode userMode, @@ -183,8 +184,8 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDef TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) { const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - const auto mongod = std::make_shared<MockMongodImplementation>(); - currentOp->injectMongodInterface(mongod); + const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(); + currentOp->injectMongoProcessInterface(mongod); ASSERT(currentOp->getNext().isEOF()); } @@ -194,10 +195,10 @@ TEST_F(DocumentSourceCurrentOpTest, getExpCtx()->fromMongos = true; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; - const auto mongod = std::make_shared<MockMongodImplementation>(ops); + const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongodInterface(mongod); + currentOp->injectMongoProcessInterface(mongod); const auto expectedOutput = Document{{"shard", kMockShardName}, @@ -212,10 +213,10 @@ TEST_F(DocumentSourceCurrentOpTest, getExpCtx()->fromMongos = false; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; - const auto mongod = std::make_shared<MockMongodImplementation>(ops); + const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongodInterface(mongod); + currentOp->injectMongoProcessInterface(mongod); const auto expectedOutput = Document{{"client", std::string("192.168.1.10:50844")}, {"opid", 430}}; @@ -226,10 +227,10 @@ TEST_F(DocumentSourceCurrentOpTest, TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) { getExpCtx()->fromMongos = true; - const auto mongod = std::make_shared<MockMongodImplementation>(false); + const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(false); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongodInterface(mongod); + currentOp->injectMongoProcessInterface(mongod); ASSERT_THROWS_CODE(currentOp->getNext(), AssertionException, 40465); } @@ -238,10 +239,10 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInS getExpCtx()->fromMongos = true; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")}; - const auto mongod = std::make_shared<MockMongodImplementation>(ops); + const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongodInterface(mongod); + currentOp->injectMongoProcessInterface(mongod); ASSERT_THROWS_CODE(currentOp->getNext(), AssertionException, ErrorCodes::TypeMismatch); } diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 6c58124387b..ea98a632787 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -57,7 +57,7 @@ using std::vector; DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines, const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceNeedsMongod(expCtx), + : DocumentSourceNeedsMongoProcessInterface(expCtx), _teeBuffer(TeeBuffer::create(facetPipelines.size())), _facets(std::move(facetPipelines)) { for (size_t facetId = 0; facetId < _facets.size(); ++facetId) { @@ -215,11 +215,13 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() { return this; } -void DocumentSourceFacet::doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) { +void DocumentSourceFacet::doInjectMongoProcessInterface( + std::shared_ptr<MongoProcessInterface> pipelineContext) { for (auto&& facet : _facets) { for (auto&& stage : facet.pipeline->getSources()) { - if (auto stageNeedingMongod = dynamic_cast<DocumentSourceNeedsMongod*>(stage.get())) { - stageNeedingMongod->injectMongodInterface(mongod); + if (auto stageNeedingMongoProcessInterface = + dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(stage.get())) { + stageNeedingMongoProcessInterface->injectMongoProcessInterface(pipelineContext); } } } diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 9ca2ea149ec..caf70c349ed 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -55,7 +55,7 @@ class NamespaceString; * stage which will produce a document like the following: * {facetA: [<all input documents except the first one>], facetB: [<the first document>]}. */ -class DocumentSourceFacet final : public DocumentSourceNeedsMongod, +class DocumentSourceFacet final : public DocumentSourceNeedsMongoProcessInterface, public SplittableDocumentSource { public: struct FacetPipeline { @@ -132,7 +132,7 @@ public: // The following are overridden just to forward calls to sub-pipelines. void addInvolvedCollections(std::vector<NamespaceString>* collections) const final; - void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final; + void doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface>) final; void doDetachFromOperationContext() final; void doReattachToOperationContext(OperationContext* opCtx) final; StageConstraints constraints() const final; diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index f7b7a5ca741..97831f11589 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -174,7 +174,7 @@ BSONObj DocumentSourceGeoNear::buildGeoNearCmd() const { void DocumentSourceGeoNear::runCommand() { massert(16603, "Already ran geoNearCommand", !resultsIterator); - bool ok = _mongod->directClient()->runCommand( + bool ok = _mongoProcessInterface->directClient()->runCommand( pExpCtx->ns.db().toString(), buildGeoNearCmd(), cmdOutput); uassert(16604, "geoNear command failed: " + cmdOutput.toString(), ok); @@ -249,7 +249,7 @@ void DocumentSourceGeoNear::parseOptions(BSONObj options) { } DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongod(pExpCtx), + : DocumentSourceNeedsMongoProcessInterface(pExpCtx), coordsIsArray(false), limit(DocumentSourceGeoNear::kDefaultLimit), maxDistance(-1.0), diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 23e7c4e901c..427812716e4 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -33,7 +33,8 @@ namespace mongo { -class DocumentSourceGeoNear : public DocumentSourceNeedsMongod, public SplittableDocumentSource { +class DocumentSourceGeoNear : public DocumentSourceNeedsMongoProcessInterface, + public SplittableDocumentSource { public: static const long long kDefaultLimit; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 19f49a2b2a7..a600097578c 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -205,7 +205,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { // We've already allocated space for the trailing $match stage in '_fromPipeline'. _fromPipeline.back() = *matchStage; - auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx)); + auto pipeline = + uassertStatusOK(_mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx)); while (auto next = pipeline->getNext()) { uassert(40271, str::stream() @@ -454,7 +455,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( boost::optional<FieldPath> depthField, boost::optional<long long> maxDepth, boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc) - : DocumentSourceNeedsMongod(expCtx), + : DocumentSourceNeedsMongoProcessInterface(expCtx), _from(std::move(from)), _as(std::move(as)), _connectFromField(std::move(connectFromField)), diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index afd633d46bc..9a3bf92fe22 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -36,7 +36,7 @@ namespace mongo { -class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongod { +class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongoProcessInterface { public: static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse( const AggregationRequest& request, const BSONElement& spec); diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 8923aa1ab46..c0037a3caaf 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -36,7 +36,7 @@ #include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" -#include "mongo/db/pipeline/stub_mongod_interface.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -53,12 +53,12 @@ using DocumentSourceGraphLookUpTest = AggregationContextFixture; // /** - * A MongodInterface use for testing that supports making pipelines with an initial + * A MongoProcessInterface use for testing that supports making pipelines with an initial * DocumentSourceMock source. */ -class MockMongodImplementation final : public StubMongodInterface { +class MockMongoProcessInterfaceImplementation final : public StubMongoProcessInterface { public: - MockMongodImplementation(std::deque<DocumentSource::GetNextResult> results) + MockMongoProcessInterfaceImplementation(std::deque<DocumentSource::GetNextResult> results) : _results(std::move(results)) {} StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( @@ -114,8 +114,8 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271); } @@ -144,8 +144,8 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271); } @@ -173,8 +173,8 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none, unwindStage); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271); @@ -217,8 +217,8 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); auto next = graphLookupStage->getNext(); @@ -284,8 +284,8 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePauses) { graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); auto next = graphLookupStage->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -358,8 +358,8 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePausesWhileUnwinding) { graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); // Assert it has the expected results. Note the results can be in either order. auto expectedA = @@ -472,8 +472,8 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupWithComparisonExpressionForStar boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); auto next = graphLookupStage->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -531,8 +531,8 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldExpandArraysAtEndOfConnectFromField) boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); auto next = graphLookupStage->getNext(); @@ -604,8 +604,8 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongodInterface( - std::make_shared<MockMongodImplementation>(std::move(fromContents))); + graphLookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); auto next = graphLookupStage->getNext(); diff --git a/src/mongo/db/pipeline/document_source_index_stats.cpp b/src/mongo/db/pipeline/document_source_index_stats.cpp index 786f8f59cf0..5e12f7c72e8 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.cpp +++ b/src/mongo/db/pipeline/document_source_index_stats.cpp @@ -50,7 +50,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() { pExpCtx->checkForInterrupt(); if (_indexStatsMap.empty()) { - _indexStatsMap = _mongod->getIndexStats(pExpCtx->opCtx, pExpCtx->ns); + _indexStatsMap = _mongoProcessInterface->getIndexStats(pExpCtx->opCtx, pExpCtx->ns); _indexStatsIter = _indexStatsMap.begin(); } @@ -70,7 +70,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() { } DocumentSourceIndexStats::DocumentSourceIndexStats(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongod(pExpCtx), _processName(getHostNameCachedAndPort()) {} + : DocumentSourceNeedsMongoProcessInterface(pExpCtx), _processName(getHostNameCachedAndPort()) {} intrusive_ptr<DocumentSource> DocumentSourceIndexStats::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index c5d4d7c2762..4c8a8bf5ca0 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -37,7 +37,7 @@ namespace mongo { * Provides a document source interface to retrieve index statistics for a given namespace. * Each document returned represents a single index and mongod instance. */ -class DocumentSourceIndexStats final : public DocumentSourceNeedsMongod { +class DocumentSourceIndexStats final : public DocumentSourceNeedsMongoProcessInterface { public: class LiteParsed final : public LiteParsedDocumentSource { public: diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 3a8de609c9f..7e77efe2d80 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -68,7 +68,7 @@ std::string pipelineToString(const vector<BSONObj>& pipeline) { DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongod(pExpCtx), + : DocumentSourceNeedsMongoProcessInterface(pExpCtx), _fromNs(std::move(fromNs)), _as(std::move(as)), _variables(pExpCtx->variables), @@ -251,21 +251,22 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline // If we don't have a cache, build and return the pipeline immediately. if (!_cache || _cache->isAbandoned()) { - return uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx)); + return uassertStatusOK( + _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx)); } // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a // cursor source. If the cache is present and serving, then we will not be adding a cursor - // source later, so inject a mongod interface into all stages that need one. - MongodInterface::MakePipelineOptions pipelineOpts; + // source later, so inject a MongoProcessInterface into all stages that need one. + MongoProcessInterface::MakePipelineOptions pipelineOpts; pipelineOpts.optimize = false; pipelineOpts.attachCursorSource = false; - pipelineOpts.forceInjectMongod = _cache->isServing(); + pipelineOpts.forceInjectMongoProcessInterface = _cache->isServing(); // Construct the basic pipeline without a cache stage. - auto pipeline = - uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); + auto pipeline = uassertStatusOK( + _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); // Add the cache stage at the end and optimize. During the optimization process, the cache will // either move itself to the correct position in the pipeline, or will abandon itself if no @@ -277,7 +278,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. - uassertStatusOK(_mongod->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get())); + uassertStatusOK( + _mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get())); } // If the cache has been abandoned, release it. diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 7cf136b0cf9..ba406d57abd 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -45,7 +45,7 @@ namespace mongo { * Queries separate collection for equality matches with documents in the pipeline collection. * Adds matching documents to a new array field in the input document. */ -class DocumentSourceLookUp final : public DocumentSourceNeedsMongod, +class DocumentSourceLookUp final : public DocumentSourceNeedsMongoProcessInterface, public SplittableDocumentSource { public: class LiteParsed final : public LiteParsedDocumentSource { diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index 7c6295ea021..c14d4fcbc4b 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -103,7 +103,7 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat // TODO SERVER-29134 we need to extract the namespace from the document and set them on the new // ExpressionContext if we're getting notifications from an entire database. auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getUuid()); - auto pipelineStatus = _mongod->makePipeline({matchSpec}, foreignExpCtx); + auto pipelineStatus = _mongoProcessInterface->makePipeline({matchSpec}, foreignExpCtx); if (pipelineStatus.getStatus() == ErrorCodes::NamespaceNotFound) { // We couldn't find the collection with UUID, it may have been dropped. return Value(BSONNULL); diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 02cfa7435d4..da60c01e096 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -40,7 +40,7 @@ namespace mongo { * Uses the ExpressionContext to determine what collection to look up into. * TODO SERVER-29134 When we allow change streams on multiple collections, this will need to change. */ -class DocumentSourceLookupChangePostImage final : public DocumentSourceNeedsMongod { +class DocumentSourceLookupChangePostImage final : public DocumentSourceNeedsMongoProcessInterface { public: static constexpr StringData kStageName = "$_internalLookupChangePostImage"_sd; static constexpr StringData kFullDocumentFieldName = @@ -102,7 +102,7 @@ public: private: DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceNeedsMongod(expCtx) {} + : DocumentSourceNeedsMongoProcessInterface(expCtx) {} /** * Uses the "documentKey" field from 'updateOp' to look up the current version of the document. diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index d60e8b1a6e6..0c31852a993 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -42,7 +42,7 @@ #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/field_path.h" -#include "mongo/db/pipeline/stub_mongod_interface.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/pipeline/value.h" namespace mongo { @@ -75,11 +75,11 @@ public: }; /** - * A mock MongodInterface which allows mocking a foreign pipeline. + * A mock MongoProcessInterface which allows mocking a foreign pipeline. */ -class MockMongodInterface final : public StubMongodInterface { +class MockMongoProcessInterface final : public StubMongoProcessInterface { public: - MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults) + MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} bool isSharded(const NamespaceString& ns) final { @@ -132,8 +132,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongodInterface( - std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + lookupChangeStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -154,8 +154,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongodInterface( - std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + lookupChangeStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -176,8 +176,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongodInterface( - std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + lookupChangeStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -198,8 +198,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongodInterface( - std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + lookupChangeStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -220,8 +220,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongodInterface( - std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + lookupChangeStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579); } @@ -244,8 +244,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni // Mock out the foreign collection to have two documents with the same document key. deque<DocumentSource::GetNextResult> foreignCollection = {Document{{"_id", 0}}, Document{{"_id", 0}}}; - lookupChangeStage->injectMongodInterface( - std::make_shared<MockMongodInterface>(std::move(foreignCollection))); + lookupChangeStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(std::move(foreignCollection))); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40580); } @@ -275,8 +275,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, Document{{"_id", 1}}}; - lookupChangeStage->injectMongodInterface( - std::make_shared<MockMongodInterface>(std::move(mockForeignContents))); + lookupChangeStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents))); auto next = lookupChangeStage->getNext(); ASSERT_TRUE(next.isAdvanced()); diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 66931114dbe..25789995667 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -41,7 +41,7 @@ #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/field_path.h" -#include "mongo/db/pipeline/stub_mongod_interface.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/query_knobs.h" @@ -409,15 +409,15 @@ TEST(MakeMatchStageFromInput, ArrayValueWithRegexUsesOrQuery) { // /** - * A mock MongodInterface which allows mocking a foreign pipeline. If 'removeLeadingQueryStages' is - * true then any $match, $sort or $project fields at the start of the pipeline will be removed, - * simulating the pipeline changes which occur when PipelineD::prepareCursorSource absorbs stages - * into the PlanExecutor. + * A mock MongoProcessInterface which allows mocking a foreign pipeline. If + * 'removeLeadingQueryStages' is true then any $match, $sort or $project fields at the start of the + * pipeline will be removed, simulating the pipeline changes which occur when + * PipelineD::prepareCursorSource absorbs stages into the PlanExecutor. */ -class MockMongodInterface final : public StubMongodInterface { +class MockMongoProcessInterface final : public StubMongoProcessInterface { public: - MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults, - bool removeLeadingQueryStages = false) + MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults, + bool removeLeadingQueryStages = false) : _mockResults(std::move(mockResults)), _removeLeadingQueryStages(removeLeadingQueryStages) {} @@ -492,8 +492,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) { // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, Document{{"_id", 1}}}; - lookup->injectMongodInterface( - std::make_shared<MockMongodInterface>(std::move(mockForeignContents))); + lookup->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents))); auto next = lookup->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -547,8 +547,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) { // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, Document{{"_id", 1}}}; - lookup->injectMongodInterface( - std::make_shared<MockMongodInterface>(std::move(mockForeignContents))); + lookup->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents))); auto next = lookup->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -638,8 +638,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) { auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongodInterface( - std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + lookupStage->injectMongoProcessInterface( + std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -672,8 +672,8 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongodInterface( - std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + lookupStage->injectMongoProcessInterface( + std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -706,8 +706,8 @@ TEST_F(DocumentSourceLookUpTest, ExprEmbeddedInMatchExpressionShouldBeOptimized) auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongodInterface( - std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + lookupStage->injectMongoProcessInterface( + std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -747,8 +747,8 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongodInterface( - std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + lookupStage->injectMongoProcessInterface( + std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -782,8 +782,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup) auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongodInterface( - std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + lookupStage->injectMongoProcessInterface( + std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -818,8 +818,8 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongodInterface( - std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + lookupStage->injectMongoProcessInterface( + std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -849,8 +849,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheEntirePipelineIfNonCorrelated) { auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongodInterface( - std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + lookupStage->injectMongoProcessInterface( + std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -891,7 +891,8 @@ TEST_F(DocumentSourceLookUpTest, deque<DocumentSource::GetNextResult> mockForeignContents{ Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}}; - lookupStage->injectMongodInterface(std::make_shared<MockMongodInterface>(mockForeignContents)); + lookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockForeignContents)); // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); @@ -967,7 +968,8 @@ TEST_F(DocumentSourceLookUpTest, deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}}, Document{{"x", 1}}}; - lookupStage->injectMongodInterface(std::make_shared<MockMongodInterface>(mockForeignContents)); + lookupStage->injectMongoProcessInterface( + std::make_shared<MockMongoProcessInterface>(mockForeignContents)); // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); @@ -1023,8 +1025,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl const bool removeLeadingQueryStages = true; - lookupStage->injectMongodInterface(std::shared_ptr<MockMongodInterface>( - new MockMongodInterface({}, removeLeadingQueryStages))); + lookupStage->injectMongoProcessInterface(std::shared_ptr<MockMongoProcessInterface>( + new MockMongoProcessInterface({}, removeLeadingQueryStages))); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); ASSERT(subPipeline); diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 3ac28ba4f62..08dde0b4dc0 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -44,7 +44,8 @@ DocumentSourceOut::~DocumentSourceOut() { // Make sure we drop the temp collection if anything goes wrong. Errors are ignored // here because nothing can be done about them. Additionally, if this fails and the // collection is left behind, it will be cleaned up next time the server is started. - if (_mongod && _tempNs.size()) _mongod->directClient()->dropCollection(_tempNs.ns());) + if (_mongoProcessInterface && _tempNs.size()) _mongoProcessInterface->directClient() + ->dropCollection(_tempNs.ns());) } std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceOut::liteParse( @@ -79,12 +80,12 @@ const char* DocumentSourceOut::getSourceName() const { static AtomicUInt32 aggOutCounter; void DocumentSourceOut::initialize() { - invariant(_mongod); - DBClientBase* conn = _mongod->directClient(); + invariant(_mongoProcessInterface); + DBClientBase* conn = _mongoProcessInterface->directClient(); // Save the original collection options and index specs so we can check they didn't change // during computation. - _originalOutOptions = _mongod->getCollectionOptions(_outputNs); + _originalOutOptions = _mongoProcessInterface->getCollectionOptions(_outputNs); _originalIndexes = conn->getIndexSpecs(_outputNs.ns()); // Check if it's sharded or capped to make sure we have a chance of succeeding before we do all @@ -94,7 +95,7 @@ void DocumentSourceOut::initialize() { uassert(17017, str::stream() << "namespace '" << _outputNs.ns() << "' is sharded so it can't be used for $out'", - !_mongod->isSharded(_outputNs)); + !_mongoProcessInterface->isSharded(_outputNs)); uassert(17152, str::stream() << "namespace '" << _outputNs.ns() << "' is capped so it can't be used for $out", @@ -144,7 +145,7 @@ void DocumentSourceOut::initialize() { } void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) { - BSONObj err = _mongod->insert(_tempNs, toInsert); + BSONObj err = _mongoProcessInterface->insert(_tempNs, toInsert); uassert(16996, str::stream() << "insert for $out failed: " << err, DBClientBase::getLastErrorString(err).empty()); @@ -194,7 +195,7 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { BSON("renameCollection" << _tempNs.ns() << "to" << _outputNs.ns() << "dropTarget" << true); - auto status = _mongod->renameIfOptionsAndIndexesHaveNotChanged( + auto status = _mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged( renameCommandObj, _outputNs, _originalOutOptions, _originalIndexes); uassert(16997, str::stream() << "$out failed: " << status.reason(), status.isOK()); @@ -211,7 +212,7 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs, const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongod(pExpCtx), + : DocumentSourceNeedsMongoProcessInterface(pExpCtx), _done(false), _tempNs(""), // Filled in during getNext(). _outputNs(outputNs) {} diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index ec8c188c0b7..5dac8b13ffc 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -32,7 +32,8 @@ namespace mongo { -class DocumentSourceOut final : public DocumentSourceNeedsMongod, public SplittableDocumentSource { +class DocumentSourceOut final : public DocumentSourceNeedsMongoProcessInterface, + public SplittableDocumentSource { public: static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse( const AggregationRequest& request, const BSONElement& spec); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index c6642606c09..5bb3381671b 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -88,9 +88,10 @@ using std::string; using std::unique_ptr; namespace { -class MongodImplementation final : public DocumentSourceNeedsMongod::MongodInterface { +class MongodProcessInterface final + : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { public: - MongodImplementation(const intrusive_ptr<ExpressionContext>& ctx) + MongodProcessInterface(const intrusive_ptr<ExpressionContext>& ctx) : _ctx(ctx), _client(ctx->opCtx) {} void setOperationContext(OperationContext* opCtx) { @@ -194,7 +195,7 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace - // than the DocumentSource this MongodImplementation is injected into, but both + // than the DocumentSource this MongodProcessInterface is injected into, but both // ExpressionContext instances should still have the same OperationContext. invariant(_ctx->opCtx == expCtx->opCtx); @@ -211,7 +212,7 @@ public: if (opts.attachCursorSource) { cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); - } else if (opts.forceInjectMongod) { + } else if (opts.forceInjectMongoProcessInterface) { PipelineD::injectMongodInterface(pipeline.getValue().get()); } @@ -477,9 +478,10 @@ BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { void PipelineD::injectMongodInterface(Pipeline* pipeline) { for (auto&& source : pipeline->_sources) { - if (auto needsMongod = dynamic_cast<DocumentSourceNeedsMongod*>(source.get())) { - needsMongod->injectMongodInterface( - std::make_shared<MongodImplementation>(pipeline->getContext())); + if (auto needsMongod = + dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get())) { + needsMongod->injectMongoProcessInterface( + std::make_shared<MongodProcessInterface>(pipeline->getContext())); } } } @@ -493,7 +495,7 @@ void PipelineD::prepareCursorSource(Collection* collection, // We will be modifying the source vector as we go. Pipeline::SourceContainer& sources = pipeline->_sources; - // Inject a MongodImplementation to sources that need them. + // Inject a MongodProcessInterface to sources that need them. injectMongodInterface(pipeline); if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) { diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index f49211a13ac..9aaae5bb27f 100644 --- a/src/mongo/db/pipeline/stub_mongod_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -35,12 +35,13 @@ namespace mongo { /** - * A stub MongodInterface that can be used for testing. Create a subclass and override methods as - * needed. + * A stub MongoProcessInterface that can be used for testing. Create a subclass and override + * methods as needed. */ -class StubMongodInterface : public DocumentSourceNeedsMongod::MongodInterface { +class StubMongoProcessInterface + : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { public: - virtual ~StubMongodInterface() = default; + virtual ~StubMongoProcessInterface() = default; void setOperationContext(OperationContext* opCtx) override { MONGO_UNREACHABLE; |