diff options
Diffstat (limited to 'src/mongo')
92 files changed, 444 insertions, 471 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index b34cd5987d7..25efa885ef3 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -348,6 +348,7 @@ pipelineEnv.Library( '$BUILD_DIR/mongo/db/bson/dotted_path_support', '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', + '$BUILD_DIR/mongo/db/exec/scoped_timer', '$BUILD_DIR/mongo/db/exec/sort_executor', '$BUILD_DIR/mongo/db/generic_cursor', '$BUILD_DIR/mongo/db/index/key_generator', diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 75911585dfe..bdad5fb8fb7 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/exec/scoped_timer.h" #include "mongo/db/matcher/expression_algo.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_internal_shard_filter.h" @@ -51,8 +52,9 @@ using std::list; using std::string; using std::vector; -DocumentSource::DocumentSource(const intrusive_ptr<ExpressionContext>& pCtx) - : pSource(nullptr), pExpCtx(pCtx) {} +DocumentSource::DocumentSource(const StringData stageName, + const intrusive_ptr<ExpressionContext>& pCtx) + : pSource(nullptr), pExpCtx(pCtx), _commonStats(stageName.rawData()) {} namespace { // Used to keep track of which DocumentSources are registered under which name. @@ -85,6 +87,21 @@ list<intrusive_ptr<DocumentSource>> DocumentSource::parse( return it->second(stageSpec, expCtx); } +DocumentSource::GetNextResult DocumentSource::getNext() { + pExpCtx->checkForInterrupt(); + invariant(pExpCtx->opCtx->getServiceContext()); + invariant(pExpCtx->opCtx->getServiceContext()->getFastClockSource()); + ScopedTimer timer(pExpCtx->opCtx->getServiceContext()->getFastClockSource(), + &_commonStats.executionTimeMillis); + ++_commonStats.works; + + GetNextResult next = doGetNext(); + if (next.isAdvanced()) { + ++_commonStats.advanced; + } + return next; +} + const char* DocumentSource::getSourceName() const { static const char unknown[] = "[UNKNOWN]"; return unknown; diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 435219fdfda..d37fd19d2c6 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -43,6 +43,7 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/collection_index_usage_tracker.h" #include "mongo/db/commands.h" +#include "mongo/db/exec/plan_stats.h" #include "mongo/db/generic_cursor.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" @@ -241,14 +242,12 @@ public: * The main execution API of a DocumentSource. Returns an intermediate query result generated by * this DocumentSource. * - * All implementers must call pExpCtx->checkForInterrupt(). - * * For performance reasons, a streaming stage must not keep references to documents across calls * to getNext(). Such stages must retrieve a result from their child and then release it (or * return it) before asking for another result. Failing to do so can result in extra work, since * the Document/Value library must copy data on write when that data has a refcount above one. */ - virtual GetNextResult getNext() = 0; + GetNextResult getNext(); /** * Returns a struct containing information about any special constraints imposed on using this @@ -305,7 +304,6 @@ public: virtual void addInvolvedCollections( stdx::unordered_set<NamespaceString>* collectionNames) const {} - virtual void detachFromOperationContext() {} virtual void reattachToOperationContext(OperationContext* opCtx) {} @@ -470,7 +468,14 @@ public: } protected: - explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + DocumentSource(const StringData stageName, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** + * The main execution API of a DocumentSource. Returns an intermediate query result generated by + * this DocumentSource. See comment at getNext(). + */ + virtual GetNextResult doGetNext() = 0; /** * Attempt to perform an optimization with the following source in the pipeline. 'container' @@ -507,6 +512,8 @@ protected: boost::intrusive_ptr<ExpressionContext> pExpCtx; private: + CommonStats _commonStats; + /** * Create a Value that represents the document source. * diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp index dd7550a0c1c..a362f38495d 100644 --- a/src/mongo/db/pipeline/document_source_add_fields.cpp +++ b/src/mongo/db/pipeline/document_source_add_fields.cpp @@ -66,7 +66,7 @@ intrusive_ptr<DocumentSource> DocumentSourceAddFields::create( throw; } }(), - userSpecifiedName.toString(), + userSpecifiedName == kStageName ? kStageName : kAliasNameSet, isIndependentOfAnyCollection)); return addFields; } diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index f8fa5e86ceb..26cb0e9a89b 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -84,12 +84,10 @@ std::string nextFileName() { } // namespace const char* DocumentSourceBucketAuto::getSourceName() const { - return "$bucketAuto"; + return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceBucketAuto::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() { if (!_populated) { const auto populationResult = populateSorter(); if (populationResult.isPaused()) { @@ -426,7 +424,7 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto( std::vector<AccumulationStatement> accumulationStatements, const boost::intrusive_ptr<GranularityRounder>& granularityRounder, uint64_t maxMemoryUsageBytes) - : DocumentSource(pExpCtx), + : DocumentSource(kStageName, pExpCtx), _nBuckets(numBuckets), _maxMemoryUsageBytes(maxMemoryUsageBytes), _groupByExpression(groupByExpression), diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index ec2ff61b7e7..65e2cd960a2 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -44,9 +44,10 @@ namespace mongo { */ class DocumentSourceBucketAuto final : public DocumentSource { public: + static constexpr StringData kStageName = "$bucketAuto"_sd; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; DepsTracker::State getDependencies(DepsTracker* deps) const final; - GetNextResult getNext() final; + const char* getSourceName() const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -93,6 +94,7 @@ public: const std::vector<AccumulationStatement>& getAccumulatedFields() const; protected: + GetNextResult doGetNext() final; void doDispose() final; private: diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 9050b9990dd..a9d80a0e2a0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -242,7 +242,6 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac } } - BSONObj DocumentSourceChangeStream::buildMatchFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp startFrom, diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 7487e565f1e..70357cd433e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -150,14 +150,12 @@ public: enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster }; - /** * Helpers for Determining which regex to match a change stream against. */ static ChangeStreamType getChangeStreamType(const NamespaceString& nss); static std::string getNsRegexForChangeStream(const NamespaceString& nss); - /** * Produce the BSON object representing the filter for the $match stage to filter oplog entries * to only those relevant for this $changeStream stage. @@ -215,7 +213,7 @@ public: const char* getSourceName() const final; - GetNextResult getNext() final { + GetNextResult doGetNext() final { // We should never execute this stage directly. We expect this stage to be absorbed into the // cursor feeding the pipeline, and executing this stage may result in the use of the wrong // collation. The comparisons against the oplog must use the simple collation, regardless of diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp index 1fedb3cb940..a4a43c654ee 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp @@ -52,9 +52,7 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt } // namespace -DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceCloseCursor::doGetNext() { // Close cursor if we have returned an invalidate entry. if (_shouldCloseCursor) { uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated"); diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index ab28dee5e8d..6fc042b9bb4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -43,11 +43,11 @@ namespace mongo { */ class DocumentSourceCloseCursor final : public DocumentSource { public: - GetNextResult getNext() final; + static constexpr StringData kStageName = "$changeStream"_sd; const char* getSourceName() const final { // This is used in error reporting. - return "$changeStream"; + return DocumentSourceCloseCursor::kStageName.rawData(); } StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -88,7 +88,9 @@ private: * Use the create static method to create a DocumentSourceCloseCursor. */ DocumentSourceCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx) {} + : DocumentSource(kStageName, expCtx) {} + + GetNextResult doGetNext() final; bool _shouldCloseCursor = false; }; diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 1c6739b8497..5a558a924b9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -82,7 +82,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( const boost::intrusive_ptr<ExpressionContext>& expCtx, const ServerGlobalParams::FeatureCompatibility::Version& fcv, BSONObj changeStreamSpec) - : DocumentSource(expCtx), + : DocumentSource(DocumentSourceChangeStreamTransform::kStageName, expCtx), _changeStreamSpec(changeStreamSpec.getOwned()), _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()), _fcv(fcv) { @@ -393,9 +393,7 @@ DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifi return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}}; } -DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::doGetNext() { uassert(50988, "Illegal attempt to execute an internal change stream stage on mongos. A $changeStream " "stage must be the first stage in a pipeline", diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index c20f5864e67..df8cecf49f0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -39,6 +39,7 @@ namespace mongo { class DocumentSourceChangeStreamTransform : public DocumentSource { public: + static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd; /** * Creates a new transformation stage from the given specification. */ @@ -58,11 +59,13 @@ public: return boost::none; } - DocumentSource::GetNextResult getNext(); const char* getSourceName() const { return DocumentSourceChangeStream::kStageName.rawData(); } +protected: + DocumentSource::GetNextResult doGetNext() override; + private: // This constructor is private, callers should use the 'create()' method above. DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>&, diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp index 376e0dc97c9..5b286b585b0 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp @@ -58,9 +58,7 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt } // namespace -DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() { if (_queuedInvalidate) { const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get())); _queuedInvalidate.reset(); diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h index 349ad68c589..69b4dcb742c 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -40,11 +40,11 @@ namespace mongo { */ class DocumentSourceCheckInvalidate final : public DocumentSource { public: - GetNextResult getNext() final; + static constexpr StringData kStageName = "$_checkInvalidate"_sd; const char* getSourceName() const final { // This is used in error reporting. - return "$_checkInvalidate"; + return DocumentSourceCheckInvalidate::kStageName.rawData(); } StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -80,11 +80,14 @@ private: */ DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<ResumeTokenData> startAfterInvalidate) - : DocumentSource(expCtx), _startAfterInvalidate(std::move(startAfterInvalidate)) { + : DocumentSource(kStageName, expCtx), + _startAfterInvalidate(std::move(startAfterInvalidate)) { invariant(!_startAfterInvalidate || _startAfterInvalidate->fromInvalidate == ResumeTokenData::kFromInvalidate); } + GetNextResult doGetNext() final; + boost::optional<ResumeTokenData> _startAfterInvalidate; boost::optional<Document> _queuedInvalidate; }; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 5b9eeabaf6c..d5f6da9a8c0 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -168,7 +168,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte } // namespace const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const { - return "$_ensureResumeTokenPresent"; + return kStageName.rawData(); } Value DocumentSourceEnsureResumeTokenPresent::serialize( @@ -186,11 +186,9 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} - -DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() { - pExpCtx->checkForInterrupt(); + : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {} +DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext() { if (_resumeStatus == ResumeStatus::kSurpassedToken) { // We've already verified the resume token is present. return pSource->getNext(); @@ -256,7 +254,7 @@ DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken( } const char* DocumentSourceShardCheckResumability::getSourceName() const { - return "$_checkShardResumability"; + return kStageName.rawData(); } Value DocumentSourceShardCheckResumability::serialize( @@ -279,11 +277,9 @@ intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResu DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) - : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {} - -DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { - pExpCtx->checkForInterrupt(); + : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {} +DocumentSource::GetNextResult DocumentSourceShardCheckResumability::doGetNext() { if (_surpassedResumeToken) return pSource->getNext(); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 8c90a88b564..f13d4741f30 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -59,7 +59,8 @@ namespace mongo { */ class DocumentSourceShardCheckResumability final : public DocumentSource { public: - GetNextResult getNext() final; + static constexpr StringData kStageName = "$_internalCheckShardResumability"_sd; + const char* getSourceName() const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -92,6 +93,8 @@ private: DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); + GetNextResult doGetNext() final; + void _assertOplogHasEnoughHistory(const GetNextResult& nextInput); ResumeTokenData _tokenFromClient; @@ -105,6 +108,7 @@ private: */ class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource { public: + static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd; // Used to record the results of comparing the token data extracted from documents in the // resumed stream against the client's resume token. enum class ResumeStatus { @@ -113,7 +117,6 @@ public: kCheckNextDoc // The next document produced by the stream may contain the resume token. }; - GetNextResult getNext() final; const char* getSourceName() const final; StageConstraints constraints(Pipeline::SplitState) const final { @@ -154,6 +157,8 @@ private: DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); + GetNextResult doGetNext() final; + /** * Check the given event to determine whether it matches the client's resume token. If so, we * swallow this event and return the next event in the stream. Otherwise, return boost::none. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index bbf031b9f08..72c504a6093 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -46,7 +46,7 @@ REGISTER_DOCUMENT_SOURCE(collStats, DocumentSourceCollStats::createFromBson); const char* DocumentSourceCollStats::getSourceName() const { - return "$collStats"; + return kStageName.rawData(); } intrusive_ptr<DocumentSource> DocumentSourceCollStats::createFromBson( @@ -98,9 +98,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCollStats::createFromBson( return collStats; } -DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() { if (_finished) { return GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index a950afb1813..6aa1b27192a 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -39,6 +39,8 @@ namespace mongo { */ class DocumentSourceCollStats : public DocumentSource { public: + static constexpr StringData kStageName = "$collStats"_sd; + class LiteParsed final : public LiteParsedDocumentSource { public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, @@ -69,9 +71,7 @@ public: }; DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx) {} - - GetNextResult getNext() final; + : DocumentSource(kStageName, pExpCtx) {} const char* getSourceName() const final; @@ -98,6 +98,8 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); private: + GetNextResult doGetNext() final; + // The raw object given to $collStats containing user specified options. BSONObj _collStatsSpec; bool _finished = false; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index 72225d8185f..20f67ef9175 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -103,14 +103,11 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li return std::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps); } - const char* DocumentSourceCurrentOp::getSourceName() const { return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceCurrentOp::doGetNext() { if (_ops.empty()) { _ops = pExpCtx->mongoProcessInterface->getCurrentOps(pExpCtx, _includeIdleConnections, diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 0e86973a009..fffc51503a3 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -102,8 +102,6 @@ public: CursorMode idleCursors = CursorMode::kExcludeCursors, BacktraceMode backtrace = BacktraceMode::kExcludeBacktrace); - GetNextResult getNext() final; - const char* getSourceName() const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -140,7 +138,7 @@ private: TruncationMode truncateOps, CursorMode idleCursors, BacktraceMode backtrace) - : DocumentSource(pExpCtx), + : DocumentSource(kStageName, pExpCtx), _includeIdleConnections(includeIdleConnections), _includeIdleSessions(includeIdleSessions), _includeOpsFromAllUsers(includeOpsFromAllUsers), @@ -149,6 +147,8 @@ private: _idleCursors(idleCursors), _backtrace(backtrace) {} + GetNextResult doGetNext() final; + ConnMode _includeIdleConnections = ConnMode::kExcludeIdle; SessionMode _includeIdleSessions = SessionMode::kIncludeIdle; UserMode _includeOpsFromAllUsers = UserMode::kExcludeOthers; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 934f37d2d1d..fde3c279d33 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -53,12 +53,10 @@ using std::shared_ptr; using std::string; const char* DocumentSourceCursor::getSourceName() const { - return "$cursor"; + return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceCursor::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceCursor::doGetNext() { if (_currentBatch.empty()) { loadBatch(); } @@ -308,7 +306,7 @@ DocumentSourceCursor::DocumentSourceCursor( std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, const intrusive_ptr<ExpressionContext>& pCtx, bool trackOplogTimestamp) - : DocumentSource(pCtx), + : DocumentSource(kStageName, pCtx), _docsAddedToBatches(0), _exec(std::move(exec)), _trackOplogTS(trackOplogTimestamp) { diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 815b6635b14..fa28dd114a0 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -46,8 +46,8 @@ namespace mongo { */ class DocumentSourceCursor : public DocumentSource { public: + static constexpr StringData kStageName = "$cursor"_sd; // virtuals from DocumentSource - GetNextResult getNext() final; const char* getSourceName() const override; @@ -162,6 +162,8 @@ protected: const boost::intrusive_ptr<ExpressionContext>& pExpCtx, bool trackOplogTimestamp = false); + GetNextResult doGetNext() final; + ~DocumentSourceCursor(); /** diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp index b45938c45d0..2220a73b496 100644 --- a/src/mongo/db/pipeline/document_source_exchange.cpp +++ b/src/mongo/db/pipeline/document_source_exchange.cpp @@ -88,7 +88,7 @@ constexpr size_t Exchange::kMaxBufferSize; constexpr size_t Exchange::kMaxNumberConsumers; const char* DocumentSourceExchange::getSourceName() const { - return "$_internalExchange"; + return kStageName.rawData(); } Value DocumentSourceExchange::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { @@ -100,12 +100,12 @@ DocumentSourceExchange::DocumentSourceExchange( const boost::intrusive_ptr<Exchange> exchange, size_t consumerId, std::unique_ptr<ResourceYielder> yielder) - : DocumentSource(expCtx), + : DocumentSource(kStageName, expCtx), _exchange(exchange), _consumerId(consumerId), _resourceYielder(std::move(yielder)) {} -DocumentSource::GetNextResult DocumentSourceExchange::getNext() { +DocumentSource::GetNextResult DocumentSourceExchange::doGetNext() { return _exchange->getNext(pExpCtx->opCtx, _consumerId, _resourceYielder.get()); } diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index 68b3a037047..e30d66698d1 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -191,6 +191,8 @@ private: class DocumentSourceExchange final : public DocumentSource { public: + static constexpr StringData kStageName = "$_internalExchange"_sd; + /** * Create an Exchange consumer. 'resourceYielder' is so the exchange may temporarily yield * resources (such as the Session) while waiting for other threads to do @@ -202,8 +204,6 @@ public: size_t consumerId, std::unique_ptr<ResourceYielder> yielder); - GetNextResult getNext() final; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { return {StreamType::kStreaming, PositionRequirement::kNone, @@ -230,8 +230,6 @@ public: invariant(!source); } - GetNextResult getNext(size_t consumerId); - size_t getConsumers() const { return _exchange->getConsumers(); } @@ -249,6 +247,8 @@ public: } private: + GetNextResult doGetNext() final; + boost::intrusive_ptr<Exchange> _exchange; const size_t _consumerId; diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp index ef4f626e7b6..c6a5c4945fd 100644 --- a/src/mongo/db/pipeline/document_source_exchange_test.cpp +++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp @@ -39,7 +39,9 @@ #include "mongo/platform/random.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" #include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/system_clock_source.h" #include "mongo/util/time_support.h" @@ -56,6 +58,42 @@ public: } }; +namespace { +/** + * This class is used for an Exchange consumer to temporarily relinquish control of a mutex + * while it's blocked. + */ +class MutexYielder : public ResourceYielder { +public: + MutexYielder(stdx::mutex* mutex) : _lock(*mutex, stdx::defer_lock) {} + + void yield(OperationContext* opCtx) override { + _lock.unlock(); + } + + void unyield(OperationContext* opCtx) override { + _lock.lock(); + } + + stdx::unique_lock<stdx::mutex>& getLock() { + return _lock; + } + +private: + stdx::unique_lock<stdx::mutex> _lock; +}; + +/** + * Used to keep track of each client and operation context. + */ +struct ThreadInfo { + ServiceContext::UniqueClient client; + ServiceContext::UniqueOperationContext opCtx; + boost::intrusive_ptr<DocumentSourceExchange> documentSourceExchange; + MutexYielder* yielder; +}; +} // namespace + class DocumentSourceExchangeTest : public AggregationContextFixture { protected: std::unique_ptr<executor::TaskExecutor> _executor; @@ -109,6 +147,23 @@ protected: IDLParserErrorContext ctx("internalExchange"); return ExchangeSpec::parse(ctx, spec); } + + auto createNProducers(size_t nConsumers, boost::intrusive_ptr<Exchange> ex) { + std::vector<ThreadInfo> threads; + for (size_t idx = 0; idx < nConsumers; ++idx) { + ServiceContext::UniqueClient client = + getServiceContext()->makeClient("exchange client"); + ServiceContext::UniqueOperationContext opCtxOwned = + getServiceContext()->makeOperationContext(client.get()); + OperationContext* opCtx = opCtxOwned.get(); + threads.emplace_back(ThreadInfo{ + std::move(client), + std::move(opCtxOwned), + new DocumentSourceExchange(new ExpressionContext(opCtx, nullptr), ex, idx, nullptr), + }); + } + return threads; + } }; TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) { @@ -150,29 +205,25 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) { boost::intrusive_ptr<Exchange> ex = new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; - - for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); - } - + std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; for (size_t id = 0; id < nConsumers; ++id) { - auto handle = _executor->scheduleWork( - [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) { - PseudoRandom prng(getNewSeed()); + DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get(); + auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers]( + const executor::TaskExecutor::CallbackArgs& cb) { + PseudoRandom prng(getNewSeed()); - auto input = prods[id]->getNext(); + auto input = docSourceExchange->getNext(); - size_t docs = 0; + size_t docs = 0; - for (; input.isAdvanced(); input = prods[id]->getNext()) { - sleepmillis(prng.nextInt32() % 20 + 1); - ++docs; - } - ASSERT_EQ(docs, nDocs / nConsumers); - }); + for (; input.isAdvanced(); input = docSourceExchange->getNext()) { + sleepmillis(prng.nextInt32() % 20 + 1); + ++docs; + } + ASSERT_EQ(docs, nDocs / nConsumers); + }); handles.emplace_back(std::move(handle.getValue())); } @@ -197,38 +248,34 @@ TEST_F(DocumentSourceExchangeTest, ExchangeNConsumerEarlyout) { boost::intrusive_ptr<Exchange> ex = new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; - - for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); - } - + std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; for (size_t id = 0; id < nConsumers; ++id) { - auto handle = _executor->scheduleWork( - [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) { - PseudoRandom prng(getNewSeed()); + DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get(); + auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers]( + const executor::TaskExecutor::CallbackArgs& cb) { + PseudoRandom prng(getNewSeed()); - auto input = prods[id]->getNext(); + auto input = docSourceExchange->getNext(); - size_t docs = 0; + size_t docs = 0; - for (; input.isAdvanced(); input = prods[id]->getNext()) { - sleepmillis(prng.nextInt32() % 20 + 1); - ++docs; + for (; input.isAdvanced(); input = docSourceExchange->getNext()) { + sleepmillis(prng.nextInt32() % 20 + 1); + ++docs; - // The consumer 1 bails out early wihout consuming all its documents. - if (id == 1 && docs == 100) { - // Pretend we have seen all docs. - docs = nDocs / nConsumers; + // The consumer 1 bails out early wihout consuming all its documents. + if (id == 1 && docs == 100) { + // Pretend we have seen all docs. + docs = nDocs / nConsumers; - prods[id]->dispose(); - break; - } + docSourceExchange->dispose(); + break; } - ASSERT_EQ(docs, nDocs / nConsumers); - }); + } + ASSERT_EQ(docs, nDocs / nConsumers); + }); handles.emplace_back(std::move(handle.getValue())); } @@ -251,20 +298,16 @@ TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) { boost::intrusive_ptr<Exchange> ex = new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; - - for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); - } - + std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; for (size_t id = 0; id < nConsumers; ++id) { + DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get(); auto handle = _executor->scheduleWork( - [prods, id, nDocs](const executor::TaskExecutor::CallbackArgs& cb) { + [docSourceExchange, id, nDocs](const executor::TaskExecutor::CallbackArgs& cb) { size_t docs = 0; - for (auto input = prods[id]->getNext(); input.isAdvanced(); - input = prods[id]->getNext()) { + for (auto input = docSourceExchange->getNext(); input.isAdvanced(); + input = docSourceExchange->getNext()) { ++docs; } ASSERT_EQ(docs, nDocs); @@ -302,30 +345,26 @@ TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) { boost::intrusive_ptr<Exchange> ex = new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; - - for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); - } - + std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; for (size_t id = 0; id < nConsumers; ++id) { - auto handle = _executor->scheduleWork( - [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) { - size_t docs = 0; - for (auto input = prods[id]->getNext(); input.isAdvanced(); - input = prods[id]->getNext()) { - size_t value = input.getDocument()["a"].getInt(); + DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get(); + auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers]( + const executor::TaskExecutor::CallbackArgs& cb) { + size_t docs = 0; + for (auto input = docSourceExchange->getNext(); input.isAdvanced(); + input = docSourceExchange->getNext()) { + size_t value = input.getDocument()["a"].getInt(); - ASSERT(value >= id * 100); - ASSERT(value < (id + 1) * 100); + ASSERT(value >= id * 100); + ASSERT(value < (id + 1) * 100); - ++docs; - } + ++docs; + } - ASSERT_EQ(docs, nDocs / nConsumers); - }); + ASSERT_EQ(docs, nDocs / nConsumers); + }); handles.emplace_back(std::move(handle.getValue())); } @@ -368,30 +407,26 @@ TEST_F(DocumentSourceExchangeTest, RangeShardingExchangeNConsumer) { boost::intrusive_ptr<Exchange> ex = new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; - - for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); - } - + std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; for (size_t id = 0; id < nConsumers; ++id) { - auto handle = _executor->scheduleWork( - [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) { - size_t docs = 0; - for (auto input = prods[id]->getNext(); input.isAdvanced(); - input = prods[id]->getNext()) { - size_t value = input.getDocument()["a"].getInt(); + DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get(); + auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers]( + const executor::TaskExecutor::CallbackArgs& cb) { + size_t docs = 0; + for (auto input = docSourceExchange->getNext(); input.isAdvanced(); + input = docSourceExchange->getNext()) { + size_t value = input.getDocument()["a"].getInt(); - ASSERT(value >= id * 100); - ASSERT(value < (id + 1) * 100); + ASSERT(value >= id * 100); + ASSERT(value < (id + 1) * 100); - ++docs; - } + ++docs; + } - ASSERT_EQ(docs, nDocs / nConsumers); - }); + ASSERT_EQ(docs, nDocs / nConsumers); + }); handles.emplace_back(std::move(handle.getValue())); } @@ -425,39 +460,35 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) { boost::intrusive_ptr<Exchange> ex = new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; - - for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); - } - + std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; AtomicWord<size_t> processedDocs{0}; for (size_t id = 0; id < nConsumers; ++id) { - auto handle = _executor->scheduleWork( - [prods, id, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) { - PseudoRandom prng(getNewSeed()); + DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get(); + auto handle = _executor->scheduleWork([docSourceExchange, id, &processedDocs]( + const executor::TaskExecutor::CallbackArgs& cb) { + PseudoRandom prng(getNewSeed()); - auto input = prods[id]->getNext(); + auto input = docSourceExchange->getNext(); - size_t docs = 0; - for (; input.isAdvanced(); input = prods[id]->getNext()) { - size_t value = input.getDocument()["a"].getInt(); + size_t docs = 0; + for (; input.isAdvanced(); input = docSourceExchange->getNext()) { + size_t value = input.getDocument()["a"].getInt(); - ASSERT(value >= id * 100); - ASSERT(value < (id + 1) * 100); + ASSERT(value >= id * 100); + ASSERT(value < (id + 1) * 100); - ++docs; + ++docs; - // This helps randomizing thread scheduling forcing different threads to load - // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% - // reproducibility. - sleepmillis(prng.nextInt32() % 50 + 1); - } - processedDocs.fetchAndAdd(docs); - }); + // This helps randomizing thread scheduling forcing different threads to load + // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% + // reproducibility. + sleepmillis(prng.nextInt32() % 50 + 1); + } + processedDocs.fetchAndAdd(docs); + }); handles.emplace_back(std::move(handle.getValue())); } @@ -497,39 +528,6 @@ TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) { boost::intrusive_ptr<Exchange> ex = new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - /** - * This class is used for an Exchange consumer to temporarily relinquish control of a mutex - * while it's blocked. - */ - class MutexYielder : public ResourceYielder { - public: - MutexYielder(stdx::mutex* mutex) : _lock(*mutex, stdx::defer_lock) {} - - void yield(OperationContext* opCtx) override { - _lock.unlock(); - } - - void unyield(OperationContext* opCtx) override { - _lock.lock(); - } - - stdx::unique_lock<stdx::mutex>& getLock() { - return _lock; - } - - private: - stdx::unique_lock<stdx::mutex> _lock; - }; - - /** - * Used to keep track of each client and operation context. - */ - struct ThreadInfo { - ServiceContext::UniqueClient client; - ServiceContext::UniqueOperationContext opCtx; - boost::intrusive_ptr<DocumentSourceExchange> documentSourceExchange; - MutexYielder* yielder; - }; std::vector<ThreadInfo> threads; for (size_t idx = 0; idx < nConsumers; ++idx) { @@ -554,27 +552,27 @@ TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) { for (size_t id = 0; id < nConsumers; ++id) { ThreadInfo* threadInfo = &threads[id]; - auto handle = _executor->scheduleWork( - [threadInfo, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) { - DocumentSourceExchange* exchange = threadInfo->documentSourceExchange.get(); - const auto getNext = [exchange, threadInfo]() { - // Will acquire 'artificalGlobalMutex'. Within getNext() it will be released and - // reacquired by the MutexYielder if the Exchange has to block. - threadInfo->yielder->getLock().lock(); - auto res = exchange->getNext(); - threadInfo->yielder->getLock().unlock(); - return res; - }; - - for (auto input = getNext(); input.isAdvanced(); input = getNext()) { - // This helps randomizing thread scheduling forcing different threads to load - // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% - // reproducibility. - PseudoRandom prng(getNewSeed()); - sleepmillis(prng.nextInt32() % 50 + 1); - processedDocs.fetchAndAdd(1); - } - }); + auto handle = _executor->scheduleWork([threadInfo, &processedDocs]( + const executor::TaskExecutor::CallbackArgs& cb) { + DocumentSourceExchange* docSourceExchange = threadInfo->documentSourceExchange.get(); + const auto getNext = [docSourceExchange, threadInfo]() { + // Will acquire 'artificalGlobalMutex'. Within getNext() it will be released and + // reacquired by the MutexYielder if the Exchange has to block. + threadInfo->yielder->getLock().lock(); + auto res = docSourceExchange->getNext(); + threadInfo->yielder->getLock().unlock(); + return res; + }; + + for (auto input = getNext(); input.isAdvanced(); input = getNext()) { + // This helps randomizing thread scheduling forcing different threads to load + // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% + // reproducibility. + PseudoRandom prng(getNewSeed()); + sleepmillis(prng.nextInt32() % 50 + 1); + processedDocs.fetchAndAdd(1); + } + }); handles.emplace_back(std::move(handle.getValue())); } @@ -610,34 +608,29 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) { boost::intrusive_ptr<Exchange> ex = new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); - std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods; - - for (size_t idx = 0; idx < nConsumers; ++idx) { - prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr)); - } - + std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; - AtomicWord<size_t> processedDocs{0}; for (size_t id = 0; id < nConsumers; ++id) { - auto handle = _executor->scheduleWork( - [prods, id, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) { - PseudoRandom prng(getNewSeed()); - - auto input = prods[id]->getNext(); - - size_t docs = 0; - for (; input.isAdvanced(); input = prods[id]->getNext()) { - ++docs; - - // This helps randomizing thread scheduling forcing different threads to load - // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% - // reproducibility. - sleepmillis(prng.nextInt32() % 50 + 1); - } - processedDocs.fetchAndAdd(docs); - }); + auto docSourceExchange = threads[id].documentSourceExchange.get(); + auto handle = _executor->scheduleWork([docSourceExchange, id, &processedDocs]( + const executor::TaskExecutor::CallbackArgs& cb) { + PseudoRandom prng(getNewSeed()); + + auto input = docSourceExchange->getNext(); + + size_t docs = 0; + for (; input.isAdvanced(); input = docSourceExchange->getNext()) { + ++docs; + + // This helps randomizing thread scheduling forcing different threads to load + // buffers. The sleep API is inherently imprecise so we cannot guarantee 100% + // reproducibility. + sleepmillis(prng.nextInt32() % 50 + 1); + } + processedDocs.fetchAndAdd(docs); + }); handles.emplace_back(std::move(handle.getValue())); } diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 16d14e1edf4..ff10abe4284 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -57,7 +57,7 @@ using std::vector; DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines, const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx), + : DocumentSource(kStageName, expCtx), _teeBuffer(TeeBuffer::create(facetPipelines.size())), _facets(std::move(facetPipelines)) { for (size_t facetId = 0; facetId < _facets.size(); ++facetId) { @@ -179,9 +179,7 @@ void DocumentSourceFacet::doDispose() { } } -DocumentSource::GetNextResult DocumentSourceFacet::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceFacet::doGetNext() { if (_done) { return GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 1a2c54a8649..4a6e0aecc5a 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -58,6 +58,7 @@ class NamespaceString; */ class DocumentSourceFacet final : public DocumentSource { public: + static constexpr StringData kStageName = "$facet"_sd; struct FacetPipeline { FacetPipeline(std::string name, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) : name(std::move(name)), pipeline(std::move(pipeline)) {} @@ -105,11 +106,6 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx); /** - * Blocking call. Will consume all input and produces one output document. - */ - GetNextResult getNext() final; - - /** * Optimizes inner pipelines. */ boost::intrusive_ptr<DocumentSource> optimize() final; @@ -120,7 +116,7 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; const char* getSourceName() const final { - return "$facet"; + return DocumentSourceFacet::kStageName.rawData(); } /** @@ -155,6 +151,10 @@ public: bool usedDisk() final; protected: + /** + * Blocking call. Will consume all input and produces one output document. + */ + GetNextResult doGetNext() final; void doDispose() final; private: diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 5e8668b7de9..1ed94365190 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -244,7 +244,7 @@ public: LookupRequirement::kAllowed}; } - DocumentSource::GetNextResult getNext() final { + DocumentSource::GetNextResult doGetNext() final { return pSource->getNext(); } diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index 6df14d1e05a..8e1067751b6 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -43,7 +43,6 @@ namespace mongo { using boost::intrusive_ptr; constexpr StringData DocumentSourceGeoNear::kKeyFieldName; -constexpr const char* DocumentSourceGeoNear::kStageName; REGISTER_DOCUMENT_SOURCE(geoNear, LiteParsedDocumentSourceDefault::parse, @@ -234,7 +233,7 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con } DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {} + : DocumentSource(kStageName, pExpCtx), coordsIsArray(false), spherical(false) {} boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGeoNear::distributedPlanLogic() { diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index cbb490dd807..ae72b3fc027 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -37,7 +37,7 @@ namespace mongo { class DocumentSourceGeoNear : public DocumentSource { public: static constexpr StringData kKeyFieldName = "key"_sd; - static constexpr auto kStageName = "$geoNear"; + static constexpr StringData kStageName = "$geoNear"_sd; /** * Only exposed for testing. @@ -46,7 +46,7 @@ public: const boost::intrusive_ptr<ExpressionContext>&); const char* getSourceName() const final { - return kStageName; + return DocumentSourceGeoNear::kStageName.rawData(); } StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -63,9 +63,8 @@ public: * DocumentSourceGeoNear should always be replaced by a DocumentSourceGeoNearCursor before * executing a pipeline, so this method should never be called. */ - GetNextResult getNext() final { - // TODO: Replace with a MONGO_UNREACHABLE as part of SERVER-38995. - uasserted(51048, "DocumentSourceGeoNear's getNext should never be called"); + GetNextResult doGetNext() final { + MONGO_UNREACHABLE; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; @@ -133,7 +132,6 @@ public: */ boost::optional<DistributedPlanLogic> distributedPlanLogic() final; - private: explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp b/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp index 6a369e84a69..24253846166 100644 --- a/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp @@ -49,7 +49,6 @@ #include "mongo/db/query/plan_executor.h" namespace mongo { -constexpr const char* DocumentSourceGeoNearCursor::kStageName; boost::intrusive_ptr<DocumentSourceGeoNearCursor> DocumentSourceGeoNearCursor::create( Collection* collection, @@ -81,7 +80,7 @@ DocumentSourceGeoNearCursor::DocumentSourceGeoNearCursor( } const char* DocumentSourceGeoNearCursor::getSourceName() const { - return kStageName; + return DocumentSourceGeoNearCursor::kStageName.rawData(); } Document DocumentSourceGeoNearCursor::transformBSONObjToDocument(const BSONObj& obj) const { diff --git a/src/mongo/db/pipeline/document_source_geo_near_cursor.h b/src/mongo/db/pipeline/document_source_geo_near_cursor.h index c23e01dc146..52e8d1cc554 100644 --- a/src/mongo/db/pipeline/document_source_geo_near_cursor.h +++ b/src/mongo/db/pipeline/document_source_geo_near_cursor.h @@ -53,7 +53,7 @@ public: /** * The name of this stage. */ - static constexpr auto kStageName = "$geoNearCursor"; + static constexpr StringData kStageName = "$geoNearCursor"_sd; /** * Create a new DocumentSourceGeoNearCursor. If specified, 'distanceMultiplier' must be diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index c96ac4b4114..ea781c2b1b9 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -84,12 +84,10 @@ REGISTER_DOCUMENT_SOURCE(graphLookup, DocumentSourceGraphLookUp::createFromBson); const char* DocumentSourceGraphLookUp::getSourceName() const { - return "$graphLookup"; + return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceGraphLookUp::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceGraphLookUp::doGetNext() { if (_unwind) { return getNextUnwound(); } @@ -445,7 +443,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( boost::optional<FieldPath> depthField, boost::optional<long long> maxDepth, boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc) - : DocumentSource(expCtx), + : DocumentSource(kStageName, expCtx), _from(std::move(from)), _as(std::move(as)), _connectFromField(std::move(connectFromField)), diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index b36f79677c1..c9eccb39de9 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -39,6 +39,8 @@ namespace mongo { class DocumentSourceGraphLookUp final : public DocumentSource { public: + static constexpr StringData kStageName = "$graphLookup"_sd; + class LiteParsed : public LiteParsedDocumentSourceForeignCollections { public: LiteParsed(NamespaceString foreignNss, PrivilegeVector privileges) @@ -52,8 +54,6 @@ public: static std::unique_ptr<LiteParsed> liteParse(const AggregationRequest& request, const BSONElement& spec); - GetNextResult getNext() final; - const char* getSourceName() const final; const FieldPath& getConnectFromField() const { @@ -130,6 +130,7 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); protected: + GetNextResult doGetNext() final; void doDispose() final; /** diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 6cfb08dfaf8..6ec80d8bbe5 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -132,9 +132,7 @@ const char* DocumentSourceGroup::getSourceName() const { return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceGroup::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() { if (!_initialized) { const auto initializationResult = initialize(); if (initializationResult.isPaused()) { @@ -328,7 +326,7 @@ intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create( DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& pExpCtx, boost::optional<size_t> maxMemoryUsageBytes) - : DocumentSource(pExpCtx), + : DocumentSource(kStageName, pExpCtx), _usedDisk(false), _doingMerge(false), _maxMemoryUsageBytes(maxMemoryUsageBytes ? *maxMemoryUsageBytes diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 31e29ea2954..fc4dc8c7515 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -96,7 +96,6 @@ public: boost::intrusive_ptr<DocumentSource> optimize() final; DepsTracker::State getDependencies(DepsTracker* deps) const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - GetNextResult getNext() final; const char* getSourceName() const final; GetModPathsReturn getModifiedPaths() const final; StringMap<boost::intrusive_ptr<Expression>> getIdFields() const; @@ -131,7 +130,6 @@ public: return constraints; } - /** * Add an accumulator, which will become a field in each Document that results from grouping. */ @@ -179,6 +177,7 @@ public: const; protected: + GetNextResult doGetNext() final; void doDispose() final; private: diff --git a/src/mongo/db/pipeline/document_source_index_stats.cpp b/src/mongo/db/pipeline/document_source_index_stats.cpp index c24671624f6..7d02339794d 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.cpp +++ b/src/mongo/db/pipeline/document_source_index_stats.cpp @@ -44,12 +44,10 @@ REGISTER_DOCUMENT_SOURCE(indexStats, DocumentSourceIndexStats::createFromBson); const char* DocumentSourceIndexStats::getSourceName() const { - return "$indexStats"; + return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceIndexStats::doGetNext() { if (_indexStatsMap.empty()) { _indexStatsMap = pExpCtx->mongoProcessInterface->getIndexStats(pExpCtx->opCtx, pExpCtx->ns); _indexStatsIter = _indexStatsMap.begin(); @@ -71,7 +69,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() { } DocumentSourceIndexStats::DocumentSourceIndexStats(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), _processName(getHostNameCachedAndPort()) {} + : DocumentSource(kStageName, pExpCtx), _processName(getHostNameCachedAndPort()) {} intrusive_ptr<DocumentSource> DocumentSourceIndexStats::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 2493970e333..9198f817b33 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -40,6 +40,8 @@ namespace mongo { */ class DocumentSourceIndexStats final : public DocumentSource { public: + static constexpr StringData kStageName = "$indexStats"_sd; + class LiteParsed final : public LiteParsedDocumentSource { public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, @@ -66,7 +68,6 @@ public: }; // virtuals from DocumentSource - GetNextResult getNext() final; const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; @@ -92,6 +93,7 @@ public: private: DocumentSourceIndexStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + GetNextResult doGetNext() final; CollectionIndexUsageMap _indexStatsMap; CollectionIndexUsageMap::const_iterator _indexStatsIter; diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp index 13a0c173424..f5729e4570a 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp @@ -55,8 +55,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalInhibitOptimization:: return new DocumentSourceInternalInhibitOptimization(expCtx); } -DocumentSource::GetNextResult DocumentSourceInternalInhibitOptimization::getNext() { - pExpCtx->checkForInterrupt(); +DocumentSource::GetNextResult DocumentSourceInternalInhibitOptimization::doGetNext() { return pSource->getNext(); } diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index 75f3e637a7d..2c13583a78a 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -47,7 +47,7 @@ public: BSONElement, const boost::intrusive_ptr<ExpressionContext>&); DocumentSourceInternalInhibitOptimization(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx) {} + : DocumentSource(kStageName, expCtx) {} const char* getSourceName() const final { return kStageName.rawData(); @@ -67,9 +67,8 @@ public: return boost::none; } - GetNextResult getNext() final; - private: + GetNextResult doGetNext() final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; }; diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp index a548cb6366c..7beb9441feb 100644 --- a/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp +++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp @@ -46,11 +46,9 @@ namespace mongo { DocumentSourceInternalShardFilter::DocumentSourceInternalShardFilter( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<ShardFilterer> shardFilterer) - : DocumentSource(pExpCtx), _shardFilterer(std::move(shardFilterer)) {} - -DocumentSource::GetNextResult DocumentSourceInternalShardFilter::getNext() { - pExpCtx->checkForInterrupt(); + : DocumentSource(kStageName, pExpCtx), _shardFilterer(std::move(shardFilterer)) {} +DocumentSource::GetNextResult DocumentSourceInternalShardFilter::doGetNext() { auto next = pSource->getNext(); invariant(_shardFilterer); for (; next.isAdvanced(); next = pSource->getNext()) { diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.h b/src/mongo/db/pipeline/document_source_internal_shard_filter.h index 8fbfe6b07f1..2a13791b793 100644 --- a/src/mongo/db/pipeline/document_source_internal_shard_filter.h +++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.h @@ -63,8 +63,6 @@ public: ChangeStreamRequirement::kBlacklist); } - GetNextResult getNext() override; - Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; @@ -76,6 +74,8 @@ public: Pipeline::SourceContainer* container) override; private: + GetNextResult doGetNext() override; + std::unique_ptr<ShardFilterer> _shardFilterer; }; diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp index 0eb5a85f0d0..4857fac3d58 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp @@ -81,8 +81,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create return new DocumentSourceInternalSplitPipeline(expCtx, mergeType); } -DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::getNext() { - pExpCtx->checkForInterrupt(); +DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::doGetNext() { return pSource->getNext(); } diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index d2d4b14e685..5e1580ce4e0 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -74,12 +74,12 @@ public: : LookupRequirement::kAllowed}; } - GetNextResult getNext() final; - private: DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, HostTypeRequirement mergeType) - : DocumentSource(expCtx), _mergeType(mergeType) {} + : DocumentSource(kStageName, expCtx), _mergeType(mergeType) {} + + GetNextResult doGetNext() final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; HostTypeRequirement _mergeType = HostTypeRequirement::kNone; diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index f8b2ca3c0aa..f5930738a41 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -44,7 +44,7 @@ using boost::intrusive_ptr; DocumentSourceLimit::DocumentSourceLimit(const intrusive_ptr<ExpressionContext>& pExpCtx, long long limit) - : DocumentSource(pExpCtx), _limit(limit) {} + : DocumentSource(kStageName, pExpCtx), _limit(limit) {} REGISTER_DOCUMENT_SOURCE(limit, LiteParsedDocumentSourceDefault::parse, @@ -70,9 +70,7 @@ Pipeline::SourceContainer::iterator DocumentSourceLimit::doOptimizeAt( return std::next(itr); } -DocumentSource::GetNextResult DocumentSourceLimit::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceLimit::doGetNext() { if (_nReturned >= _limit) { return GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 3cecb165cf0..8bdd83686fe 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -59,7 +59,6 @@ public: LookupRequirement::kAllowed}; } - GetNextResult getNext() final; const char* getSourceName() const final { return kStageName.rawData(); } @@ -96,6 +95,7 @@ public: private: DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); + GetNextResult doGetNext() final; long long _limit; long long _nReturned = 0; diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp index 9195d0aa0aa..535d4ebb1e2 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp @@ -42,11 +42,7 @@ REGISTER_TEST_DOCUMENT_SOURCE(listCachedAndActiveUsers, DocumentSourceListCachedAndActiveUsers::LiteParsed::parse, DocumentSourceListCachedAndActiveUsers::createFromBson); -const char* DocumentSourceListCachedAndActiveUsers::kStageName = "$listCachedAndActiveUsers"; - -DocumentSource::GetNextResult DocumentSourceListCachedAndActiveUsers::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceListCachedAndActiveUsers::doGetNext() { if (!_users.empty()) { const auto info = std::move(_users.back()); _users.pop_back(); @@ -75,7 +71,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListCachedAndActiveUsers::cre DocumentSourceListCachedAndActiveUsers::DocumentSourceListCachedAndActiveUsers( const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), _users() { + : DocumentSource(kStageName, pExpCtx), _users() { auto authMgr = AuthorizationManager::get(pExpCtx->opCtx->getServiceContext()); _users = authMgr->getUserCacheInfo(); } diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index 51dea36c162..3f297e25a49 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -43,7 +43,7 @@ namespace mongo { */ class DocumentSourceListCachedAndActiveUsers final : public DocumentSource { public: - static const char* kStageName; + static constexpr StringData kStageName = "$listCachedAndActiveUsers"_sd; class LiteParsed final : public LiteParsedDocumentSource { public: @@ -78,10 +78,8 @@ public: } }; - GetNextResult getNext() final; - const char* getSourceName() const final { - return kStageName; + return kStageName.rawData(); } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { @@ -112,6 +110,8 @@ public: private: DocumentSourceListCachedAndActiveUsers(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + GetNextResult doGetNext() final; + std::vector<AuthorizationManager::CachedUserInfo> _users; }; diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.cpp b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp index 53beab31a77..b388ee0e688 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.cpp +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp @@ -41,11 +41,7 @@ REGISTER_DOCUMENT_SOURCE(listLocalSessions, DocumentSourceListLocalSessions::LiteParsed::parse, DocumentSourceListLocalSessions::createFromBson); -const char* DocumentSourceListLocalSessions::kStageName = "$listLocalSessions"; - -DocumentSource::GetNextResult DocumentSourceListLocalSessions::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceListLocalSessions::doGetNext() { while (!_ids.empty()) { const auto& id = _ids.back(); _ids.pop_back(); @@ -75,7 +71,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalSessions::createFrom DocumentSourceListLocalSessions::DocumentSourceListLocalSessions( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const ListSessionsSpec& spec) - : DocumentSource(pExpCtx), _spec(spec) { + : DocumentSource(kStageName, pExpCtx), _spec(spec) { const auto& opCtx = pExpCtx->opCtx; _cache = LogicalSessionCache::get(opCtx); if (_spec.getAllUsers()) { diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 74403ac9d45..0cc7ba1a929 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -52,14 +52,15 @@ std::vector<SHA256Block> listSessionsUsersToDigests(const std::vector<ListSessio */ class DocumentSourceListLocalSessions final : public DocumentSource { public: - static const char* kStageName; + static constexpr StringData kStageName = "$listLocalSessions"_sd; class LiteParsed final : public LiteParsedDocumentSource { public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return std::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec)); + return std::make_unique<LiteParsed>( + listSessionsParseSpec(DocumentSourceListLocalSessions::kStageName, spec)); } explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {} @@ -82,7 +83,9 @@ public: void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { uassert(ErrorCodes::InvalidOptions, - str::stream() << "Aggregation stage " << kStageName << " cannot run with a " + str::stream() << "Aggregation stage " + << DocumentSourceListLocalSessions::kStageName + << " cannot run with a " << "readConcern other than 'local', or in a multi-document " << "transaction. Current readConcern: " << readConcern.toString(), readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern); @@ -92,10 +95,8 @@ public: const ListSessionsSpec _spec; }; - GetNextResult getNext() final; - const char* getSourceName() const final { - return kStageName; + return DocumentSourceListLocalSessions::kStageName.rawData(); } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { @@ -127,6 +128,8 @@ private: DocumentSourceListLocalSessions(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const ListSessionsSpec& spec); + GetNextResult doGetNext() final; + const ListSessionsSpec _spec; const LogicalSessionCache* _cache; std::vector<LogicalSessionId> _ids; diff --git a/src/mongo/db/pipeline/document_source_list_sessions.cpp b/src/mongo/db/pipeline/document_source_list_sessions.cpp index d6351375bd9..5eba38b1510 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.cpp +++ b/src/mongo/db/pipeline/document_source_list_sessions.cpp @@ -38,8 +38,6 @@ namespace mongo { -const char* DocumentSourceListSessions::kStageName = "$listSessions"; - REGISTER_DOCUMENT_SOURCE(listSessions, DocumentSourceListSessions::LiteParsed::parse, DocumentSourceListSessions::createFromBson); @@ -75,7 +73,6 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListSessions::createFromBson( return new DocumentSourceListSessions(query, pExpCtx, spec.getAllUsers(), spec.getUsers()); } - Value DocumentSourceListSessions::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { ListSessionsSpec spec; diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index e8068747799..75e5bfd7fee 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -47,13 +47,14 @@ namespace mongo { */ class DocumentSourceListSessions final : public DocumentSourceMatch { public: - static const char* kStageName; + static constexpr StringData kStageName = "$listSessions"_sd; class LiteParsed final : public LiteParsedDocumentSource { public: static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec) { - return std::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec)); + return std::make_unique<LiteParsed>( + listSessionsParseSpec(DocumentSourceListSessions::kStageName, spec)); } explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {} @@ -79,7 +80,7 @@ public: }; const char* getSourceName() const final { - return kStageName; + return DocumentSourceListSessions::kStageName.rawData(); } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index f14b3db0394..0c42c595cfe 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -53,7 +53,7 @@ constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth; DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx), + : DocumentSource(kStageName, expCtx), _fromNs(std::move(fromNs)), _as(std::move(as)), _variables(expCtx->variables), @@ -161,7 +161,7 @@ REGISTER_DOCUMENT_SOURCE(lookup, DocumentSourceLookUp::createFromBson); const char* DocumentSourceLookUp::getSourceName() const { - return "$lookup"; + return kStageName.rawData(); } StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { @@ -227,9 +227,7 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& valu } // namespace -DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() { if (_unwindSrc) { return unwindResult(); } diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index d885dc5bcff..b618ec6230f 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -49,6 +49,7 @@ namespace mongo { class DocumentSourceLookUp final : public DocumentSource { public: static constexpr size_t kMaxSubPipelineDepth = 20; + static constexpr StringData kStageName = "$lookup"_sd; struct LetVariable { LetVariable(std::string name, boost::intrusive_ptr<Expression> expression, Variables::Id id) @@ -112,7 +113,6 @@ public: const boost::optional<LiteParsedPipeline> _liteParsedPipeline; }; - GetNextResult getNext() final; const char* getSourceName() const final; void serializeToArray( std::vector<Value>& array, @@ -220,6 +220,7 @@ public: } protected: + GetNextResult doGetNext() final; void doDispose() final; /** diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index cb24b7b9ae8..1019454c133 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -51,9 +51,7 @@ Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType } } // namespace -DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::doGetNext() { auto input = pSource->getNext(); if (!input.isAdvanced()) { return input; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 83bad0c7ee5..40d50586025 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -93,11 +93,6 @@ public: return DepsTracker::State::SEE_NEXT; } - /** - * Performs the lookup to retrieve the full document. - */ - GetNextResult getNext() final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { if (explain) { return Value{Document{{kStageName, Document()}}}; @@ -111,7 +106,12 @@ public: private: DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx) {} + : DocumentSource(kStageName, expCtx) {} + + /** + * Performs the lookup to retrieve the full document. + */ + GetNextResult doGetNext() final; /** * Uses the "documentKey" field from 'updateOp' to look up the current version of the document. diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index 6ed37705edd..066330fa525 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -58,7 +58,7 @@ REGISTER_DOCUMENT_SOURCE(match, DocumentSourceMatch::createFromBson); const char* DocumentSourceMatch::getSourceName() const { - return "$match"; + return kStageName.rawData(); } Value DocumentSourceMatch::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { @@ -80,9 +80,7 @@ intrusive_ptr<DocumentSource> DocumentSourceMatch::optimize() { return this; } -DocumentSource::GetNextResult DocumentSourceMatch::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceMatch::doGetNext() { // The user facing error should have been generated earlier. massert(17309, "Should never call getNext on a $match stage with $text clause", !_isTextQuery); @@ -497,7 +495,7 @@ DepsTracker::State DocumentSourceMatch::getDependencies(DepsTracker* deps) const DocumentSourceMatch::DocumentSourceMatch(const BSONObj& query, const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx) { + : DocumentSource(kStageName, expCtx) { rebuild(query); } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 4645a2e1937..c35cdfcaadf 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -40,6 +40,7 @@ namespace mongo { class DocumentSourceMatch : public DocumentSource { public: + static constexpr StringData kStageName = "$match"_sd; /** * Convenience method for creating a $match stage. */ @@ -77,8 +78,6 @@ public: */ void rebuild(BSONObj filter); - GetNextResult getNext() override; - boost::intrusive_ptr<DocumentSource> optimize() final; const char* getSourceName() const override; @@ -170,6 +169,7 @@ public: } protected: + GetNextResult doGetNext() override; DocumentSourceMatch(const BSONObj& query, const boost::intrusive_ptr<ExpressionContext>& expCtx); diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp index c4f7e864231..99c80f74ec9 100644 --- a/src/mongo/db/pipeline/document_source_merge.cpp +++ b/src/mongo/db/pipeline/document_source_merge.cpp @@ -322,7 +322,7 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs, boost::optional<std::vector<BSONObj>> pipeline, std::set<FieldPath> mergeOnFields, boost::optional<ChunkVersion> targetCollectionVersion) - : DocumentSourceWriter(std::move(outputNs), expCtx), + : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx), _targetCollectionVersion(targetCollectionVersion), _descriptor(descriptor), _pipeline(std::move(pipeline)), diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 738fc802cc4..ca882575585 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -58,11 +58,6 @@ public: using DocumentSourceQueue::DocumentSourceQueue; DocumentSourceMock(std::deque<GetNextResult>); - GetNextResult getNext() override { - invariant(!isDisposed); - invariant(!isDetachedFromOpCtx); - return DocumentSourceQueue::getNext(); - } Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override { // Unlike the queue, it's okay to serialize this stage for testing purposes. @@ -100,6 +95,12 @@ public: bool isOptimized = false; protected: + GetNextResult doGetNext() override { + invariant(!isDisposed); + invariant(!isDetachedFromOpCtx); + return DocumentSourceQueue::doGetNext(); + } + void doDispose() override { isDisposed = true; } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index b474083a5da..28241d82893 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -48,7 +48,6 @@ public: using LiteParsedDocumentSourceForeignCollections:: LiteParsedDocumentSourceForeignCollections; - static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, const BSONElement& spec); @@ -95,7 +94,7 @@ public: private: DocumentSourceOut(NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceWriter(std::move(outputNs), expCtx) {} + : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx) {} void initialize() override; diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp b/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp index dfa460c3f9f..3140a90f5df 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp @@ -33,8 +33,6 @@ namespace mongo { -const char* DocumentSourcePlanCacheStats::kStageName = "$planCacheStats"; - REGISTER_DOCUMENT_SOURCE(planCacheStats, DocumentSourcePlanCacheStats::LiteParsed::parse, DocumentSourcePlanCacheStats::createFromBson); @@ -60,7 +58,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourcePlanCacheStats::createFromBso DocumentSourcePlanCacheStats::DocumentSourcePlanCacheStats( const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx) {} + : DocumentSource(kStageName, expCtx) {} void DocumentSourcePlanCacheStats::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { @@ -93,7 +91,7 @@ Pipeline::SourceContainer::iterator DocumentSourcePlanCacheStats::doOptimizeAt( return container->erase(itrToNext); } -DocumentSource::GetNextResult DocumentSourcePlanCacheStats::getNext() { +DocumentSource::GetNextResult DocumentSourcePlanCacheStats::doGetNext() { if (!_haveRetrievedStats) { const auto matchExpr = _absorbedMatch ? _absorbedMatch->getMatchExpression() : nullptr; _results = pExpCtx->mongoProcessInterface->getMatchingPlanCacheEntryStats( diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index 9e735f835af..f0cf248ae5b 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -36,7 +36,7 @@ namespace mongo { class DocumentSourcePlanCacheStats final : public DocumentSource { public: - static const char* kStageName; + static constexpr StringData kStageName = "$planCacheStats"_sd; class LiteParsed final : public LiteParsedDocumentSource { public: @@ -67,7 +67,8 @@ public: void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const { uassert(ErrorCodes::InvalidOptions, - str::stream() << "Aggregation stage " << kStageName + str::stream() << "Aggregation stage " + << DocumentSourcePlanCacheStats::kStageName << " requires read concern local but found " << readConcern.toString(), readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern); @@ -82,8 +83,6 @@ public: virtual ~DocumentSourcePlanCacheStats() = default; - GetNextResult getNext() override; - StageConstraints constraints( Pipeline::SplitState = Pipeline::SplitState::kUnsplit) const override { StageConstraints constraints{StreamType::kStreaming, @@ -105,7 +104,7 @@ public: } const char* getSourceName() const override { - return kStageName; + return DocumentSourcePlanCacheStats::kStageName.rawData(); } /** @@ -122,6 +121,8 @@ public: private: DocumentSourcePlanCacheStats(const boost::intrusive_ptr<ExpressionContext>& expCtx); + GetNextResult doGetNext() final; + Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override { MONGO_UNREACHABLE; // Should call serializeToArray instead. diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp index ad6b4a4b5e7..f37b1a1ced5 100644 --- a/src/mongo/db/pipeline/document_source_project.cpp +++ b/src/mongo/db/pipeline/document_source_project.cpp @@ -81,7 +81,7 @@ intrusive_ptr<DocumentSource> DocumentSourceProject::create( throw; } }(), - DocumentSourceProject::kStageName.rawData(), + kStageName, isIndependentOfAnyCollection)); return project; } diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp index 47a77709363..12b8157117c 100644 --- a/src/mongo/db/pipeline/document_source_queue.cpp +++ b/src/mongo/db/pipeline/document_source_queue.cpp @@ -40,13 +40,13 @@ boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create( DocumentSourceQueue::DocumentSourceQueue(std::deque<GetNextResult> results, const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx), _queue(std::move(results)) {} + : DocumentSource(kStageName, expCtx), _queue(std::move(results)) {} const char* DocumentSourceQueue::getSourceName() const { return kStageName.rawData(); } -DocumentSource::GetNextResult DocumentSourceQueue::getNext() { +DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() { if (_queue.empty()) { return GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h index 4f5a61f5fb8..e6a1fe5e207 100644 --- a/src/mongo/db/pipeline/document_source_queue.h +++ b/src/mongo/db/pipeline/document_source_queue.h @@ -52,7 +52,6 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx); virtual ~DocumentSourceQueue() {} - GetNextResult getNext() override; const char* getSourceName() const override; Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override { @@ -100,6 +99,7 @@ public: } protected: + GetNextResult doGetNext() override; // Return documents from front of queue. std::deque<GetNextResult> _queue; }; diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp index 7afc1eea75a..749041c231a 100644 --- a/src/mongo/db/pipeline/document_source_redact.cpp +++ b/src/mongo/db/pipeline/document_source_redact.cpp @@ -47,23 +47,21 @@ using std::vector; DocumentSourceRedact::DocumentSourceRedact(const intrusive_ptr<ExpressionContext>& expCtx, const intrusive_ptr<Expression>& expression) - : DocumentSource(expCtx), _expression(expression) {} + : DocumentSource(kStageName, expCtx), _expression(expression) {} REGISTER_DOCUMENT_SOURCE(redact, LiteParsedDocumentSourceDefault::parse, DocumentSourceRedact::createFromBson); const char* DocumentSourceRedact::getSourceName() const { - return "$redact"; + return kStageName.rawData(); } static const Value descendVal = Value("descend"_sd); static const Value pruneVal = Value("prune"_sd); static const Value keepVal = Value("keep"_sd); -DocumentSource::GetNextResult DocumentSourceRedact::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceRedact::doGetNext() { auto nextInput = pSource->getNext(); for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { auto& variables = pExpCtx->variables; @@ -192,7 +190,6 @@ intrusive_ptr<DocumentSource> DocumentSourceRedact::createFromBson( variables.setValue(pruneId, pruneVal); variables.setValue(keepId, keepVal); - return source; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index 1e9c9784e12..a4409c2e081 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -36,7 +36,7 @@ namespace mongo { class DocumentSourceRedact final : public DocumentSource { public: - GetNextResult getNext() final; + static constexpr StringData kStageName = "$redact"_sd; const char* getSourceName() const final; boost::intrusive_ptr<DocumentSource> optimize() final; @@ -75,6 +75,8 @@ private: DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx, const boost::intrusive_ptr<Expression>& previsit); + GetNextResult doGetNext() final; + // These both work over pExpCtx->variables. boost::optional<Document> redactObject(const Document& root); // redacts CURRENT Value redactValue(const Value& in, const Document& root); diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp index 3fe49d83f0b..4a1a29688c8 100644 --- a/src/mongo/db/pipeline/document_source_replace_root.cpp +++ b/src/mongo/db/pipeline/document_source_replace_root.cpp @@ -119,7 +119,7 @@ intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::createFromBson( newRootExpression, (stageName == kStageName) ? ReplaceRootTransformation::UserSpecifiedName::kReplaceRoot : ReplaceRootTransformation::UserSpecifiedName::kReplaceWith), - kStageName.toString(), + kStageName.rawData(), isIndependentOfAnyCollection); } diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 072b7a22d1c..ac9346a0d28 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -44,18 +44,16 @@ using boost::intrusive_ptr; constexpr StringData DocumentSourceSample::kStageName; DocumentSourceSample::DocumentSourceSample(const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), _size(0) {} + : DocumentSource(kStageName, pExpCtx), _size(0) {} REGISTER_DOCUMENT_SOURCE(sample, LiteParsedDocumentSourceDefault::parse, DocumentSourceSample::createFromBson); -DocumentSource::GetNextResult DocumentSourceSample::getNext() { +DocumentSource::GetNextResult DocumentSourceSample::doGetNext() { if (_size == 0) return GetNextResult::makeEOF(); - pExpCtx->checkForInterrupt(); - if (!_sortStage->isPopulated()) { // Exhaust source stage, add random metadata, and push all into sorter. PseudoRandom& prng = pExpCtx->opCtx->getClient()->getPrng(); @@ -117,7 +115,6 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::createFromBson( return sample; } - boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSample::distributedPlanLogic() { // On the merger we need to merge the pre-sorted documents by their random values, then limit to // the number we need. diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 987f8f8ba94..14c49978ee8 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -38,7 +38,6 @@ class DocumentSourceSample final : public DocumentSource { public: static constexpr StringData kStageName = "$sample"_sd; - GetNextResult getNext() final; const char* getSourceName() const final { return kStageName.rawData(); } @@ -70,6 +69,8 @@ public: private: explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + GetNextResult doGetNext() final; + long long _size; // Uses a $sort stage to randomly sort the documents. diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp index cddae4ad571..ba2c99cbbfb 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp @@ -50,14 +50,14 @@ DocumentSourceSampleFromRandomCursor::DocumentSourceSampleFromRandomCursor( long long size, std::string idField, long long nDocsInCollection) - : DocumentSource(pExpCtx), + : DocumentSource(kStageName, pExpCtx), _size(size), _idField(std::move(idField)), _seenDocs(pExpCtx->getValueComparator().makeUnorderedValueSet()), _nDocsInColl(nDocsInCollection) {} const char* DocumentSourceSampleFromRandomCursor::getSourceName() const { - return "$sampleFromRandomCursor"; + return kStageName.rawData(); } namespace { @@ -75,9 +75,7 @@ double smallestFromSampleOfUniform(PseudoRandom* prng, size_t N) { } } // namespace -DocumentSource::GetNextResult DocumentSourceSampleFromRandomCursor::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceSampleFromRandomCursor::doGetNext() { if (_seenDocs.size() >= static_cast<size_t>(_size)) return GetNextResult::makeEOF(); diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index 31f49bec10f..07d8f4cabfd 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -40,7 +40,7 @@ namespace mongo { */ class DocumentSourceSampleFromRandomCursor final : public DocumentSource { public: - GetNextResult getNext() final; + static constexpr StringData kStageName = "$sampleFromRandomCursor"_sd; const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; DepsTracker::State getDependencies(DepsTracker* deps) const final; @@ -71,6 +71,8 @@ private: std::string idField, long long collectionSize); + GetNextResult doGetNext() final; + /** * Keep asking for documents from the random cursor until it yields a new document. Errors if a * a document is encountered without a value for '_idField', or if the random cursor keeps diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp index e6d1f3bd7f8..8cdfba006dd 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp @@ -39,7 +39,7 @@ constexpr StringData DocumentSourceSequentialDocumentCache::kStageName; DocumentSourceSequentialDocumentCache::DocumentSourceSequentialDocumentCache( const boost::intrusive_ptr<ExpressionContext>& expCtx, SequentialDocumentCache* cache) - : DocumentSource(expCtx), _cache(cache) { + : DocumentSource(kStageName, expCtx), _cache(cache) { invariant(_cache); invariant(!_cache->isAbandoned()); @@ -48,12 +48,10 @@ DocumentSourceSequentialDocumentCache::DocumentSourceSequentialDocumentCache( } } -DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() { +DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::doGetNext() { // Either we're reading from the cache, or we have an input source to build the cache from. invariant(pSource || _cache->isServing()); - pExpCtx->checkForInterrupt(); - if (_cacheIsEOF) { return GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index d8d2ba90db8..15381735974 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -47,7 +47,7 @@ public: static constexpr StringData kStageName = "$sequentialCache"_sd; const char* getSourceName() const final { - return kStageName.rawData(); + return DocumentSourceSequentialDocumentCache::kStageName.rawData(); } StageConstraints constraints(Pipeline::SplitState pipeState) const { @@ -68,8 +68,6 @@ public: return boost::none; } - GetNextResult getNext() final; - static boost::intrusive_ptr<DocumentSourceSequentialDocumentCache> create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, SequentialDocumentCache* cache) { return new DocumentSourceSequentialDocumentCache(pExpCtx, cache); @@ -85,6 +83,7 @@ public: } protected: + GetNextResult doGetNext() final; Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index 4575e9fa02b..633e2e55b1c 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -46,20 +46,18 @@ using boost::intrusive_ptr; DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransformation( const intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<TransformerInterface> parsedTransform, - std::string name, + const StringData name, bool isIndependentOfAnyCollection) - : DocumentSource(pExpCtx), + : DocumentSource(name, pExpCtx), _parsedTransform(std::move(parsedTransform)), - _name(std::move(name)), + _name(name.toString()), _isIndependentOfAnyCollection(isIndependentOfAnyCollection) {} const char* DocumentSourceSingleDocumentTransformation::getSourceName() const { return _name.c_str(); } -DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::doGetNext() { // Get the next input document. auto input = pSource->getNext(); if (!input.isAdvanced()) { diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index f36287b0d37..6d5daa7171a 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -45,12 +45,12 @@ public: DocumentSourceSingleDocumentTransformation( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<TransformerInterface> parsedTransform, - std::string name, + const StringData name, bool independentOfAnyCollection); // virtuals from DocumentSource const char* getSourceName() const final; - GetNextResult getNext() final; + boost::intrusive_ptr<DocumentSource> optimize() final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; DepsTracker::State getDependencies(DepsTracker* deps) const final; @@ -90,6 +90,7 @@ public: } protected: + GetNextResult doGetNext() final; void doDispose() final; Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp index 143a796cdf6..8a867a27e4b 100644 --- a/src/mongo/db/pipeline/document_source_skip.cpp +++ b/src/mongo/db/pipeline/document_source_skip.cpp @@ -45,7 +45,7 @@ using boost::intrusive_ptr; DocumentSourceSkip::DocumentSourceSkip(const intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip) - : DocumentSource(pExpCtx), _nToSkip(nToSkip) {} + : DocumentSource(kStageName, pExpCtx), _nToSkip(nToSkip) {} REGISTER_DOCUMENT_SOURCE(skip, LiteParsedDocumentSourceDefault::parse, @@ -53,9 +53,7 @@ REGISTER_DOCUMENT_SOURCE(skip, constexpr StringData DocumentSourceSkip::kStageName; -DocumentSource::GetNextResult DocumentSourceSkip::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceSkip::doGetNext() { while (_nSkippedSoFar < _nToSkip) { // For performance reasons, a streaming stage must not keep references to documents across // calls to getNext(). Such stages must retrieve a result from their child and then release diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index fdd430ccf6c..46ab8f46041 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -61,8 +61,6 @@ public: LookupRequirement::kAllowed}; } - GetNextResult getNext() final; - const char* getSourceName() const final { return kStageName.rawData(); } @@ -101,6 +99,8 @@ private: explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip); + GetNextResult doGetNext() final; + long long _nToSkip = 0; long long _nSkippedSoFar = 0; }; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 09afc8fc2e6..30d71711823 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -104,7 +104,7 @@ DocumentSourceSort::DocumentSourceSort(const boost::intrusive_ptr<ExpressionCont const BSONObj& sortOrder, uint64_t limit, uint64_t maxMemoryUsageBytes) - : DocumentSource(pExpCtx), + : DocumentSource(kStageName, pExpCtx), _sortExecutor({{sortOrder, pExpCtx}, limit, maxMemoryUsageBytes, @@ -122,9 +122,7 @@ REGISTER_DOCUMENT_SOURCE(sort, LiteParsedDocumentSourceDefault::parse, DocumentSourceSort::createFromBson); -DocumentSource::GetNextResult DocumentSourceSort::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceSort::doGetNext() { if (!_populated) { const auto populationResult = populate(); if (populationResult.isPaused()) { @@ -215,7 +213,6 @@ DepsTracker::State DocumentSourceSort::getDependencies(DepsTracker* deps) const return DepsTracker::State::SEE_NEXT; } - intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(15973, "the $sort key specification must be an object", elem.type() == Object); diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 114c8f7445b..48b7fbc1095 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -44,8 +44,6 @@ class DocumentSourceSort final : public DocumentSource { public: static constexpr StringData kStageName = "$sort"_sd; - GetNextResult getNext() final; - const char* getSourceName() const final { return kStageName.rawData(); } @@ -135,6 +133,7 @@ public: } protected: + GetNextResult doGetNext() final; /** * Attempts to absorb a subsequent $limit stage so that it can perform a top-k sort. */ diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.cpp b/src/mongo/db/pipeline/document_source_tee_consumer.cpp index c63aa579b08..59910b8f5ca 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.cpp +++ b/src/mongo/db/pipeline/document_source_tee_consumer.cpp @@ -45,7 +45,7 @@ using boost::intrusive_ptr; DocumentSourceTeeConsumer::DocumentSourceTeeConsumer(const intrusive_ptr<ExpressionContext>& expCtx, size_t facetId, const intrusive_ptr<TeeBuffer>& bufferSource) - : DocumentSource(expCtx), _facetId(facetId), _bufferSource(bufferSource) {} + : DocumentSource(kStageName, expCtx), _facetId(facetId), _bufferSource(bufferSource) {} boost::intrusive_ptr<DocumentSourceTeeConsumer> DocumentSourceTeeConsumer::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -54,8 +54,7 @@ boost::intrusive_ptr<DocumentSourceTeeConsumer> DocumentSourceTeeConsumer::creat return new DocumentSourceTeeConsumer(expCtx, facetId, bufferSource); } -DocumentSource::GetNextResult DocumentSourceTeeConsumer::getNext() { - pExpCtx->checkForInterrupt(); +DocumentSource::GetNextResult DocumentSourceTeeConsumer::doGetNext() { return _bufferSource->getNext(_facetId); } diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index e9054678296..94078c9bcb0 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -49,6 +49,7 @@ class Value; */ class DocumentSourceTeeConsumer : public DocumentSource { public: + static constexpr StringData kStageName = "$teeConsumer"_sd; static boost::intrusive_ptr<DocumentSourceTeeConsumer> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, size_t facetId, @@ -68,8 +69,6 @@ public: return boost::none; } - GetNextResult getNext() final; - /** * Returns SEE_NEXT, since it requires no fields, and changes nothing about the documents. */ @@ -80,6 +79,7 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; protected: + GetNextResult doGetNext() final; void doDispose() final; private: diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h index ba535906ea6..89a91501d22 100644 --- a/src/mongo/db/pipeline/document_source_test_optimizations.h +++ b/src/mongo/db/pipeline/document_source_test_optimizations.h @@ -39,9 +39,12 @@ namespace mongo { */ class DocumentSourceTestOptimizations : public DocumentSource { public: - DocumentSourceTestOptimizations() : DocumentSource(new ExpressionContextForTest()) {} + static constexpr StringData kStageName = "$_internalTestOptimizations"_sd; + DocumentSourceTestOptimizations() + : DocumentSource(DocumentSourceTestOptimizations::kStageName, + new ExpressionContextForTest()) {} virtual ~DocumentSourceTestOptimizations() = default; - virtual GetNextResult getNext() override { + virtual GetNextResult doGetNext() override { MONGO_UNREACHABLE; } virtual StageConstraints constraints(Pipeline::SplitState) const override { diff --git a/src/mongo/db/pipeline/document_source_unwind.cpp b/src/mongo/db/pipeline/document_source_unwind.cpp index 870394a277c..b1426d30de3 100644 --- a/src/mongo/db/pipeline/document_source_unwind.cpp +++ b/src/mongo/db/pipeline/document_source_unwind.cpp @@ -160,7 +160,7 @@ DocumentSourceUnwind::DocumentSourceUnwind(const intrusive_ptr<ExpressionContext const FieldPath& fieldPath, bool preserveNullAndEmptyArrays, const boost::optional<FieldPath>& indexPath) - : DocumentSource(pExpCtx), + : DocumentSource(kStageName, pExpCtx), _unwindPath(fieldPath), _preserveNullAndEmptyArrays(preserveNullAndEmptyArrays), _indexPath(indexPath), @@ -171,7 +171,7 @@ REGISTER_DOCUMENT_SOURCE(unwind, DocumentSourceUnwind::createFromBson); const char* DocumentSourceUnwind::getSourceName() const { - return "$unwind"; + return kStageName.rawData(); } intrusive_ptr<DocumentSourceUnwind> DocumentSourceUnwind::create( @@ -187,9 +187,7 @@ intrusive_ptr<DocumentSourceUnwind> DocumentSourceUnwind::create( return source; } -DocumentSource::GetNextResult DocumentSourceUnwind::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceUnwind::doGetNext() { auto nextOut = _unwinder->getNext(); while (nextOut.isEOF()) { // No more elements in array currently being unwound. This will loop if the input diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index 4cef6983564..c3254179e88 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -36,9 +36,9 @@ namespace mongo { class DocumentSourceUnwind final : public DocumentSource { public: - // virtuals from DocumentSource - GetNextResult getNext() final; + static constexpr StringData kStageName = "$unwind"_sd; + // virtuals from DocumentSource const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; @@ -97,6 +97,8 @@ private: bool includeNullIfEmptyOrMissing, const boost::optional<FieldPath>& includeArrayIndex); + GetNextResult doGetNext() final; + // Configuration state. const FieldPath _unwindPath; // Documents that have a nullish value, or an empty array for the field '_unwindPath', will pass diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h index ada2fc72a53..dd5dd0c37a5 100644 --- a/src/mongo/db/pipeline/document_source_writer.h +++ b/src/mongo/db/pipeline/document_source_writer.h @@ -89,9 +89,10 @@ public: using BatchObject = B; using BatchedObjects = std::vector<BatchObject>; - DocumentSourceWriter(NamespaceString outputNs, + DocumentSourceWriter(const char* stageName, + NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(expCtx), + : DocumentSource(stageName, expCtx), _outputNs(std::move(outputNs)), _writeConcern(expCtx->opCtx->getWriteConcern()) {} @@ -119,9 +120,8 @@ public: return _outputNs; } - GetNextResult getNext() final override; - protected: + GetNextResult doGetNext() final override; /** * Prepares the stage to be able to write incoming batches. */ @@ -165,9 +165,7 @@ private: }; template <typename B> -DocumentSource::GetNextResult DocumentSourceWriter<B>::getNext() { - pExpCtx->checkForInterrupt(); - +DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() { if (_done) { return GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index fb4f3d988ee..3dc8dc79602 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -35,6 +35,7 @@ #include <boost/intrusive_ptr.hpp> +#include "mongo/db/exec/plan_stats.h" #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/dependencies.h" @@ -252,7 +253,6 @@ public: */ DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const; - const SourceContainer& getSources() const { return _sources; } diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp index d48c6ca804a..cbdb2fc033d 100644 --- a/src/mongo/s/query/document_source_merge_cursors.cpp +++ b/src/mongo/s/query/document_source_merge_cursors.cpp @@ -50,7 +50,7 @@ DocumentSourceMergeCursors::DocumentSourceMergeCursors( AsyncResultsMergerParams armParams, const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<BSONObj> ownedParamsSpec) - : DocumentSource(expCtx), + : DocumentSource(kStageName, expCtx), _armParamsObj(std::move(ownedParamsSpec)), _executor(std::move(executor)), _armParams(std::move(armParams)) {} @@ -95,7 +95,7 @@ std::unique_ptr<RouterStageMerge> DocumentSourceMergeCursors::convertToRouterSta return std::make_unique<RouterStageMerge>(pExpCtx->opCtx, _executor, std::move(*_armParams)); } -DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { +DocumentSource::GetNextResult DocumentSourceMergeCursors::doGetNext() { if (!_blockingResultsMerger) { populateMerger(); } diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index f8e25299260..f979c13a00c 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -101,8 +101,6 @@ public: return boost::none; } - GetNextResult getNext() final; - std::size_t getNumRemotes() const; /** @@ -148,6 +146,7 @@ public: } protected: + GetNextResult doGetNext() final; void doDispose() final; private: diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp index 00a8971454c..d94a99518b7 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.cpp +++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp @@ -66,13 +66,13 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard( const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors, std::vector<ShardId>&& shardsWithCursors, BSONObj cmdToRunOnNewShards) - : DocumentSource(expCtx), + : DocumentSource(kStageName, expCtx), _executor(std::move(executor)), _mergeCursors(mergeCursors), _shardsWithCursors(std::move(shardsWithCursors)), _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {} -DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::getNext() { +DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() { auto childResult = pSource->getNext(); while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) { diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index e8f2473b432..0b41fde92d1 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -47,6 +47,8 @@ namespace mongo { */ class DocumentSourceUpdateOnAddShard final : public DocumentSource { public: + static constexpr StringData kStageName = "$_internalUpdateOnAddShard"_sd; + /** * Creates a new stage which will establish a new cursor and add it to the cursors being merged * by 'mergeCursorsStage' whenever a new shard is detected by a change stream. @@ -79,8 +81,6 @@ public: return boost::none; } - GetNextResult getNext() final; - private: DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&, std::shared_ptr<executor::TaskExecutor> executor, @@ -88,6 +88,8 @@ private: std::vector<ShardId>&& shardsWithCursors, BSONObj cmdToRunOnNewShards); + GetNextResult doGetNext() final; + /** * Establish the new cursors and tell the RouterStageMerge about them. */ |