diff options
Diffstat (limited to 'src/mongo/db/pipeline')
43 files changed, 941 insertions, 901 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 7ea9d967c4a..17622862af1 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -699,216 +699,4 @@ protected: virtual ~SplittableDocumentSource() {} }; - -/** - * This class marks DocumentSources which need functionality specific to a mongos or a mongod. It - * causes a MongodProcessInterface to be injected when in a mongod and a MongosProcessInterface when - * in a mongos. - */ -class DocumentSourceNeedsMongoProcessInterface : public DocumentSource { -public: - /** - * Any functionality needed by an aggregation stage that is either context specific to a mongod - * or mongos process, or is only compiled in to one of those two binaries must be accessed via - * this interface. This allows all DocumentSources to be parsed on either mongos or mongod, but - * only executable where it makes sense. - */ - class MongoProcessInterface { - public: - enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle }; - enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; - enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps }; - - struct MakePipelineOptions { - MakePipelineOptions(){}; - - bool optimize = true; - bool attachCursorSource = true; - - // Ordinarily, a MongoProcessInterface is injected into the pipeline at the point - // when the cursor source is added. If true, 'forceInjectMongoProcessInterface' will - // inject MongoProcessInterfaces into the pipeline even if 'attachCursorSource' is - // false. If 'attachCursorSource' is true, then the value of - // 'forceInjectMongoProcessInterface' is irrelevant. - bool forceInjectMongoProcessInterface = false; - }; - - virtual ~MongoProcessInterface(){}; - - /** - * Sets the OperationContext of the DBDirectClient returned by directClient(). This method - * must be called after updating the 'opCtx' member of the ExpressionContext associated with - * the document source. - */ - virtual void setOperationContext(OperationContext* opCtx) = 0; - - /** - * Always returns a DBDirectClient. The return type in the function signature is a - * DBClientBase* because DBDirectClient isn't linked into mongos. - */ - virtual DBClientBase* directClient() = 0; - - // Note that in some rare cases this could return a false negative but will never return - // a false positive. This method will be fixed in the future once it becomes possible to - // avoid false negatives. - virtual bool isSharded(const NamespaceString& ns) = 0; - - /** - * Inserts 'objs' into 'ns' and returns the "detailed" last error object. - */ - virtual BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) = 0; - - virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) = 0; - - /** - * Appends operation latency statistics for collection "nss" to "builder" - */ - virtual void appendLatencyStats(const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const = 0; - - /** - * Appends storage statistics for collection "nss" to "builder" - */ - virtual Status appendStorageStats(const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const = 0; - - /** - * Appends the record count for collection "nss" to "builder". - */ - virtual Status appendRecordCount(const NamespaceString& nss, - BSONObjBuilder* builder) const = 0; - - /** - * Gets the collection options for the collection given by 'nss'. - */ - virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0; - - /** - * Performs the given rename command if the collection given by 'targetNs' has the same - * options as specified in 'originalCollectionOptions', and has the same indexes as - * 'originalIndexes'. - */ - virtual Status renameIfOptionsAndIndexesHaveNotChanged( - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) = 0; - - /** - * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of - * the returned pipeline will depend upon the supplied MakePipelineOptions: - * - The boolean opts.optimize determines whether the pipeline will be optimized. - * - If opts.attachCursorSource is false, the pipeline will be returned without attempting - * to add an initial cursor source. - * - If opts.forceInjectMongoProcessInterface is true, then a MongoProcessInterface will be - * provided to each stage which requires one, regardless of whether a cursor source is - * attached to the pipeline. - * - * This function returns a non-OK status if parsing the pipeline failed. - */ - virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts = MakePipelineOptions{}) = 0; - - /** - * Attaches a cursor source to the start of a pipeline. Performs no further optimization. - * This function asserts if the collection to be aggregated is sharded. NamespaceNotFound - * will be returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. - * That should be the only case where NamespaceNotFound is returned. - */ - virtual Status attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; - - /** - * Returns a vector of owned BSONObjs, each of which contains details of an in-progress - * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report - * operations for all authenticated users; otherwise, report only the current user's - * operations. - */ - virtual std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, - CurrentOpUserMode userMode, - CurrentOpTruncateMode) const = 0; - - /** - * Returns the name of the local shard if sharding is enabled, or an empty string. - */ - virtual std::string getShardName(OperationContext* opCtx) const = 0; - - /** - * Returns the fields of the document key (in order) for the current collection, including - * the shard key and _id. If _id is not in the shard key, it is added last. - */ - virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0; - - /** - * Returns zero or one documents with the document key 'documentKey'. 'documentKey' is - * treated as a unique identifier of a document, and may include an _id or all fields from - * the shard key and an _id. Throws if more than one match was found. Returns boost::none if - * no matching documents were found, including cases where the given namespace does not - * exist. - */ - virtual boost::optional<Document> lookupSingleDocument( - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) = 0; - - /** - * Returns a vector of all local cursors. - */ - virtual std::vector<GenericCursor> getCursors( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0; - - // Add new methods as needed. - }; - - DocumentSourceNeedsMongoProcessInterface(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx) {} - - void injectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongoProcessInterface) { - _mongoProcessInterface = mongoProcessInterface; - doInjectMongoProcessInterface(mongoProcessInterface); - } - - /** - * Derived classes may override this method to register custom inject functionality. - */ - virtual void doInjectMongoProcessInterface( - std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {} - - void detachFromOperationContext() override { - invariant(_mongoProcessInterface); - _mongoProcessInterface->setOperationContext(nullptr); - doDetachFromOperationContext(); - } - - /** - * Derived classes may override this method to register custom detach functionality. - */ - virtual void doDetachFromOperationContext() {} - - void reattachToOperationContext(OperationContext* opCtx) final { - invariant(_mongoProcessInterface); - _mongoProcessInterface->setOperationContext(opCtx); - doReattachToOperationContext(opCtx); - } - - /** - * Derived classes may override this method to register custom reattach functionality. - */ - virtual void doReattachToOperationContext(OperationContext* opCtx) {} - -protected: - // It is invalid to delete through a DocumentSourceNeedsMongoProcessInterface-typed pointer. - virtual ~DocumentSourceNeedsMongoProcessInterface() {} - - // Gives subclasses access to a MongoProcessInterface implementation - std::shared_ptr<MongoProcessInterface> _mongoProcessInterface; -}; - - } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 537b32737da..95032ec3cae 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -433,7 +433,7 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D // the documentKey fields to include the shard key. We only need to re-check the documentKey // while the collection is unsharded; if the collection is or becomes sharded, then the // documentKey is final and will not change. - if (_mongoProcess && !_documentKeyFieldsSharded) { + if (!_documentKeyFieldsSharded) { // If this is not a shard server, 'catalogCache' will be nullptr and we will skip the // routing table check. auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache(); @@ -443,7 +443,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D return routingInfo.isOK() && routingInfo.getValue().cm(); }(); if (_documentKeyFields.empty() || collectionIsSharded) { - _documentKeyFields = _mongoProcess->collectDocumentKeyFields(uuid.getUuid()); + _documentKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields( + _expCtx->opCtx, _expCtx->ns, uuid.getUuid()); _documentKeyFieldsSharded = collectionIsSharded; } } diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 5a6e9d0dcf7..44c28591d7b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/uuid.h" @@ -110,7 +111,9 @@ struct MockMongoProcessInterface final : public StubMongoProcessInterface { MockMongoProcessInterface(std::vector<FieldPath> fields) : _fields(std::move(fields)) {} - std::vector<FieldPath> collectDocumentKeyFields(UUID) const final { + std::vector<FieldPath> collectDocumentKeyFields(OperationContext*, + const NamespaceString&, + UUID) const final { return _fields; } @@ -131,9 +134,8 @@ public: vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry); auto transform = stages[2].get(); - auto mongoProcess = std::make_shared<MockMongoProcessInterface>(docKeyFields); - using NeedyDS = DocumentSourceNeedsMongoProcessInterface; - dynamic_cast<NeedyDS&>(*transform).injectMongoProcessInterface(std::move(mongoProcess)); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(docKeyFields); auto next = transform->getNext(); // Match stage should pass the doc down if expectedDoc is given. @@ -152,6 +154,8 @@ public: list<intrusive_ptr<DocumentSource>> result = DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(std::vector<FieldPath>{}); // This match stage is a DocumentSourceOplogMatch, which we explicitly disallow from // executing as a safety mechanism, since it needs to use the collection-default collation, diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 0ef2c383990..6fc95e0cec0 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -91,9 +91,7 @@ intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResu DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token) - : DocumentSourceNeedsMongoProcessInterface(expCtx), - _token(std::move(token)), - _verifiedResumability(false) {} + : DocumentSource(expCtx), _token(std::move(token)), _verifiedResumability(false) {} DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { pExpCtx->checkForInterrupt(); @@ -118,8 +116,8 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { // with the resume token. auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = - uassertStatusOK(_mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); + auto pipeline = uassertStatusOK( + pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); if (auto first = pipeline->getNext()) { auto firstOplogEntry = Value(*first); uassert(40576, diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 956135f9d7a..b06bae41da1 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -54,7 +54,7 @@ namespace mongo { * This source need only run on a sharded collection. For unsharded collections, * DocumentSourceEnsureResumeTokenPresent is sufficient. */ -class DocumentSourceShardCheckResumability final : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceShardCheckResumability final : public DocumentSource { public: GetNextResult getNext() final; const char* getSourceName() const final; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index acf4f02e9e7..b4dce391492 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -218,11 +218,11 @@ public: MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { return false; } - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { @@ -259,8 +259,7 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); addDocument(resumeTimestamp, "ID"); // We should see the resume token. auto result = shardCheckResumability->getNext(); @@ -276,8 +275,7 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); addDocument(resumeTimestamp, "ID"); // We should see the resume token. auto result = shardCheckResumability->getNext(); @@ -291,8 +289,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIs auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog; - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); addDocument(resumeTimestamp, "ID"); // We should see the resume token. auto result = shardCheckResumability->getNext(); @@ -308,8 +305,7 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } @@ -321,8 +317,7 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576); } @@ -331,8 +326,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWithNoDocumentsInPipelineAndOplo auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog; - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isEOF()); } @@ -346,8 +340,7 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); @@ -363,8 +356,7 @@ TEST_F(ShardCheckResumabilityTest, auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); ASSERT_THROWS_CODE(shardCheckResumability->getNext(), AssertionException, 40576); } @@ -377,16 +369,14 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result2 = shardCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } @@ -401,8 +391,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAf addDocument(docTimestamp, "ID"); deque<DocumentSource::GetNextResult> mockOplog( {{Document{{"ts", oplogTimestamp}}}, {Document{{"ts", oplogFutureTimestamp}}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); @@ -418,14 +407,12 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { auto shardCheckResumability = createShardCheckResumability(resumeTimestamp, "0"); deque<DocumentSource::GetNextResult> mockOplog({Document{{"ts", oplogTimestamp}}}); - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isEOF()); mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; - shardCheckResumability->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockOplog)); + getExpCtx()->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>(mockOplog); auto result2 = shardCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 3a6e5fc9591..7d05a806e98 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -106,7 +106,7 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { builder.append("ns", pExpCtx->ns.ns()); - auto shardName = _mongoProcessInterface->getShardName(pExpCtx->opCtx); + auto shardName = pExpCtx->mongoProcessInterface->getShardName(pExpCtx->opCtx); if (!shardName.empty()) { builder.append("shard", shardName); @@ -121,14 +121,15 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { if (_collStatsSpec["latencyStats"].type() == BSONType::Object) { includeHistograms = _collStatsSpec["latencyStats"]["histograms"].boolean(); } - _mongoProcessInterface->appendLatencyStats(pExpCtx->ns, includeHistograms, &builder); + pExpCtx->mongoProcessInterface->appendLatencyStats( + pExpCtx->opCtx, pExpCtx->ns, includeHistograms, &builder); } if (_collStatsSpec.hasField("storageStats")) { // If the storageStats field exists, it must have been validated as an object when parsing. BSONObjBuilder storageBuilder(builder.subobjStart("storageStats")); - Status status = _mongoProcessInterface->appendStorageStats( - pExpCtx->ns, _collStatsSpec["storageStats"].Obj(), &storageBuilder); + Status status = pExpCtx->mongoProcessInterface->appendStorageStats( + pExpCtx->opCtx, pExpCtx->ns, _collStatsSpec["storageStats"].Obj(), &storageBuilder); storageBuilder.doneFast(); if (!status.isOK()) { uasserted(40280, @@ -138,7 +139,8 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { } if (_collStatsSpec.hasField("count")) { - Status status = _mongoProcessInterface->appendRecordCount(pExpCtx->ns, &builder); + Status status = pExpCtx->mongoProcessInterface->appendRecordCount( + pExpCtx->opCtx, pExpCtx->ns, &builder); if (!status.isOK()) { uasserted(40481, str::stream() << "Unable to retrieve count in $collStats stage: " diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index baae1fcb945..5b7bf719a14 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -36,7 +36,7 @@ namespace mongo { * Provides a document source interface to retrieve collection-level statistics for a given * collection. */ -class DocumentSourceCollStats : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceCollStats : public DocumentSource { public: class LiteParsed final : public LiteParsedDocumentSource { public: @@ -68,7 +68,7 @@ public: }; DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx) {} + : DocumentSource(pExpCtx) {} GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index 029f754eeae..c650f9ae0b9 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -90,13 +90,13 @@ DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { pExpCtx->checkForInterrupt(); if (_ops.empty()) { - _ops = _mongoProcessInterface->getCurrentOps( - _includeIdleConnections, _includeOpsFromAllUsers, _truncateOps); + _ops = pExpCtx->mongoProcessInterface->getCurrentOps( + pExpCtx->opCtx, _includeIdleConnections, _includeOpsFromAllUsers, _truncateOps); _opsIter = _ops.begin(); if (pExpCtx->fromMongos) { - _shardName = _mongoProcessInterface->getShardName(pExpCtx->opCtx); + _shardName = pExpCtx->mongoProcessInterface->getShardName(pExpCtx->opCtx); uassert(40465, "Aggregation request specified 'fromMongos' but unable to retrieve shard name " diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 2e15403e52f..14e95983f70 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -32,7 +32,7 @@ namespace mongo { -class DocumentSourceCurrentOp final : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceCurrentOp final : public DocumentSource { public: class LiteParsed final : public LiteParsedDocumentSource { public: @@ -100,7 +100,7 @@ private: ConnMode includeIdleConnections = ConnMode::kExcludeIdle, UserMode includeOpsFromAllUsers = UserMode::kExcludeOthers, TruncationMode truncateOps = TruncationMode::kNoTruncation) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx), + : DocumentSource(pExpCtx), _includeIdleConnections(includeIdleConnections), _includeOpsFromAllUsers(includeOpsFromAllUsers), _truncateOps(truncateOps) {} diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp index 82853b2f007..cbafc412e22 100644 --- a/src/mongo/db/pipeline/document_source_current_op_test.cpp +++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp @@ -65,7 +65,8 @@ public: MockMongoProcessInterfaceImplementation(bool hasShardName = true) : _hasShardName(hasShardName) {} - std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, + CurrentOpConnectionsMode connMode, CurrentOpUserMode userMode, CurrentOpTruncateMode truncateMode) const { return _ops; @@ -183,9 +184,10 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldSerializeOmittedOptionalArgumentsAsDef } TEST_F(DocumentSourceCurrentOpTest, ShouldReturnEOFImmediatelyIfNoCurrentOps) { + getExpCtx()->mongoProcessInterface = + std::make_shared<MockMongoProcessInterfaceImplementation>(); + const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(); - currentOp->injectMongoProcessInterface(mongod); ASSERT(currentOp->getNext().isEOF()); } @@ -195,10 +197,10 @@ TEST_F(DocumentSourceCurrentOpTest, getExpCtx()->fromMongos = true; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; - const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops); + getExpCtx()->mongoProcessInterface = + std::make_shared<MockMongoProcessInterfaceImplementation>(ops); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongoProcessInterface(mongod); const auto expectedOutput = Document{{"shard", kMockShardName}, @@ -213,10 +215,10 @@ TEST_F(DocumentSourceCurrentOpTest, getExpCtx()->fromMongos = false; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 430 }")}; - const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops); + getExpCtx()->mongoProcessInterface = + std::make_shared<MockMongoProcessInterfaceImplementation>(ops); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongoProcessInterface(mongod); const auto expectedOutput = Document{{"client", std::string("192.168.1.10:50844")}, {"opid", 430}}; @@ -227,10 +229,10 @@ TEST_F(DocumentSourceCurrentOpTest, TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfNoShardNameAvailableForShardedRequest) { getExpCtx()->fromMongos = true; - const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(false); + getExpCtx()->mongoProcessInterface = + std::make_shared<MockMongoProcessInterfaceImplementation>(false); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongoProcessInterface(mongod); ASSERT_THROWS_CODE(currentOp->getNext(), AssertionException, 40465); } @@ -239,10 +241,10 @@ TEST_F(DocumentSourceCurrentOpTest, ShouldFailIfOpIDIsNonNumericWhenModifyingInS getExpCtx()->fromMongos = true; std::vector<BSONObj> ops{fromjson("{ client: '192.168.1.10:50844', opid: 'string' }")}; - const auto mongod = std::make_shared<MockMongoProcessInterfaceImplementation>(ops); + getExpCtx()->mongoProcessInterface = + std::make_shared<MockMongoProcessInterfaceImplementation>(ops); const auto currentOp = DocumentSourceCurrentOp::create(getExpCtx()); - currentOp->injectMongoProcessInterface(mongod); ASSERT_THROWS_CODE(currentOp->getNext(), AssertionException, ErrorCodes::TypeMismatch); } diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 1fa0ae1fc2a..050ffa9f331 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -57,7 +57,7 @@ using std::vector; DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines, const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceNeedsMongoProcessInterface(expCtx), + : DocumentSource(expCtx), _teeBuffer(TeeBuffer::create(facetPipelines.size())), _facets(std::move(facetPipelines)) { for (size_t facetId = 0; facetId < _facets.size(); ++facetId) { @@ -215,25 +215,13 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() { return this; } -void DocumentSourceFacet::doInjectMongoProcessInterface( - std::shared_ptr<MongoProcessInterface> pipelineContext) { - for (auto&& facet : _facets) { - for (auto&& stage : facet.pipeline->getSources()) { - if (auto stageNeedingMongoProcessInterface = - dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(stage.get())) { - stageNeedingMongoProcessInterface->injectMongoProcessInterface(pipelineContext); - } - } - } -} - -void DocumentSourceFacet::doDetachFromOperationContext() { +void DocumentSourceFacet::detachFromOperationContext() { for (auto&& facet : _facets) { facet.pipeline->detachFromOperationContext(); } } -void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) { +void DocumentSourceFacet::reattachToOperationContext(OperationContext* opCtx) { for (auto&& facet : _facets) { facet.pipeline->reattachToOperationContext(opCtx); } diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index ff7c12f4ae1..32248772188 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -55,15 +55,14 @@ class NamespaceString; * stage which will produce a document like the following: * {facetA: [<all input documents except the first one>], facetB: [<the first document>]}. */ -class DocumentSourceFacet final : public DocumentSourceNeedsMongoProcessInterface, - public SplittableDocumentSource { +class DocumentSourceFacet final : public DocumentSource, public SplittableDocumentSource { public: struct FacetPipeline { - FacetPipeline(std::string name, std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) + FacetPipeline(std::string name, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) : name(std::move(name)), pipeline(std::move(pipeline)) {} std::string name; - std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline; + std::unique_ptr<Pipeline, PipelineDeleter> pipeline; }; class LiteParsed : public LiteParsedDocumentSource { @@ -136,9 +135,8 @@ public: // The following are overridden just to forward calls to sub-pipelines. void addInvolvedCollections(std::vector<NamespaceString>* collections) const final; - void doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface>) final; - void doDetachFromOperationContext() final; - void doReattachToOperationContext(OperationContext* opCtx) final; + void detachFromOperationContext() final; + void reattachToOperationContext(OperationContext* opCtx) final; StageConstraints constraints(Pipeline::SplitState pipeState) const final; protected: diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 1ed1f4a779c..c706a5ca31e 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -463,8 +463,22 @@ TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) { ASSERT_TRUE(dummy->isOptimized); } -TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) { +/** + * An implementation of the MongoProcessInterface that is okay with changing the OperationContext, + * but has no other parts of the interface implemented. + */ +class StubMongoProcessOkWithOpCtxChanges : public StubMongoProcessInterface { +public: + void setOperationContext(OperationContext* opCtx) final { + return; + } +}; + +TEST_F(DocumentSourceFacetTest, ShouldPropagateDetachingAndReattachingOfOpCtx) { auto ctx = getExpCtx(); + // We're going to be changing the OperationContext, so we need to use a MongoProcessInterface + // that won't throw when we do so. + ctx->mongoProcessInterface = stdx::make_unique<StubMongoProcessOkWithOpCtxChanges>(); auto firstDummy = DocumentSourcePassthrough::create(); auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({firstDummy}, ctx)); @@ -480,12 +494,12 @@ TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) { // Test detaching. ASSERT_FALSE(firstDummy->isDetachedFromOpCtx); ASSERT_FALSE(secondDummy->isDetachedFromOpCtx); - facetStage->doDetachFromOperationContext(); + facetStage->detachFromOperationContext(); ASSERT_TRUE(firstDummy->isDetachedFromOpCtx); ASSERT_TRUE(secondDummy->isDetachedFromOpCtx); // Test reattaching. - facetStage->doReattachToOperationContext(ctx->opCtx); + facetStage->reattachToOperationContext(ctx->opCtx); ASSERT_FALSE(firstDummy->isDetachedFromOpCtx); ASSERT_FALSE(secondDummy->isDetachedFromOpCtx); } diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index 4b81ff86bdf..39a31253573 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -185,7 +185,7 @@ BSONObj DocumentSourceGeoNear::buildGeoNearCmd() const { void DocumentSourceGeoNear::runCommand() { massert(16603, "Already ran geoNearCommand", !resultsIterator); - bool ok = _mongoProcessInterface->directClient()->runCommand( + bool ok = pExpCtx->mongoProcessInterface->directClient()->runCommand( pExpCtx->ns.db().toString(), buildGeoNearCmd(), cmdOutput); if (!ok) { uassertStatusOK(getStatusFromCommandResult(cmdOutput)); @@ -275,7 +275,7 @@ void DocumentSourceGeoNear::parseOptions(BSONObj options) { } DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx), + : DocumentSource(pExpCtx), coordsIsArray(false), limit(DocumentSourceGeoNear::kDefaultLimit), maxDistance(-1.0), diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 45eda260dba..376e4a03ea0 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -33,8 +33,7 @@ namespace mongo { -class DocumentSourceGeoNear : public DocumentSourceNeedsMongoProcessInterface, - public SplittableDocumentSource { +class DocumentSourceGeoNear : public DocumentSource, public SplittableDocumentSource { public: static const long long kDefaultLimit; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index a600097578c..07fac74d963 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -205,8 +205,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { // We've already allocated space for the trailing $match stage in '_fromPipeline'. _fromPipeline.back() = *matchStage; - auto pipeline = - uassertStatusOK(_mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx)); + auto pipeline = uassertStatusOK( + pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx)); while (auto next = pipeline->getNext()) { uassert(40271, str::stream() @@ -436,11 +436,11 @@ void DocumentSourceGraphLookUp::serializeToArray( } } -void DocumentSourceGraphLookUp::doDetachFromOperationContext() { +void DocumentSourceGraphLookUp::detachFromOperationContext() { _fromExpCtx->opCtx = nullptr; } -void DocumentSourceGraphLookUp::doReattachToOperationContext(OperationContext* opCtx) { +void DocumentSourceGraphLookUp::reattachToOperationContext(OperationContext* opCtx) { _fromExpCtx->opCtx = opCtx; } @@ -455,7 +455,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( boost::optional<FieldPath> depthField, boost::optional<long long> maxDepth, boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc) - : DocumentSourceNeedsMongoProcessInterface(expCtx), + : DocumentSource(expCtx), _from(std::move(from)), _as(std::move(as)), _connectFromField(std::move(connectFromField)), diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index d91c1675a81..9b5882e3f52 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -36,7 +36,7 @@ namespace mongo { -class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceGraphLookUp final : public DocumentSource { public: static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse( const AggregationRequest& request, const BSONElement& spec); @@ -73,9 +73,9 @@ public: collections->push_back(_from); } - void doDetachFromOperationContext() final; + void detachFromOperationContext() final; - void doReattachToOperationContext(OperationContext* opCtx) final; + void reattachToOperationContext(OperationContext* opCtx) final; static boost::intrusive_ptr<DocumentSourceGraphLookUp> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index c0037a3caaf..6dc9849baff 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -56,12 +56,12 @@ using DocumentSourceGraphLookUpTest = AggregationContextFixture; * A MongoProcessInterface use for testing that supports making pipelines with an initial * DocumentSourceMock source. */ -class MockMongoProcessInterfaceImplementation final : public StubMongoProcessInterface { +class MockMongoProcess final : public StubMongoProcessInterface { public: - MockMongoProcessInterfaceImplementation(std::deque<DocumentSource::GetNextResult> results) + MockMongoProcess(std::deque<DocumentSource::GetNextResult> results) : _results(std::move(results)) {} - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { @@ -82,7 +82,7 @@ public: } Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) override { + Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_results)); return Status::OK(); } @@ -94,7 +94,6 @@ private: TEST_F(DocumentSourceGraphLookUpTest, ShouldErrorWhenDoingInitialMatchIfDocumentInFromCollectionIsMissingId) { auto expCtx = getExpCtx(); - std::deque<DocumentSource::GetNextResult> inputs{Document{{"_id", 0}}}; auto inputMock = DocumentSourceMock::create(std::move(inputs)); @@ -102,6 +101,7 @@ TEST_F(DocumentSourceGraphLookUpTest, NamespaceString fromNs("test", "graph_lookup"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, @@ -114,9 +114,6 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); - ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271); } @@ -132,6 +129,7 @@ TEST_F(DocumentSourceGraphLookUpTest, NamespaceString fromNs("test", "graph_lookup"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, @@ -144,8 +142,6 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271); } @@ -161,6 +157,7 @@ TEST_F(DocumentSourceGraphLookUpTest, NamespaceString fromNs("test", "graph_lookup"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, boost::none); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, @@ -173,8 +170,6 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none, unwindStage); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); ASSERT_THROWS_CODE(graphLookupStage->getNext(), AssertionException, 40271); @@ -205,6 +200,7 @@ TEST_F(DocumentSourceGraphLookUpTest, NamespaceString fromNs("test", "graph_lookup"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, @@ -217,8 +213,6 @@ TEST_F(DocumentSourceGraphLookUpTest, boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); auto next = graphLookupStage->getNext(); @@ -270,6 +264,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePauses) { NamespaceString fromNs("test", "foreign"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, @@ -284,9 +279,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePauses) { graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); - auto next = graphLookupStage->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -338,6 +330,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePausesWhileUnwinding) { NamespaceString fromNs("test", "foreign"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); const bool preserveNullAndEmptyArrays = false; const boost::optional<std::string> includeArrayIndex = boost::none; @@ -358,9 +351,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldPropagatePausesWhileUnwinding) { graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); - // Assert it has the expected results. Note the results can be in either order. auto expectedA = Document{{"startPoint", 0}, {"results", Document{{"_id", "a"_sd}, {"to", 0}, {"from", 1}}}}; @@ -403,6 +393,8 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupShouldReportAsFieldIsModified) auto expCtx = getExpCtx(); NamespaceString fromNs("test", "foreign"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcess>(std::deque<DocumentSource::GetNextResult>{}); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, @@ -425,6 +417,8 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupShouldReportFieldsModifiedByAbs auto expCtx = getExpCtx(); NamespaceString fromNs("test", "foreign"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcess>(std::deque<DocumentSource::GetNextResult>{}); auto unwindStage = DocumentSourceUnwind::create(expCtx, "results", false, std::string("arrIndex")); auto graphLookupStage = @@ -455,6 +449,7 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupWithComparisonExpressionForStar expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); std::deque<DocumentSource::GetNextResult> fromContents{Document{{"_id", 0}, {"to", true}}, Document{{"_id", 1}, {"to", false}}}; + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto graphLookupStage = DocumentSourceGraphLookUp::create( expCtx, @@ -472,8 +467,6 @@ TEST_F(DocumentSourceGraphLookUpTest, GraphLookupWithComparisonExpressionForStar boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); auto next = graphLookupStage->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -519,6 +512,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldExpandArraysAtEndOfConnectFromField) NamespaceString fromNs("test", "graph_lookup"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, @@ -531,8 +525,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldExpandArraysAtEndOfConnectFromField) boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); auto next = graphLookupStage->getNext(); @@ -592,6 +584,7 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo NamespaceString fromNs("test", "graph_lookup"); expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcess>(std::move(fromContents)); auto graphLookupStage = DocumentSourceGraphLookUp::create(expCtx, fromNs, @@ -604,8 +597,6 @@ TEST_F(DocumentSourceGraphLookUpTest, ShouldNotExpandArraysWithinArraysAtEndOfCo boost::none, boost::none); graphLookupStage->setSource(inputMock.get()); - graphLookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterfaceImplementation>(std::move(fromContents))); graphLookupStage->setSource(inputMock.get()); auto next = graphLookupStage->getNext(); diff --git a/src/mongo/db/pipeline/document_source_index_stats.cpp b/src/mongo/db/pipeline/document_source_index_stats.cpp index 5e12f7c72e8..dfb69786d18 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.cpp +++ b/src/mongo/db/pipeline/document_source_index_stats.cpp @@ -50,7 +50,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() { pExpCtx->checkForInterrupt(); if (_indexStatsMap.empty()) { - _indexStatsMap = _mongoProcessInterface->getIndexStats(pExpCtx->opCtx, pExpCtx->ns); + _indexStatsMap = pExpCtx->mongoProcessInterface->getIndexStats(pExpCtx->opCtx, pExpCtx->ns); _indexStatsIter = _indexStatsMap.begin(); } @@ -70,7 +70,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() { } DocumentSourceIndexStats::DocumentSourceIndexStats(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx), _processName(getHostNameCachedAndPort()) {} + : DocumentSource(pExpCtx), _processName(getHostNameCachedAndPort()) {} intrusive_ptr<DocumentSource> DocumentSourceIndexStats::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 592f7f02a0d..b485db6d748 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -37,7 +37,7 @@ namespace mongo { * Provides a document source interface to retrieve index statistics for a given namespace. * Each document returned represents a single index and mongod instance. */ -class DocumentSourceIndexStats final : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceIndexStats final : public DocumentSource { public: class LiteParsed final : public LiteParsedDocumentSource { public: diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp index 084cf03a264..0bafd966e71 100644 --- a/src/mongo/db/pipeline/document_source_list_local_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_list_local_cursors.cpp @@ -73,5 +73,5 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalCursors::createFromB DocumentSourceListLocalCursors::DocumentSourceListLocalCursors( const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx) {} + : DocumentSource(pExpCtx), _cursors(pExpCtx->mongoProcessInterface->getCursors(pExpCtx)) {} } diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.h b/src/mongo/db/pipeline/document_source_list_local_cursors.h index a349057fd86..4746c4933e7 100644 --- a/src/mongo/db/pipeline/document_source_list_local_cursors.h +++ b/src/mongo/db/pipeline/document_source_list_local_cursors.h @@ -44,7 +44,7 @@ namespace mongo { * as true, and returns just sessions for the currently logged in user if * 'allUsers' is specified as false, or not specified at all. */ -class DocumentSourceListLocalCursors final : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceListLocalCursors final : public DocumentSource { public: static const char* kStageName; @@ -94,10 +94,6 @@ public: return constraints; } - void doInjectMongoProcessInterface(std::shared_ptr<MongoProcessInterface> mongo) override { - _cursors = mongo->getCursors(pExpCtx); - } - static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index bf8a890924f..e4fd70920f6 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -70,7 +70,7 @@ constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth; DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx), + : DocumentSource(pExpCtx), _fromNs(std::move(fromNs)), _as(std::move(as)), _variables(pExpCtx->variables), @@ -249,7 +249,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { return output.freeze(); } -std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline( +std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( const Document& inputDoc) { // Copy all 'let' variables into the foreign pipeline's expression context. copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); @@ -260,21 +260,18 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline // If we don't have a cache, build and return the pipeline immediately. if (!_cache || _cache->isAbandoned()) { return uassertStatusOK( - _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx)); + pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx)); } // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a - // cursor source. If the cache is present and serving, then we will not be adding a cursor - // source later, so inject a MongoProcessInterface into all stages that need one. + // cursor source. MongoProcessInterface::MakePipelineOptions pipelineOpts; - pipelineOpts.optimize = false; pipelineOpts.attachCursorSource = false; - pipelineOpts.forceInjectMongoProcessInterface = _cache->isServing(); // Construct the basic pipeline without a cache stage. auto pipeline = uassertStatusOK( - _mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); + pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); // Add the cache stage at the end and optimize. During the optimization process, the cache will // either move itself to the correct position in the pipeline, or will abandon itself if no @@ -286,8 +283,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. - uassertStatusOK( - _mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get())); + uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( + _fromExpCtx, pipeline.get())); } // If the cache has been abandoned, release it. @@ -712,7 +709,7 @@ DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* return SEE_NEXT; } -void DocumentSourceLookUp::doDetachFromOperationContext() { +void DocumentSourceLookUp::detachFromOperationContext() { if (_pipeline) { // We have a pipeline we're going to be executing across multiple calls to getNext(), so we // use Pipeline::detachFromOperationContext() to take care of updating '_fromExpCtx->opCtx'. @@ -723,7 +720,7 @@ void DocumentSourceLookUp::doDetachFromOperationContext() { } } -void DocumentSourceLookUp::doReattachToOperationContext(OperationContext* opCtx) { +void DocumentSourceLookUp::reattachToOperationContext(OperationContext* opCtx) { if (_pipeline) { // We have a pipeline we're going to be executing across multiple calls to getNext(), so we // use Pipeline::reattachToOperationContext() to take care of updating '_fromExpCtx->opCtx'. diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 4662b606aed..530c62f985c 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -45,8 +45,7 @@ namespace mongo { * Queries separate collection for equality matches with documents in the pipeline collection. * Adds matching documents to a new array field in the input document. */ -class DocumentSourceLookUp final : public DocumentSourceNeedsMongoProcessInterface, - public SplittableDocumentSource { +class DocumentSourceLookUp final : public DocumentSource, public SplittableDocumentSource { public: static constexpr size_t kMaxSubPipelineDepth = 20; @@ -135,9 +134,9 @@ public: collections->push_back(_fromNs); } - void doDetachFromOperationContext() final; + void detachFromOperationContext() final; - void doReattachToOperationContext(OperationContext* opCtx) final; + void reattachToOperationContext(OperationContext* opCtx) final; static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); @@ -183,7 +182,7 @@ public: return _variablesParseState; } - std::unique_ptr<Pipeline, Pipeline::Deleter> getSubPipeline_forTest(const Document& inputDoc) { + std::unique_ptr<Pipeline, PipelineDeleter> getSubPipeline_forTest(const Document& inputDoc) { return buildPipeline(inputDoc); } @@ -267,7 +266,7 @@ private: * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a * cursor and/or cache source as appropriate. */ - std::unique_ptr<Pipeline, Pipeline::Deleter> buildPipeline(const Document& inputDoc); + std::unique_ptr<Pipeline, PipelineDeleter> buildPipeline(const Document& inputDoc); /** * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that @@ -319,7 +318,7 @@ private: std::vector<BSONObj> _userPipeline; // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective // functions. If sub-$lookup stages are present, their pipelines are constructed recursively. - std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline; + std::unique_ptr<Pipeline, PipelineDeleter> _parsedIntrospectionPipeline; std::vector<LetVariable> _letVariables; @@ -329,7 +328,7 @@ private: // The following members are used to hold onto state across getNext() calls when '_unwindSrc' is // not null. long long _cursorIndex = 0; - std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline; + std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; boost::optional<Document> _input; boost::optional<Document> _nextValue; }; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index b616b5ec659..89af387cf42 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -112,8 +112,8 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat << resumeToken.getData().clusterTime)) : boost::none; invariant(resumeToken.getData().uuid); - auto lookedUpDoc = _mongoProcessInterface->lookupSingleDocument( - nss, *resumeToken.getData().uuid, documentKey, readConcern); + auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument( + pExpCtx, nss, *resumeToken.getData().uuid, documentKey, readConcern); // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may // not have returned any results if the document was deleted in the time since the update op. diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 2a4ccc972fb..416088fb742 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -40,7 +40,7 @@ namespace mongo { * Uses the ExpressionContext to determine what collection to look up into. * TODO SERVER-29134 When we allow change streams on multiple collections, this will need to change. */ -class DocumentSourceLookupChangePostImage final : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceLookupChangePostImage final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalLookupChangePostImage"_sd; static constexpr StringData kFullDocumentFieldName = @@ -106,7 +106,7 @@ public: private: DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceNeedsMongoProcessInterface(expCtx) {} + : DocumentSource(expCtx) {} /** * Uses the "documentKey" field from 'updateOp' to look up the current version of the document. diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 1d003c9c09e..ffcfb6fb904 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -83,11 +83,11 @@ public: MockMongoProcessInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { return false; } - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final { @@ -113,11 +113,12 @@ public: return Status::OK(); } - boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) { - boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss)); + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) { auto swPipeline = makePipeline({BSON("$match" << documentKey)}, expCtx); if (swPipeline == ErrorCodes::NamespaceNotFound) { return boost::none; @@ -158,8 +159,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -180,8 +181,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -202,8 +203,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -224,8 +225,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578); } @@ -246,8 +247,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch lookupChangeStage->setSource(mockLocalSource.get()); // Mock out the foreign collection. - lookupChangeStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{})); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(deque<DocumentSource::GetNextResult>{}); ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579); } @@ -270,8 +271,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni // Mock out the foreign collection to have two documents with the same document key. deque<DocumentSource::GetNextResult> foreignCollection = {Document{{"_id", 0}}, Document{{"_id", 0}}}; - lookupChangeStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(std::move(foreignCollection))); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(std::move(foreignCollection)); ASSERT_THROWS_CODE( lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments); @@ -302,8 +303,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, Document{{"_id", 1}}}; - lookupChangeStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents))); + getExpCtx()->mongoProcessInterface = + stdx::make_unique<MockMongoProcessInterface>(std::move(mockForeignContents)); auto next = lookupChangeStage->getNext(); ASSERT_TRUE(next.isAdvanced()); diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 587411e15f2..ea96b7f9353 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -493,11 +493,11 @@ public: : _mockResults(std::move(mockResults)), _removeLeadingQueryStages(removeLeadingQueryStages) {} - bool isSharded(const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { return false; } - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { @@ -564,8 +564,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) { // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, Document{{"_id", 1}}}; - lookup->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents)); auto next = lookup->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -619,8 +619,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) { // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, Document{{"_id", 1}}}; - lookup->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::move(mockForeignContents)); auto next = lookup->getNext(); ASSERT_TRUE(next.isAdvanced()); @@ -710,8 +710,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) { auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongoProcessInterface( - std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{}); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -744,8 +744,8 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongoProcessInterface( - std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{}); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -774,8 +774,8 @@ TEST_F(DocumentSourceLookUpTest, ExprEmbeddedInMatchExpressionShouldBeOptimized) auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongoProcessInterface( - std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{}); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -815,8 +815,8 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongoProcessInterface( - std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{}); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -850,8 +850,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup) auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongoProcessInterface( - std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{}); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -886,8 +886,8 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongoProcessInterface( - std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{}); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -917,8 +917,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheEntirePipelineIfNonCorrelated) { auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - lookupStage->injectMongoProcessInterface( - std::shared_ptr<MockMongoProcessInterface>(new MockMongoProcessInterface({}))); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(std::deque<DocumentSource::GetNextResult>{}); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -959,8 +959,8 @@ TEST_F(DocumentSourceLookUpTest, deque<DocumentSource::GetNextResult> mockForeignContents{ Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}}; - lookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockForeignContents)); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(mockForeignContents); // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); @@ -1036,8 +1036,8 @@ TEST_F(DocumentSourceLookUpTest, deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}}, Document{{"x", 1}}}; - lookupStage->injectMongoProcessInterface( - std::make_shared<MockMongoProcessInterface>(mockForeignContents)); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoProcessInterface>(mockForeignContents); // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); @@ -1093,8 +1093,8 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl const bool removeLeadingQueryStages = true; - lookupStage->injectMongoProcessInterface(std::shared_ptr<MockMongoProcessInterface>( - new MockMongoProcessInterface({}, removeLeadingQueryStages))); + expCtx->mongoProcessInterface = std::make_shared<MockMongoProcessInterface>( + std::deque<DocumentSource::GetNextResult>{}, removeLeadingQueryStages); auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); ASSERT(subPipeline); diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 08dde0b4dc0..50678f82143 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -44,8 +44,9 @@ DocumentSourceOut::~DocumentSourceOut() { // Make sure we drop the temp collection if anything goes wrong. Errors are ignored // here because nothing can be done about them. Additionally, if this fails and the // collection is left behind, it will be cleaned up next time the server is started. - if (_mongoProcessInterface && _tempNs.size()) _mongoProcessInterface->directClient() - ->dropCollection(_tempNs.ns());) + if (_tempNs.size()) { + pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns()); + }); } std::unique_ptr<LiteParsedDocumentSourceForeignCollections> DocumentSourceOut::liteParse( @@ -80,12 +81,11 @@ const char* DocumentSourceOut::getSourceName() const { static AtomicUInt32 aggOutCounter; void DocumentSourceOut::initialize() { - invariant(_mongoProcessInterface); - DBClientBase* conn = _mongoProcessInterface->directClient(); + DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient(); // Save the original collection options and index specs so we can check they didn't change // during computation. - _originalOutOptions = _mongoProcessInterface->getCollectionOptions(_outputNs); + _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(_outputNs); _originalIndexes = conn->getIndexSpecs(_outputNs.ns()); // Check if it's sharded or capped to make sure we have a chance of succeeding before we do all @@ -95,7 +95,7 @@ void DocumentSourceOut::initialize() { uassert(17017, str::stream() << "namespace '" << _outputNs.ns() << "' is sharded so it can't be used for $out'", - !_mongoProcessInterface->isSharded(_outputNs)); + !pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)); uassert(17152, str::stream() << "namespace '" << _outputNs.ns() << "' is capped so it can't be used for $out", @@ -145,7 +145,7 @@ void DocumentSourceOut::initialize() { } void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) { - BSONObj err = _mongoProcessInterface->insert(_tempNs, toInsert); + BSONObj err = pExpCtx->mongoProcessInterface->insert(pExpCtx, _tempNs, toInsert); uassert(16996, str::stream() << "insert for $out failed: " << err, DBClientBase::getLastErrorString(err).empty()); @@ -195,8 +195,8 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { BSON("renameCollection" << _tempNs.ns() << "to" << _outputNs.ns() << "dropTarget" << true); - auto status = _mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged( - renameCommandObj, _outputNs, _originalOutOptions, _originalIndexes); + auto status = pExpCtx->mongoProcessInterface->renameIfOptionsAndIndexesHaveNotChanged( + pExpCtx->opCtx, renameCommandObj, _outputNs, _originalOutOptions, _originalIndexes); uassert(16997, str::stream() << "$out failed: " << status.reason(), status.isOK()); // We don't need to drop the temp collection in our destructor if the rename succeeded. @@ -212,7 +212,7 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs, const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx), + : DocumentSource(pExpCtx), _done(false), _tempNs(""), // Filled in during getNext(). _outputNs(outputNs) {} diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index a9d824a1731..26e49e9f54c 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -32,8 +32,7 @@ namespace mongo { -class DocumentSourceOut final : public DocumentSourceNeedsMongoProcessInterface, - public SplittableDocumentSource { +class DocumentSourceOut final : public DocumentSource, public SplittableDocumentSource { public: static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse( const AggregationRequest& request, const BSONElement& spec); diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index f48f5897add..136147c2d31 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -46,7 +46,7 @@ DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransfor const intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<TransformerInterface> parsedTransform, std::string name) - : DocumentSourceNeedsMongoProcessInterface(pExpCtx), + : DocumentSource(pExpCtx), _parsedTransform(std::move(parsedTransform)), _name(std::move(name)) {} diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index d2f280f41d5..6b18e8cd74d 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -38,8 +38,7 @@ namespace mongo { * a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be * created from BSON. */ -class DocumentSourceSingleDocumentTransformation final - : public DocumentSourceNeedsMongoProcessInterface { +class DocumentSourceSingleDocumentTransformation final : public DocumentSource { public: /** * This class defines the minimal interface that every parser wishing to take advantage of @@ -88,14 +87,7 @@ public: return false; } - protected: - MongoProcessInterface* _mongoProcess{nullptr}; - private: - void injectMongoProcess(MongoProcessInterface* p) { - _mongoProcess = p; - } - friend class DocumentSourceSingleDocumentTransformation; }; @@ -112,12 +104,6 @@ public: DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; GetModPathsReturn getModifiedPaths() const final; - void doInjectMongoProcessInterface( - std::shared_ptr<MongoProcessInterface> mongoProcessInterface) override { - - _parsedTransform->injectMongoProcess(mongoProcessInterface.get()); - } - StageConstraints constraints(Pipeline::SplitState pipeState) const final { StageConstraints constraints( StreamType::kStreaming, diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 799a8b14fcb..361542fd2db 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/query/collation/collator_factory_interface.h" @@ -43,6 +44,7 @@ ExpressionContext::ResolvedNamespace::ResolvedNamespace(NamespaceString ns, ExpressionContext::ExpressionContext(OperationContext* opCtx, const AggregationRequest& request, std::unique_ptr<CollatorInterface> collator, + std::shared_ptr<MongoProcessInterface> processInterface, StringMap<ResolvedNamespace> resolvedNamespaces) : ExpressionContext(opCtx, collator.get()) { explain = request.getExplain(); @@ -52,12 +54,15 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, allowDiskUse = request.shouldAllowDiskUse(); bypassDocumentValidation = request.shouldBypassDocumentValidation(); ns = request.getNamespaceString(); + mongoProcessInterface = std::move(processInterface); collation = request.getCollation(); _ownedCollator = std::move(collator); _resolvedNamespaces = std::move(resolvedNamespaces); } + ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInterface* collator) : opCtx(opCtx), + mongoProcessInterface(std::make_shared<StubMongoProcessInterface>()), timeZoneDatabase(opCtx && opCtx->getServiceContext() ? TimeZoneDatabase::get(opCtx->getServiceContext()) : nullptr), @@ -66,6 +71,14 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInte _documentComparator(_collator), _valueComparator(_collator) {} +ExpressionContext::ExpressionContext(NamespaceString nss, + std::shared_ptr<MongoProcessInterface> processInterface, + const TimeZoneDatabase* tzDb) + : ns(std::move(nss)), + mongoProcessInterface(std::move(processInterface)), + timeZoneDatabase(tzDb), + variablesParseState(variables.useIdGenerator()) {} + void ExpressionContext::checkForInterrupt() { // This check could be expensive, at least in relative terms, so don't check every time. if (--_interruptCounter == 0) { @@ -127,7 +140,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith( boost::optional<UUID> uuid, boost::optional<std::unique_ptr<CollatorInterface>> collator) const { intrusive_ptr<ExpressionContext> expCtx = - new ExpressionContext(std::move(ns), timeZoneDatabase); + new ExpressionContext(std::move(ns), mongoProcessInterface, timeZoneDatabase); expCtx->uuid = std::move(uuid); expCtx->explain = explain; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 3df31c97ddc..2bafa8253db 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -39,6 +39,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_comparator.h" +#include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/pipeline/variables.h" #include "mongo/db/query/collation/collator_interface.h" @@ -100,6 +101,7 @@ public: ExpressionContext(OperationContext* opCtx, const AggregationRequest& request, std::unique_ptr<CollatorInterface> collator, + std::shared_ptr<MongoProcessInterface> mongoProcessInterface, StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces); /** @@ -180,6 +182,12 @@ public: OperationContext* opCtx; + // An interface for accessing information or performing operations that have different + // implementations on mongod and mongos, or that only make sense on one of the two. + // Additionally, putting some of this functionality behind an interface prevents aggregation + // libraries from having large numbers of dependencies. This pointer is always non-null. + std::shared_ptr<MongoProcessInterface> mongoProcessInterface; + const TimeZoneDatabase* timeZoneDatabase; // Collation requested by the user for this pipeline. Empty if the user did not request a @@ -197,10 +205,9 @@ public: protected: static const int kInterruptCheckPeriod = 128; - ExpressionContext(NamespaceString nss, const TimeZoneDatabase* tzDb) - : ns(std::move(nss)), - timeZoneDatabase(tzDb), - variablesParseState(variables.useIdGenerator()) {} + ExpressionContext(NamespaceString nss, + std::shared_ptr<MongoProcessInterface>, + const TimeZoneDatabase* tzDb); /** * Sets '_ownedCollator' and resets '_collator', 'documentComparator' and 'valueComparator'. diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h index 2747862f52b..db7bbed9599 100644 --- a/src/mongo/db/pipeline/expression_context_for_test.h +++ b/src/mongo/db/pipeline/expression_context_for_test.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/query/datetime/date_time_support.h" #include "mongo/db/query/query_test_service_context.h" @@ -48,7 +49,8 @@ public: : ExpressionContextForTest(NamespaceString{"test"_sd, "namespace"_sd}) {} ExpressionContextForTest(NamespaceString nss) - : ExpressionContext(std::move(nss), kNullTimeZoneDatabase), + : ExpressionContext( + std::move(nss), std::make_shared<StubMongoProcessInterface>(), kNullTimeZoneDatabase), _testOpCtx(_serviceContext.makeOperationContext()) { TimeZoneDatabase::set(_serviceContext.getServiceContext(), stdx::make_unique<TimeZoneDatabase>()); @@ -60,7 +62,8 @@ public: } ExpressionContextForTest(OperationContext* opCtx, const AggregationRequest& request) - : ExpressionContext(opCtx, request, nullptr, {}) {} + : ExpressionContext( + opCtx, request, nullptr, std::make_shared<StubMongoProcessInterface>(), {}) {} /** * Sets the resolved definition for an involved namespace. diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h new file mode 100644 index 00000000000..d32e8276c63 --- /dev/null +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <boost/optional.hpp> +#include <list> +#include <memory> +#include <string> +#include <vector> + +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/collection_index_usage_tracker.h" +#include "mongo/db/generic_cursor.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/field_path.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/db/query/explain_options.h" + +namespace mongo { + +class ExpressionContext; +class Pipeline; +class PipelineDeleter; + +/** + * Any functionality needed by an aggregation stage that is either context specific to a mongod or + * mongos process, or is only compiled in to one of those two binaries must be accessed via this + * interface. This allows all DocumentSources to be parsed on either mongos or mongod, but only + * executable where it makes sense. + */ +class MongoProcessInterface { +public: + enum class CurrentOpConnectionsMode { kIncludeIdle, kExcludeIdle }; + enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; + enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps }; + + struct MakePipelineOptions { + MakePipelineOptions(){}; + + bool optimize = true; + bool attachCursorSource = true; + }; + + virtual ~MongoProcessInterface(){}; + + /** + * Sets the OperationContext of the DBDirectClient returned by directClient(). This method must + * be called after updating the 'opCtx' member of the ExpressionContext associated with the + * document source. + */ + virtual void setOperationContext(OperationContext* opCtx) = 0; + + /** + * Always returns a DBDirectClient. The return type in the function signature is a DBClientBase* + * because DBDirectClient isn't linked into mongos. + */ + virtual DBClientBase* directClient() = 0; + + /** + * Note that in some rare cases this could return a false negative but will never return a false + * positive. This method will be fixed in the future once it becomes possible to avoid false + * negatives. + */ + virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0; + + /** + * Inserts 'objs' into 'ns' and returns the "detailed" last error object. + */ + virtual BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) = 0; + + virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) = 0; + + /** + * Appends operation latency statistics for collection "nss" to "builder" + */ + virtual void appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const = 0; + + /** + * Appends storage statistics for collection "nss" to "builder" + */ + virtual Status appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const = 0; + + /** + * Appends the record count for collection "nss" to "builder". + */ + virtual Status appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const = 0; + + /** + * Gets the collection options for the collection given by 'nss'. + */ + virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0; + + /** + * Performs the given rename command if the collection given by 'targetNs' has the same options + * as specified in 'originalCollectionOptions', and has the same indexes as 'originalIndexes'. + */ + virtual Status renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) = 0; + + /** + * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the + * returned pipeline will depend upon the supplied MakePipelineOptions: + * - The boolean opts.optimize determines whether the pipeline will be optimized. + * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to + * add an initial cursor source. + * + * This function returns a non-OK status if parsing the pipeline failed. + */ + virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts = MakePipelineOptions{}) = 0; + + /** + * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This + * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be + * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be + * the only case where NamespaceNotFound is returned. + */ + virtual Status attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; + + /** + * Returns a vector of owned BSONObjs, each of which contains details of an in-progress + * operation or, optionally, an idle connection. If userMode is kIncludeAllUsers, report + * operations for all authenticated users; otherwise, report only the current user's operations. + */ + virtual std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, + CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode) const = 0; + + /** + * Returns the name of the local shard if sharding is enabled, or an empty string. + */ + virtual std::string getShardName(OperationContext* opCtx) const = 0; + + /** + * Returns the fields of the document key (in order) for the collection given by 'nss' and + * 'UUID', including the shard key and _id. If _id is not in the shard key, it is added last. + */ + virtual std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx, + const NamespaceString& nss, + UUID uuid) const = 0; + + /** + * Returns zero or one documents with the document key 'documentKey'. 'documentKey' is treated + * as a unique identifier of a document, and may include an _id or all fields from the shard key + * and an _id. Throws if more than one match was found. Returns boost::none if no matching + * documents were found, including cases where the given namespace does not exist. + */ + virtual boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) = 0; + + /** + * Returns a vector of all local cursors. + */ + virtual std::vector<GenericCursor> getCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 6f71c129707..4e12ba4358d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -78,17 +78,17 @@ Pipeline::~Pipeline() { invariant(_disposed); } -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parse( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parse( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false); } -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseFacetPipeline( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseFacetPipeline( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true); } -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseTopLevelOrFacetPipeline( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOrFacetPipeline( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx, const bool isFacetPipeline) { @@ -103,22 +103,22 @@ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::parseTopLevel return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline); } -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::create( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::create( SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false); } -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createFacetPipeline( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createFacetPipeline( SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true); } -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> Pipeline::createTopLevelOrFacetPipeline( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelOrFacetPipeline( SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx, const bool isFacetPipeline) { - std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline(new Pipeline(std::move(stages), expCtx), - Pipeline::Deleter(expCtx->opCtx)); + std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx), + PipelineDeleter(expCtx->opCtx)); try { if (isFacetPipeline) { pipeline->validateFacetPipeline(); @@ -276,6 +276,7 @@ bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) { void Pipeline::detachFromOperationContext() { pCtx->opCtx = nullptr; + pCtx->mongoProcessInterface->setOperationContext(nullptr); for (auto&& source : _sources) { source->detachFromOperationContext(); @@ -284,6 +285,7 @@ void Pipeline::detachFromOperationContext() { void Pipeline::reattachToOperationContext(OperationContext* opCtx) { pCtx->opCtx = opCtx; + pCtx->mongoProcessInterface->setOperationContext(opCtx); for (auto&& source : _sources) { source->reattachToOperationContext(opCtx); @@ -307,7 +309,7 @@ void Pipeline::dispose(OperationContext* opCtx) { } } -std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() { +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() { invariant(!isSplitForShards()); invariant(!isSplitForMerge()); invariant(!_unsplitSources); @@ -315,8 +317,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() { // Create and initialize the shard spec we'll return. We start with an empty pipeline on the // shards and all work being done in the merger. Optimizations can move operations between // the pipelines to be more efficient. - std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipeline(new Pipeline(pCtx), - Pipeline::Deleter(pCtx->opCtx)); + std::unique_ptr<Pipeline, PipelineDeleter> shardPipeline(new Pipeline(pCtx), + PipelineDeleter(pCtx->opCtx)); // Keep a copy of the original source list in case we need to reset the pipeline from split to // unsplit later. @@ -337,7 +339,7 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() { } void Pipeline::unsplitFromSharded( - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard) { + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard) { invariant(isSplitForShards()); invariant(!isSplitForMerge()); invariant(pipelineForMergingShard); diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 51cf4c34da0..6aba6d38d66 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -50,6 +50,7 @@ class ExpressionContext; class DocumentSource; class CollatorInterface; class OperationContext; +class PipelineDeleter; /** * A Pipeline object represents a list of DocumentSources and is responsible for optimizing the @@ -66,46 +67,6 @@ public: enum class SplitState { kUnsplit, kSplitForShards, kSplitForMerge }; /** - * This class will ensure a Pipeline is disposed before it is deleted. - */ - class Deleter { - public: - /** - * Constructs an empty deleter. Useful for creating a - * unique_ptr<Pipeline, Pipeline::Deleter> without populating it. - */ - Deleter() {} - - explicit Deleter(OperationContext* opCtx) : _opCtx(opCtx) {} - - /** - * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume - * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If - * dismissed, a Deleter will not call dispose() when deleting the PlanExecutor. - */ - void dismissDisposal() { - _dismissed = true; - } - - /** - * Calls dispose() on 'pipeline', unless this Deleter has been dismissed. - */ - void operator()(Pipeline* pipeline) { - // It is illegal to call this method on a default-constructed Deleter. - invariant(_opCtx); - if (!_dismissed) { - pipeline->dispose(_opCtx); - } - delete pipeline; - } - - private: - OperationContext* _opCtx = nullptr; - - bool _dismissed = false; - }; - - /** * List of supported match expression features in a pipeline. */ static constexpr MatchExpressionParser::AllowedFeatureSet kAllowedMatcherFeatures = @@ -122,7 +83,7 @@ public: * will not be used during execution of the pipeline. Doing so may cause comparisons made during * parse-time to return the wrong results. */ - static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parse( + static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parse( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -133,7 +94,7 @@ public: * optimized, but the caller may convert it to an optimized pipeline by calling * optimizePipeline(). */ - static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseFacetPipeline( + static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseFacetPipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -143,7 +104,7 @@ public: * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage * is present but is not the last stage. */ - static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> create( + static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> create( SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** @@ -152,7 +113,7 @@ public: * Returns a non-OK status if any stage is invalid. For example, if the pipeline is empty or if * any stage is an initial source. */ - static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createFacetPipeline( + static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createFacetPipeline( SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** @@ -185,7 +146,7 @@ public: * Must be called before deleting a Pipeline. * * There are multiple cleanup scenarios: - * - This Pipeline will only ever use one OperationContext. In this case the Pipeline::Deleter + * - This Pipeline will only ever use one OperationContext. In this case the PipelineDeleter * will automatically call dispose() before deleting the Pipeline, and the owner need not * call dispose(). * - This Pipeline may use multiple OperationContexts over its lifetime. In this case it @@ -199,7 +160,7 @@ public: * results within mongos. This permanently alters this pipeline for the merging operation, and * returns a Pipeline object that should be executed on each targeted shard. */ - std::unique_ptr<Pipeline, Pipeline::Deleter> splitForSharded(); + std::unique_ptr<Pipeline, PipelineDeleter> splitForSharded(); /** * Reassemble a split shard pipeline into its original form. Upon return, this pipeline will @@ -207,7 +168,7 @@ public: * returned by a call to splitForSharded(). It is an error to call this on the merge part of the * pipeline, or on a pipeline that has not been split. */ - void unsplitFromSharded(std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard); + void unsplitFromSharded(std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMergingShard); /** * Returns true if this pipeline has not been split. @@ -334,12 +295,13 @@ private: }; friend class Optimizations::Sharded; + friend class PipelineDeleter; /** * Used by both Pipeline::parse() and Pipeline::parseFacetPipeline() to build and validate the * pipeline. */ - static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> parseTopLevelOrFacetPipeline( + static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseTopLevelOrFacetPipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const bool isFacetPipeline); @@ -348,7 +310,7 @@ private: * Used by both Pipeline::create() and Pipeline::createFacetPipeline() to build and validate the * pipeline. */ - static StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> createTopLevelOrFacetPipeline( + static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createTopLevelOrFacetPipeline( SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx, const bool isSubPipeline); @@ -410,4 +372,45 @@ private: boost::intrusive_ptr<ExpressionContext> pCtx; bool _disposed = false; }; + +/** + * This class will ensure a Pipeline is disposed before it is deleted. + */ +class PipelineDeleter { +public: + /** + * Constructs an empty deleter. Useful for creating a + * unique_ptr<Pipeline, PipelineDeleter> without populating it. + */ + PipelineDeleter() {} + + explicit PipelineDeleter(OperationContext* opCtx) : _opCtx(opCtx) {} + + /** + * If an owner of a std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> wants to assume + * responsibility for calling PlanExecutor::dispose(), they can call dismissDisposal(). If + * dismissed, a PipelineDeleter will not call dispose() when deleting the PlanExecutor. + */ + void dismissDisposal() { + _dismissed = true; + } + + /** + * Calls dispose() on 'pipeline', unless this PipelineDeleter has been dismissed. + */ + void operator()(Pipeline* pipeline) { + // It is illegal to call this method on a default-constructed PipelineDeleter. + invariant(_opCtx); + if (!_dismissed) { + pipeline->dispose(_opCtx); + } + delete pipeline; + } + +private: + OperationContext* _opCtx = nullptr; + + bool _dismissed = false; +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f19c9aa3abf..2638e7c9a82 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -95,360 +95,6 @@ using std::unique_ptr; namespace { -class MongodProcessInterface final - : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { -public: - MongodProcessInterface(const intrusive_ptr<ExpressionContext>& ctx) - : _ctx(ctx), _client(ctx->opCtx) {} - - void setOperationContext(OperationContext* opCtx) { - invariant(_ctx->opCtx == opCtx); - _client.setOpCtx(opCtx); - } - - DBClientBase* directClient() final { - return &_client; - } - - bool isSharded(const NamespaceString& nss) final { - AutoGetCollectionForReadCommand autoColl(_ctx->opCtx, nss); - // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding - // state. - auto css = CollectionShardingState::get(_ctx->opCtx, nss); - return bool(css->getMetadata()); - } - - BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final { - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (_ctx->bypassDocumentValidation) - maybeDisableValidation.emplace(_ctx->opCtx); - - _client.insert(ns.ns(), objs); - return _client.getLastErrorDetailed(); - } - - CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) final { - AutoGetCollectionForReadCommand autoColl(opCtx, ns); - - Collection* collection = autoColl.getCollection(); - if (!collection) { - LOG(2) << "Collection not found on index stats retrieval: " << ns.ns(); - return CollectionIndexUsageMap(); - } - - return collection->infoCache()->getIndexUsageStats(); - } - - void appendLatencyStats(const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const final { - Top::get(_ctx->opCtx->getServiceContext()) - .appendLatencyStats(nss.ns(), includeHistograms, builder); - } - - Status appendStorageStats(const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const final { - return appendCollectionStorageStats(_ctx->opCtx, nss, param, builder); - } - - Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const final { - return appendCollectionRecordCount(_ctx->opCtx, nss, builder); - } - - BSONObj getCollectionOptions(const NamespaceString& nss) final { - const auto infos = - _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); - } - - Status renameIfOptionsAndIndexesHaveNotChanged( - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) final { - Lock::GlobalWrite globalLock(_ctx->opCtx); - - if (SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions != - getCollectionOptions(targetNs))) { - return {ErrorCodes::CommandFailed, - str::stream() << "collection options of target collection " << targetNs.ns() - << " changed during processing. Original options: " - << originalCollectionOptions - << ", new options: " - << getCollectionOptions(targetNs)}; - } - - auto currentIndexes = _client.getIndexSpecs(targetNs.ns()); - if (originalIndexes.size() != currentIndexes.size() || - !std::equal(originalIndexes.begin(), - originalIndexes.end(), - currentIndexes.begin(), - SimpleBSONObjComparator::kInstance.makeEqualTo())) { - return {ErrorCodes::CommandFailed, - str::stream() << "indexes of target collection " << targetNs.ns() - << " changed during processing."}; - } - - BSONObj info; - bool ok = _client.runCommand("admin", renameCommandObj, info); - return ok ? Status::OK() : Status{ErrorCodes::CommandFailed, - str::stream() << "renameCollection failed: " << info}; - } - - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts = MakePipelineOptions{}) final { - // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace - // than the DocumentSource this MongodProcessInterface is injected into, but both - // ExpressionContext instances should still have the same OperationContext. - invariant(_ctx->opCtx == expCtx->opCtx); - - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } - - if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); - } - - Status cursorStatus = Status::OK(); - - if (opts.attachCursorSource) { - cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); - } else if (opts.forceInjectMongoProcessInterface) { - PipelineD::injectMongodInterface(pipeline.getValue().get()); - } - - return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; - } - - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { - invariant(_ctx->opCtx == expCtx->opCtx); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); - - boost::optional<AutoGetCollectionForReadCommand> autoColl; - if (expCtx->uuid) { - autoColl.emplace(expCtx->opCtx, expCtx->ns.db(), *expCtx->uuid); - if (autoColl->getCollection() == nullptr) { - // The UUID doesn't exist anymore. - return {ErrorCodes::NamespaceNotFound, - "No namespace with UUID " + expCtx->uuid->toString()}; - } - } else { - autoColl.emplace(expCtx->opCtx, expCtx->ns); - } - - // makePipeline() is only called to perform secondary aggregation requests and expects the - // collection representing the document source to be not-sharded. We confirm sharding state - // here to avoid taking a collection lock elsewhere for this purpose alone. - // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor - // until after we release the lock, leaving room for a collection to be sharded inbetween. - // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding - // state. - auto css = CollectionShardingState::get(_ctx->opCtx, expCtx->ns); - uassert(4567, - str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", - !bool(css->getMetadata())); - - PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - // Optimize again, since there may be additional optimizations that can be done after adding - // the initial cursor stage. - pipeline->optimizePipeline(); - - return Status::OK(); - } - - std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, - CurrentOpUserMode userMode, - CurrentOpTruncateMode truncateMode) const { - AuthorizationSession* ctxAuth = AuthorizationSession::get(_ctx->opCtx->getClient()); - - const std::string hostName = getHostNameCachedAndPort(); - - std::vector<BSONObj> ops; - - for (ServiceContext::LockedClientsCursor cursor( - _ctx->opCtx->getClient()->getServiceContext()); - Client* client = cursor.next();) { - invariant(client); - - stdx::lock_guard<Client> lk(*client); - - // If auth is disabled, ignore the allUsers parameter. - if (ctxAuth->getAuthorizationManager().isAuthEnabled() && - userMode == CurrentOpUserMode::kExcludeOthers && - !ctxAuth->isCoauthorizedWithClient(client)) { - continue; - } - - const OperationContext* clientOpCtx = client->getOperationContext(); - - if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) { - continue; - } - - BSONObjBuilder infoBuilder; - - infoBuilder.append("host", hostName); - - client->reportState(infoBuilder); - - const auto& clientMetadata = - ClientMetadataIsMasterState::get(client).getClientMetadata(); - - if (clientMetadata) { - auto appName = clientMetadata.get().getApplicationName(); - if (!appName.empty()) { - infoBuilder.append("appName", appName); - } - - auto clientMetadataDocument = clientMetadata.get().getDocument(); - infoBuilder.append("clientMetadata", clientMetadataDocument); - } - - // Fill out the rest of the BSONObj with opCtx specific details. - infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx)); - infoBuilder.append( - "currentOpTime", - _ctx->opCtx->getServiceContext()->getPreciseClockSource()->now().toString()); - - if (clientOpCtx) { - infoBuilder.append("opid", clientOpCtx->getOpID()); - if (clientOpCtx->isKillPending()) { - infoBuilder.append("killPending", true); - } - - if (clientOpCtx->getLogicalSessionId()) { - BSONObjBuilder bob(infoBuilder.subobjStart("lsid")); - clientOpCtx->getLogicalSessionId()->serialize(&bob); - } - - CurOp::get(clientOpCtx) - ->reportState(&infoBuilder, - (truncateMode == CurrentOpTruncateMode::kTruncateOps)); - - Locker::LockerInfo lockerInfo; - clientOpCtx->lockState()->getLockerInfo(&lockerInfo); - fillLockerInfo(lockerInfo, infoBuilder); - } - - ops.emplace_back(infoBuilder.obj()); - } - - return ops; - } - - std::string getShardName(OperationContext* opCtx) const { - if (ShardingState::get(opCtx)->enabled()) { - return ShardingState::get(opCtx)->getShardName(); - } - - return std::string(); - } - - std::vector<FieldPath> collectDocumentKeyFields(UUID uuid) const final { - if (!ShardingState::get(_ctx->opCtx)->enabled()) { - return {"_id"}; // Nothing is sharded. - } - - auto scm = [this]() -> ScopedCollectionMetadata { - AutoGetCollection autoColl(_ctx->opCtx, _ctx->ns, MODE_IS); - return CollectionShardingState::get(_ctx->opCtx, _ctx->ns)->getMetadata(); - }(); - - if (!scm) { - return {"_id"}; // Collection is not sharded. - } - - uassert(ErrorCodes::InvalidUUID, - str::stream() << "Collection " << _ctx->ns.ns() - << " UUID differs from UUID on change stream operations", - scm->uuidMatches(uuid)); - - // Unpack the shard key. - std::vector<FieldPath> result; - bool gotId = false; - for (auto& field : scm->getKeyPatternFields()) { - result.emplace_back(field->dottedField()); - gotId |= (result.back().fullPath() == "_id"); - } - if (!gotId) { // If not part of the shard key, "_id" comes last. - result.emplace_back("_id"); - } - return result; - } - - boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) final { - invariant(!readConcern); // We don't currently support a read concern on mongod - it's only - // expected to be necessary on mongos. - // - // Be sure to do the lookup using the collection default collation. - auto foreignExpCtx = - _ctx->copyWith(nss, collectionUUID, _getCollectionDefaultCollator(nss, collectionUUID)); - auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); - if (swPipeline == ErrorCodes::NamespaceNotFound) { - return boost::none; - } - auto pipeline = uassertStatusOK(std::move(swPipeline)); - - auto lookedUpDocument = pipeline->getNext(); - if (auto next = pipeline->getNext()) { - uasserted(ErrorCodes::TooManyMatchingDocuments, - str::stream() << "found more than one document with document key " - << documentKey.toString() - << " [" - << lookedUpDocument->toString() - << ", " - << next->toString() - << "]"); - } - return lookedUpDocument; - } - - std::vector<GenericCursor> getCursors(const intrusive_ptr<ExpressionContext>& expCtx) const { - return CursorManager::getAllCursors(expCtx->opCtx); - } - -private: - /** - * Looks up the collection default collator for the collection given by 'collectionUUID'. A - * collection's default collation is not allowed to change, so we cache the result to allow for - * quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr' if the - * collection does not exist or if the collection's default collation is the simple collation. - */ - std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(const NamespaceString& nss, - UUID collectionUUID) { - if (_collatorCache.find(collectionUUID) == _collatorCache.end()) { - AutoGetCollection autoColl(_ctx->opCtx, nss, collectionUUID, MODE_IS); - if (!autoColl.getCollection()) { - // This collection doesn't exist - since we looked up by UUID, it will never exist - // in the future, so we cache a null pointer as the default collation. - _collatorCache[collectionUUID] = nullptr; - } else { - auto defaultCollator = autoColl.getCollection()->getDefaultCollator(); - // Clone the collator so that we can safely use the pointer if the collection - // disappears right after we release the lock. - _collatorCache[collectionUUID] = - defaultCollator ? defaultCollator->clone() : nullptr; - } - } - return _collatorCache[collectionUUID] ? _collatorCache[collectionUUID]->clone() : nullptr; - } - - intrusive_ptr<ExpressionContext> _ctx; - DBDirectClient _client; - std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache; -}; - /** * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {} * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough @@ -580,16 +226,6 @@ BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { } } // namespace -void PipelineD::injectMongodInterface(Pipeline* pipeline) { - for (auto&& source : pipeline->_sources) { - if (auto needsMongod = - dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get())) { - needsMongod->injectMongoProcessInterface( - std::make_shared<MongodProcessInterface>(pipeline->getContext())); - } - } -} - void PipelineD::prepareCursorSource(Collection* collection, const NamespaceString& nss, const AggregationRequest* aggRequest, @@ -599,9 +235,6 @@ void PipelineD::prepareCursorSource(Collection* collection, // We will be modifying the source vector as we go. Pipeline::SourceContainer& sources = pipeline->_sources; - // Inject a MongodProcessInterface to sources that need them. - injectMongodInterface(pipeline); - if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) { return; } @@ -941,4 +574,338 @@ void PipelineD::getPlanSummaryStats(const Pipeline* pPipeline, PlanSummaryStats* statsOut->hasSortStage = hasSortStage; } +PipelineD::MongoDProcessInterface::MongoDProcessInterface(OperationContext* opCtx) + : _client(opCtx) {} + +void PipelineD::MongoDProcessInterface::setOperationContext(OperationContext* opCtx) { + _client.setOpCtx(opCtx); +} + +DBClientBase* PipelineD::MongoDProcessInterface::directClient() { + return &_client; +} + +bool PipelineD::MongoDProcessInterface::isSharded(OperationContext* opCtx, + const NamespaceString& nss) { + AutoGetCollectionForReadCommand autoColl(opCtx, nss); + // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding + // state. + auto css = CollectionShardingState::get(opCtx, nss); + return bool(css->getMetadata()); +} + +BSONObj PipelineD::MongoDProcessInterface::insert( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) { + boost::optional<DisableDocumentValidation> maybeDisableValidation; + if (expCtx->bypassDocumentValidation) + maybeDisableValidation.emplace(expCtx->opCtx); + + _client.insert(ns.ns(), objs); + return _client.getLastErrorDetailed(); +} + +CollectionIndexUsageMap PipelineD::MongoDProcessInterface::getIndexStats( + OperationContext* opCtx, const NamespaceString& ns) { + AutoGetCollectionForReadCommand autoColl(opCtx, ns); + + Collection* collection = autoColl.getCollection(); + if (!collection) { + LOG(2) << "Collection not found on index stats retrieval: " << ns.ns(); + return CollectionIndexUsageMap(); + } + + return collection->infoCache()->getIndexUsageStats(); +} + +void PipelineD::MongoDProcessInterface::appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const { + Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder); +} + +Status PipelineD::MongoDProcessInterface::appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const { + return appendCollectionStorageStats(opCtx, nss, param, builder); +} + +Status PipelineD::MongoDProcessInterface::appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const { + return appendCollectionRecordCount(opCtx, nss, builder); +} + +BSONObj PipelineD::MongoDProcessInterface::getCollectionOptions(const NamespaceString& nss) { + const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); +} + +Status PipelineD::MongoDProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) { + Lock::GlobalWrite globalLock(opCtx); + + if (SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions != + getCollectionOptions(targetNs))) { + return {ErrorCodes::CommandFailed, + str::stream() << "collection options of target collection " << targetNs.ns() + << " changed during processing. Original options: " + << originalCollectionOptions + << ", new options: " + << getCollectionOptions(targetNs)}; + } + + auto currentIndexes = _client.getIndexSpecs(targetNs.ns()); + if (originalIndexes.size() != currentIndexes.size() || + !std::equal(originalIndexes.begin(), + originalIndexes.end(), + currentIndexes.begin(), + SimpleBSONObjComparator::kInstance.makeEqualTo())) { + return {ErrorCodes::CommandFailed, + str::stream() << "indexes of target collection " << targetNs.ns() + << " changed during processing."}; + } + + BSONObj info; + bool ok = _client.runCommand("admin", renameCommandObj, info); + return ok ? Status::OK() : Status{ErrorCodes::CommandFailed, + str::stream() << "renameCollection failed: " << info}; +} + +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> +PipelineD::MongoDProcessInterface::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) { + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + Status cursorStatus = Status::OK(); + + if (opts.attachCursorSource) { + cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); + } + + return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; +} + +Status PipelineD::MongoDProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); + + boost::optional<AutoGetCollectionForReadCommand> autoColl; + if (expCtx->uuid) { + autoColl.emplace(expCtx->opCtx, expCtx->ns.db(), *expCtx->uuid); + if (autoColl->getCollection() == nullptr) { + // The UUID doesn't exist anymore. + return {ErrorCodes::NamespaceNotFound, + "No namespace with UUID " + expCtx->uuid->toString()}; + } + } else { + autoColl.emplace(expCtx->opCtx, expCtx->ns); + } + + // makePipeline() is only called to perform secondary aggregation requests and expects the + // collection representing the document source to be not-sharded. We confirm sharding state + // here to avoid taking a collection lock elsewhere for this purpose alone. + // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor + // until after we release the lock, leaving room for a collection to be sharded inbetween. + // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding + // state. + auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns); + uassert(4567, + str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", + !bool(css->getMetadata())); + + PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); + + return Status::OK(); +} + +std::vector<BSONObj> PipelineD::MongoDProcessInterface::getCurrentOps( + OperationContext* opCtx, + CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode truncateMode) const { + AuthorizationSession* ctxAuth = AuthorizationSession::get(opCtx->getClient()); + + const std::string hostName = getHostNameCachedAndPort(); + + std::vector<BSONObj> ops; + + for (ServiceContext::LockedClientsCursor cursor(opCtx->getClient()->getServiceContext()); + Client* client = cursor.next();) { + invariant(client); + + stdx::lock_guard<Client> lk(*client); + + // If auth is disabled, ignore the allUsers parameter. + if (ctxAuth->getAuthorizationManager().isAuthEnabled() && + userMode == CurrentOpUserMode::kExcludeOthers && + !ctxAuth->isCoauthorizedWithClient(client)) { + continue; + } + + const OperationContext* clientOpCtx = client->getOperationContext(); + + if (!clientOpCtx && connMode == CurrentOpConnectionsMode::kExcludeIdle) { + continue; + } + + BSONObjBuilder infoBuilder; + + infoBuilder.append("host", hostName); + + client->reportState(infoBuilder); + const auto& clientMetadata = ClientMetadataIsMasterState::get(client).getClientMetadata(); + + if (clientMetadata) { + auto appName = clientMetadata.get().getApplicationName(); + if (!appName.empty()) { + infoBuilder.append("appName", appName); + } + + auto clientMetadataDocument = clientMetadata.get().getDocument(); + infoBuilder.append("clientMetadata", clientMetadataDocument); + } + + // Fill out the rest of the BSONObj with opCtx specific details. + infoBuilder.appendBool("active", static_cast<bool>(clientOpCtx)); + infoBuilder.append("currentOpTime", + opCtx->getServiceContext()->getPreciseClockSource()->now().toString()); + + if (clientOpCtx) { + infoBuilder.append("opid", clientOpCtx->getOpID()); + if (clientOpCtx->isKillPending()) { + infoBuilder.append("killPending", true); + } + + if (clientOpCtx->getLogicalSessionId()) { + BSONObjBuilder bob(infoBuilder.subobjStart("lsid")); + clientOpCtx->getLogicalSessionId()->serialize(&bob); + } + + CurOp::get(clientOpCtx) + ->reportState(&infoBuilder, (truncateMode == CurrentOpTruncateMode::kTruncateOps)); + + Locker::LockerInfo lockerInfo; + clientOpCtx->lockState()->getLockerInfo(&lockerInfo); + fillLockerInfo(lockerInfo, infoBuilder); + } + + ops.emplace_back(infoBuilder.obj()); + } + + return ops; +} + +std::string PipelineD::MongoDProcessInterface::getShardName(OperationContext* opCtx) const { + if (ShardingState::get(opCtx)->enabled()) { + return ShardingState::get(opCtx)->getShardName(); + } + + return std::string(); +} + +std::vector<FieldPath> PipelineD::MongoDProcessInterface::collectDocumentKeyFields( + OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const { + if (!ShardingState::get(opCtx)->enabled()) { + return {"_id"}; // Nothing is sharded. + } + + auto scm = [this, opCtx, &nss]() -> ScopedCollectionMetadata { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + return CollectionShardingState::get(opCtx, nss)->getMetadata(); + }(); + + if (!scm) { + return {"_id"}; // Collection is not sharded. + } + + uassert(ErrorCodes::StaleConfig, + str::stream() << "Collection " << nss.ns() + << " UUID differs from UUID on change stream operations", + scm->uuidMatches(uuid)); + + // Unpack the shard key. + std::vector<FieldPath> result; + bool gotId = false; + for (auto& field : scm->getKeyPatternFields()) { + result.emplace_back(field->dottedField()); + gotId |= (result.back().fullPath() == "_id"); + } + if (!gotId) { // If not part of the shard key, "_id" comes last. + result.emplace_back("_id"); + } + return result; +} + +std::vector<GenericCursor> PipelineD::MongoDProcessInterface::getCursors( + const intrusive_ptr<ExpressionContext>& expCtx) const { + return CursorManager::getAllCursors(expCtx->opCtx); +} + +boost::optional<Document> PipelineD::MongoDProcessInterface::lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) { + invariant(!readConcern); // We don't currently support a read concern on mongod - it's only + // expected to be necessary on mongos. + + // Be sure to do the lookup using the collection default collation. + auto foreignExpCtx = expCtx->copyWith( + nss, collectionUUID, _getCollectionDefaultCollator(expCtx->opCtx, nss, collectionUUID)); + auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); + if (swPipeline == ErrorCodes::NamespaceNotFound) { + return boost::none; + } + auto pipeline = uassertStatusOK(std::move(swPipeline)); + + auto lookedUpDocument = pipeline->getNext(); + if (auto next = pipeline->getNext()) { + uasserted(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "found more than one document with document key " + << documentKey.toString() + << " [" + << lookedUpDocument->toString() + << ", " + << next->toString() + << "]"); + } + return lookedUpDocument; +} + +std::unique_ptr<CollatorInterface> PipelineD::MongoDProcessInterface::_getCollectionDefaultCollator( + OperationContext* opCtx, const NamespaceString& nss, UUID collectionUUID) { + if (_collatorCache.find(collectionUUID) == _collatorCache.end()) { + AutoGetCollection autoColl(opCtx, nss, collectionUUID, MODE_IS); + if (!autoColl.getCollection()) { + // This collection doesn't exist - since we looked up by UUID, it will never exist in + // the future, so we cache a null pointer as the default collation. + _collatorCache[collectionUUID] = nullptr; + } else { + auto defaultCollator = autoColl.getCollection()->getDefaultCollator(); + // Clone the collator so that we can safely use the pointer if the collection + // disappears right after we release the lock. + _collatorCache[collectionUUID] = defaultCollator ? defaultCollator->clone() : nullptr; + } + } + return _collatorCache[collectionUUID] ? _collatorCache[collectionUUID]->clone() : nullptr; +} } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 467aa9a8d1c..1551a9d035f 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -32,8 +32,10 @@ #include <memory> #include "mongo/bson/bsonobj.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/query/plan_executor.h" namespace mongo { @@ -59,6 +61,75 @@ struct DepsTracker; */ class PipelineD { public: + class MongoDProcessInterface final : public MongoProcessInterface { + public: + MongoDProcessInterface(OperationContext* opCtx); + + void setOperationContext(OperationContext* opCtx) final; + DBClientBase* directClient() final; + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; + BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) final; + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) final; + void appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const final; + Status appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const final; + Status appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const final; + BSONObj getCollectionOptions(const NamespaceString& nss) final; + Status renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) final; + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts = MakePipelineOptions{}) final; + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final; + std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, + CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode truncateMode) const final; + std::string getShardName(OperationContext* opCtx) const final; + std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx, + const NamespaceString& nss, + UUID uuid) const final; + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final; + std::vector<GenericCursor> getCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + + private: + /** + * Looks up the collection default collator for the collection given by 'collectionUUID'. A + * collection's default collation is not allowed to change, so we cache the result to allow + * for quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr' + * if the collection does not exist or if the collection's default collation is the simple + * collation. + */ + std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(OperationContext* opCtx, + const NamespaceString& nss, + UUID collectionUUID); + + DBDirectClient _client; + std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache; + }; + /** * If the first stage in the pipeline does not generate its own output documents, attaches a * DocumentSourceCursor to the front of the pipeline which will output documents from the diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 30875726968..5ec188a2ef6 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1783,8 +1783,8 @@ public: virtual ~Base() {} protected: - std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipe; - std::unique_ptr<Pipeline, Pipeline::Deleter> shardPipe; + std::unique_ptr<Pipeline, PipelineDeleter> mergePipe; + std::unique_ptr<Pipeline, PipelineDeleter> shardPipe; private: OperationContextNoop _opCtx; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 6b0032b1fd3..0ec37b15eb4 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -26,20 +26,22 @@ * it in the license file. */ -#include "mongo/platform/basic.h" +#pragma once -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/mongo_process_interface.h" +#include "mongo/db/pipeline/pipeline.h" #include "mongo/util/assert_util.h" namespace mongo { /** - * A stub MongoProcessInterface that can be used for testing. Create a subclass and override - * methods as needed. + * A stub MongoProcessInterface that provides default implementations of all methods, which can then + * be individually overridden for testing. This class may also be used in scenarios where a + * placeholder MongoProcessInterface is required by an interface but will not be called. To + * guarantee the latter, method implementations in this class are marked MONGO_UNREACHABLE. */ -class StubMongoProcessInterface - : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { +class StubMongoProcessInterface : public MongoProcessInterface { public: virtual ~StubMongoProcessInterface() = default; @@ -51,11 +53,13 @@ public: MONGO_UNREACHABLE; } - bool isSharded(const NamespaceString& ns) override { + bool isSharded(OperationContext* opCtx, const NamespaceString& ns) override { MONGO_UNREACHABLE; } - BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override { + BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) override { MONGO_UNREACHABLE; } @@ -64,19 +68,23 @@ public: MONGO_UNREACHABLE; } - void appendLatencyStats(const NamespaceString& nss, + void appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, bool includeHistograms, BSONObjBuilder* builder) const override { MONGO_UNREACHABLE; } - Status appendStorageStats(const NamespaceString& nss, + Status appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, const BSONObj& param, BSONObjBuilder* builder) const override { MONGO_UNREACHABLE; } - Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const override { + Status appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const override { MONGO_UNREACHABLE; } @@ -85,6 +93,7 @@ public: } Status renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, const BSONObj& renameCommandObj, const NamespaceString& targetNs, const BSONObj& originalCollectionOptions, @@ -92,7 +101,7 @@ public: MONGO_UNREACHABLE; } - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) override { @@ -104,7 +113,8 @@ public: MONGO_UNREACHABLE; } - std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, + CurrentOpConnectionsMode connMode, CurrentOpUserMode userMode, CurrentOpTruncateMode truncateMode) const override { MONGO_UNREACHABLE; @@ -114,14 +124,18 @@ public: MONGO_UNREACHABLE; } - std::vector<FieldPath> collectDocumentKeyFields(UUID) const override { + std::vector<FieldPath> collectDocumentKeyFields(OperationContext*, + const NamespaceString&, + UUID) const override { MONGO_UNREACHABLE; } - boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) { + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) { MONGO_UNREACHABLE; } |