summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source.cpp21
-rw-r--r--src/mongo/db/pipeline/document_source.h17
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.h8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h5
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.h9
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp16
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h9
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h8
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h6
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h4
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.h8
-rw-r--r--src/mongo/db/pipeline/document_source_exchange_test.cpp365
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h12
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h10
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near_cursor.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near_cursor.h2
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h5
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_group.h3
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h5
-rw-r--r--src/mongo/db/pipeline/document_source_internal_shard_filter.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_internal_shard_filter.h4
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.h6
-rw-r--r--src/mongo/db/pipeline/document_source_limit.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h2
-rw-r--r--src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_list_cached_and_active_users.h8
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h15
-rw-r--r--src/mongo/db/pipeline/document_source_list_sessions.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_list_sessions.h7
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h3
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h12
-rw-r--r--src/mongo/db/pipeline/document_source_match.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_match.h4
-rw-r--r--src/mongo/db/pipeline/document_source_merge.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h11
-rw-r--r--src/mongo/db/pipeline/document_source_out.h3
-rw-r--r--src/mongo/db/pipeline/document_source_plan_cache_stats.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_plan_cache_stats.h11
-rw-r--r--src/mongo/db/pipeline/document_source_project.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_queue.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_queue.h2
-rw-r--r--src/mongo/db/pipeline/document_source_redact.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_redact.h4
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_sample.h3
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.h4
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.h5
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h5
-rw-r--r--src/mongo/db/pipeline/document_source_skip.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_skip.h4
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h3
-rw-r--r--src/mongo/db/pipeline/document_source_tee_consumer.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_tee_consumer.h4
-rw-r--r--src/mongo/db/pipeline/document_source_test_optimizations.h7
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.h6
-rw-r--r--src/mongo/db/pipeline/document_source_writer.h12
-rw-r--r--src/mongo/db/pipeline/pipeline.h2
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.cpp4
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.h3
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.cpp4
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.h6
92 files changed, 444 insertions, 471 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index b34cd5987d7..25efa885ef3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -348,6 +348,7 @@ pipelineEnv.Library(
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
+ '$BUILD_DIR/mongo/db/exec/scoped_timer',
'$BUILD_DIR/mongo/db/exec/sort_executor',
'$BUILD_DIR/mongo/db/generic_cursor',
'$BUILD_DIR/mongo/db/index/key_generator',
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 75911585dfe..bdad5fb8fb7 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_internal_shard_filter.h"
@@ -51,8 +52,9 @@ using std::list;
using std::string;
using std::vector;
-DocumentSource::DocumentSource(const intrusive_ptr<ExpressionContext>& pCtx)
- : pSource(nullptr), pExpCtx(pCtx) {}
+DocumentSource::DocumentSource(const StringData stageName,
+ const intrusive_ptr<ExpressionContext>& pCtx)
+ : pSource(nullptr), pExpCtx(pCtx), _commonStats(stageName.rawData()) {}
namespace {
// Used to keep track of which DocumentSources are registered under which name.
@@ -85,6 +87,21 @@ list<intrusive_ptr<DocumentSource>> DocumentSource::parse(
return it->second(stageSpec, expCtx);
}
+DocumentSource::GetNextResult DocumentSource::getNext() {
+ pExpCtx->checkForInterrupt();
+ invariant(pExpCtx->opCtx->getServiceContext());
+ invariant(pExpCtx->opCtx->getServiceContext()->getFastClockSource());
+ ScopedTimer timer(pExpCtx->opCtx->getServiceContext()->getFastClockSource(),
+ &_commonStats.executionTimeMillis);
+ ++_commonStats.works;
+
+ GetNextResult next = doGetNext();
+ if (next.isAdvanced()) {
+ ++_commonStats.advanced;
+ }
+ return next;
+}
+
const char* DocumentSource::getSourceName() const {
static const char unknown[] = "[UNKNOWN]";
return unknown;
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 435219fdfda..d37fd19d2c6 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -43,6 +43,7 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/collection_index_usage_tracker.h"
#include "mongo/db/commands.h"
+#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/generic_cursor.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
@@ -241,14 +242,12 @@ public:
* The main execution API of a DocumentSource. Returns an intermediate query result generated by
* this DocumentSource.
*
- * All implementers must call pExpCtx->checkForInterrupt().
- *
* For performance reasons, a streaming stage must not keep references to documents across calls
* to getNext(). Such stages must retrieve a result from their child and then release it (or
* return it) before asking for another result. Failing to do so can result in extra work, since
* the Document/Value library must copy data on write when that data has a refcount above one.
*/
- virtual GetNextResult getNext() = 0;
+ GetNextResult getNext();
/**
* Returns a struct containing information about any special constraints imposed on using this
@@ -305,7 +304,6 @@ public:
virtual void addInvolvedCollections(
stdx::unordered_set<NamespaceString>* collectionNames) const {}
-
virtual void detachFromOperationContext() {}
virtual void reattachToOperationContext(OperationContext* opCtx) {}
@@ -470,7 +468,14 @@ public:
}
protected:
- explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ DocumentSource(const StringData stageName,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
+ * The main execution API of a DocumentSource. Returns an intermediate query result generated by
+ * this DocumentSource. See comment at getNext().
+ */
+ virtual GetNextResult doGetNext() = 0;
/**
* Attempt to perform an optimization with the following source in the pipeline. 'container'
@@ -507,6 +512,8 @@ protected:
boost::intrusive_ptr<ExpressionContext> pExpCtx;
private:
+ CommonStats _commonStats;
+
/**
* Create a Value that represents the document source.
*
diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp
index dd7550a0c1c..a362f38495d 100644
--- a/src/mongo/db/pipeline/document_source_add_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_add_fields.cpp
@@ -66,7 +66,7 @@ intrusive_ptr<DocumentSource> DocumentSourceAddFields::create(
throw;
}
}(),
- userSpecifiedName.toString(),
+ userSpecifiedName == kStageName ? kStageName : kAliasNameSet,
isIndependentOfAnyCollection));
return addFields;
}
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index f8fa5e86ceb..26cb0e9a89b 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -84,12 +84,10 @@ std::string nextFileName() {
} // namespace
const char* DocumentSourceBucketAuto::getSourceName() const {
- return "$bucketAuto";
+ return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceBucketAuto::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() {
if (!_populated) {
const auto populationResult = populateSorter();
if (populationResult.isPaused()) {
@@ -426,7 +424,7 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto(
std::vector<AccumulationStatement> accumulationStatements,
const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
uint64_t maxMemoryUsageBytes)
- : DocumentSource(pExpCtx),
+ : DocumentSource(kStageName, pExpCtx),
_nBuckets(numBuckets),
_maxMemoryUsageBytes(maxMemoryUsageBytes),
_groupByExpression(groupByExpression),
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index ec2ff61b7e7..65e2cd960a2 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -44,9 +44,10 @@ namespace mongo {
*/
class DocumentSourceBucketAuto final : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$bucketAuto"_sd;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
- GetNextResult getNext() final;
+
const char* getSourceName() const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -93,6 +94,7 @@ public:
const std::vector<AccumulationStatement>& getAccumulatedFields() const;
protected:
+ GetNextResult doGetNext() final;
void doDispose() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 9050b9990dd..a9d80a0e2a0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -242,7 +242,6 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
}
}
-
BSONObj DocumentSourceChangeStream::buildMatchFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp startFrom,
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 7487e565f1e..70357cd433e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -150,14 +150,12 @@ public:
enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster };
-
/**
* Helpers for Determining which regex to match a change stream against.
*/
static ChangeStreamType getChangeStreamType(const NamespaceString& nss);
static std::string getNsRegexForChangeStream(const NamespaceString& nss);
-
/**
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
* to only those relevant for this $changeStream stage.
@@ -215,7 +213,7 @@ public:
const char* getSourceName() const final;
- GetNextResult getNext() final {
+ GetNextResult doGetNext() final {
// We should never execute this stage directly. We expect this stage to be absorbed into the
// cursor feeding the pipeline, and executing this stage may result in the use of the wrong
// collation. The comparisons against the oplog must use the simple collation, regardless of
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
index 1fedb3cb940..a4a43c654ee 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
@@ -52,9 +52,7 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt
} // namespace
-DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceCloseCursor::doGetNext() {
// Close cursor if we have returned an invalidate entry.
if (_shouldCloseCursor) {
uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated");
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index ab28dee5e8d..6fc042b9bb4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -43,11 +43,11 @@ namespace mongo {
*/
class DocumentSourceCloseCursor final : public DocumentSource {
public:
- GetNextResult getNext() final;
+ static constexpr StringData kStageName = "$changeStream"_sd;
const char* getSourceName() const final {
// This is used in error reporting.
- return "$changeStream";
+ return DocumentSourceCloseCursor::kStageName.rawData();
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -88,7 +88,9 @@ private:
* Use the create static method to create a DocumentSourceCloseCursor.
*/
DocumentSourceCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx) {}
+ : DocumentSource(kStageName, expCtx) {}
+
+ GetNextResult doGetNext() final;
bool _shouldCloseCursor = false;
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 1c6739b8497..5a558a924b9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -82,7 +82,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ServerGlobalParams::FeatureCompatibility::Version& fcv,
BSONObj changeStreamSpec)
- : DocumentSource(expCtx),
+ : DocumentSource(DocumentSourceChangeStreamTransform::kStageName, expCtx),
_changeStreamSpec(changeStreamSpec.getOwned()),
_isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()),
_fcv(fcv) {
@@ -393,9 +393,7 @@ DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifi
return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}};
}
-DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::doGetNext() {
uassert(50988,
"Illegal attempt to execute an internal change stream stage on mongos. A $changeStream "
"stage must be the first stage in a pipeline",
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index c20f5864e67..df8cecf49f0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -39,6 +39,7 @@ namespace mongo {
class DocumentSourceChangeStreamTransform : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd;
/**
* Creates a new transformation stage from the given specification.
*/
@@ -58,11 +59,13 @@ public:
return boost::none;
}
- DocumentSource::GetNextResult getNext();
const char* getSourceName() const {
return DocumentSourceChangeStream::kStageName.rawData();
}
+protected:
+ DocumentSource::GetNextResult doGetNext() override;
+
private:
// This constructor is private, callers should use the 'create()' method above.
DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>&,
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
index 376e0dc97c9..5b286b585b0 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
@@ -58,9 +58,7 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt
} // namespace
-DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() {
if (_queuedInvalidate) {
const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get()));
_queuedInvalidate.reset();
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
index 349ad68c589..69b4dcb742c 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -40,11 +40,11 @@ namespace mongo {
*/
class DocumentSourceCheckInvalidate final : public DocumentSource {
public:
- GetNextResult getNext() final;
+ static constexpr StringData kStageName = "$_checkInvalidate"_sd;
const char* getSourceName() const final {
// This is used in error reporting.
- return "$_checkInvalidate";
+ return DocumentSourceCheckInvalidate::kStageName.rawData();
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -80,11 +80,14 @@ private:
*/
DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<ResumeTokenData> startAfterInvalidate)
- : DocumentSource(expCtx), _startAfterInvalidate(std::move(startAfterInvalidate)) {
+ : DocumentSource(kStageName, expCtx),
+ _startAfterInvalidate(std::move(startAfterInvalidate)) {
invariant(!_startAfterInvalidate ||
_startAfterInvalidate->fromInvalidate == ResumeTokenData::kFromInvalidate);
}
+ GetNextResult doGetNext() final;
+
boost::optional<ResumeTokenData> _startAfterInvalidate;
boost::optional<Document> _queuedInvalidate;
};
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 5b9eeabaf6c..d5f6da9a8c0 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -168,7 +168,7 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
} // namespace
const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
- return "$_ensureResumeTokenPresent";
+ return kStageName.rawData();
}
Value DocumentSourceEnsureResumeTokenPresent::serialize(
@@ -186,11 +186,9 @@ DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionCon
DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
-
-DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
- pExpCtx->checkForInterrupt();
+ : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {}
+DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext() {
if (_resumeStatus == ResumeStatus::kSurpassedToken) {
// We've already verified the resume token is present.
return pSource->getNext();
@@ -256,7 +254,7 @@ DocumentSourceEnsureResumeTokenPresent::_checkNextDocAndSwallowResumeToken(
}
const char* DocumentSourceShardCheckResumability::getSourceName() const {
- return "$_checkShardResumability";
+ return kStageName.rawData();
}
Value DocumentSourceShardCheckResumability::serialize(
@@ -279,11 +277,9 @@ intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResu
DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}
-
-DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
- pExpCtx->checkForInterrupt();
+ : DocumentSource(kStageName, expCtx), _tokenFromClient(std::move(token)) {}
+DocumentSource::GetNextResult DocumentSourceShardCheckResumability::doGetNext() {
if (_surpassedResumeToken)
return pSource->getNext();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 8c90a88b564..f13d4741f30 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -59,7 +59,8 @@ namespace mongo {
*/
class DocumentSourceShardCheckResumability final : public DocumentSource {
public:
- GetNextResult getNext() final;
+ static constexpr StringData kStageName = "$_internalCheckShardResumability"_sd;
+
const char* getSourceName() const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -92,6 +93,8 @@ private:
DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
ResumeTokenData token);
+ GetNextResult doGetNext() final;
+
void _assertOplogHasEnoughHistory(const GetNextResult& nextInput);
ResumeTokenData _tokenFromClient;
@@ -105,6 +108,7 @@ private:
*/
class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd;
// Used to record the results of comparing the token data extracted from documents in the
// resumed stream against the client's resume token.
enum class ResumeStatus {
@@ -113,7 +117,6 @@ public:
kCheckNextDoc // The next document produced by the stream may contain the resume token.
};
- GetNextResult getNext() final;
const char* getSourceName() const final;
StageConstraints constraints(Pipeline::SplitState) const final {
@@ -154,6 +157,8 @@ private:
DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
ResumeTokenData token);
+ GetNextResult doGetNext() final;
+
/**
* Check the given event to determine whether it matches the client's resume token. If so, we
* swallow this event and return the next event in the stream. Otherwise, return boost::none.
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index bbf031b9f08..72c504a6093 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -46,7 +46,7 @@ REGISTER_DOCUMENT_SOURCE(collStats,
DocumentSourceCollStats::createFromBson);
const char* DocumentSourceCollStats::getSourceName() const {
- return "$collStats";
+ return kStageName.rawData();
}
intrusive_ptr<DocumentSource> DocumentSourceCollStats::createFromBson(
@@ -98,9 +98,7 @@ intrusive_ptr<DocumentSource> DocumentSourceCollStats::createFromBson(
return collStats;
}
-DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceCollStats::doGetNext() {
if (_finished) {
return GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index a950afb1813..6aa1b27192a 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -39,6 +39,8 @@ namespace mongo {
*/
class DocumentSourceCollStats : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$collStats"_sd;
+
class LiteParsed final : public LiteParsedDocumentSource {
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
@@ -69,9 +71,7 @@ public:
};
DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx) {}
-
- GetNextResult getNext() final;
+ : DocumentSource(kStageName, pExpCtx) {}
const char* getSourceName() const final;
@@ -98,6 +98,8 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
+ GetNextResult doGetNext() final;
+
// The raw object given to $collStats containing user specified options.
BSONObj _collStatsSpec;
bool _finished = false;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index 72225d8185f..20f67ef9175 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -103,14 +103,11 @@ std::unique_ptr<DocumentSourceCurrentOp::LiteParsed> DocumentSourceCurrentOp::Li
return std::make_unique<DocumentSourceCurrentOp::LiteParsed>(allUsers, localOps);
}
-
const char* DocumentSourceCurrentOp::getSourceName() const {
return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceCurrentOp::doGetNext() {
if (_ops.empty()) {
_ops = pExpCtx->mongoProcessInterface->getCurrentOps(pExpCtx,
_includeIdleConnections,
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index 0e86973a009..fffc51503a3 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -102,8 +102,6 @@ public:
CursorMode idleCursors = CursorMode::kExcludeCursors,
BacktraceMode backtrace = BacktraceMode::kExcludeBacktrace);
- GetNextResult getNext() final;
-
const char* getSourceName() const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -140,7 +138,7 @@ private:
TruncationMode truncateOps,
CursorMode idleCursors,
BacktraceMode backtrace)
- : DocumentSource(pExpCtx),
+ : DocumentSource(kStageName, pExpCtx),
_includeIdleConnections(includeIdleConnections),
_includeIdleSessions(includeIdleSessions),
_includeOpsFromAllUsers(includeOpsFromAllUsers),
@@ -149,6 +147,8 @@ private:
_idleCursors(idleCursors),
_backtrace(backtrace) {}
+ GetNextResult doGetNext() final;
+
ConnMode _includeIdleConnections = ConnMode::kExcludeIdle;
SessionMode _includeIdleSessions = SessionMode::kIncludeIdle;
UserMode _includeOpsFromAllUsers = UserMode::kExcludeOthers;
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 934f37d2d1d..fde3c279d33 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -53,12 +53,10 @@ using std::shared_ptr;
using std::string;
const char* DocumentSourceCursor::getSourceName() const {
- return "$cursor";
+ return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceCursor::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceCursor::doGetNext() {
if (_currentBatch.empty()) {
loadBatch();
}
@@ -308,7 +306,7 @@ DocumentSourceCursor::DocumentSourceCursor(
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
const intrusive_ptr<ExpressionContext>& pCtx,
bool trackOplogTimestamp)
- : DocumentSource(pCtx),
+ : DocumentSource(kStageName, pCtx),
_docsAddedToBatches(0),
_exec(std::move(exec)),
_trackOplogTS(trackOplogTimestamp) {
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 815b6635b14..fa28dd114a0 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -46,8 +46,8 @@ namespace mongo {
*/
class DocumentSourceCursor : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$cursor"_sd;
// virtuals from DocumentSource
- GetNextResult getNext() final;
const char* getSourceName() const override;
@@ -162,6 +162,8 @@ protected:
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
bool trackOplogTimestamp = false);
+ GetNextResult doGetNext() final;
+
~DocumentSourceCursor();
/**
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
index b45938c45d0..2220a73b496 100644
--- a/src/mongo/db/pipeline/document_source_exchange.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -88,7 +88,7 @@ constexpr size_t Exchange::kMaxBufferSize;
constexpr size_t Exchange::kMaxNumberConsumers;
const char* DocumentSourceExchange::getSourceName() const {
- return "$_internalExchange";
+ return kStageName.rawData();
}
Value DocumentSourceExchange::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
@@ -100,12 +100,12 @@ DocumentSourceExchange::DocumentSourceExchange(
const boost::intrusive_ptr<Exchange> exchange,
size_t consumerId,
std::unique_ptr<ResourceYielder> yielder)
- : DocumentSource(expCtx),
+ : DocumentSource(kStageName, expCtx),
_exchange(exchange),
_consumerId(consumerId),
_resourceYielder(std::move(yielder)) {}
-DocumentSource::GetNextResult DocumentSourceExchange::getNext() {
+DocumentSource::GetNextResult DocumentSourceExchange::doGetNext() {
return _exchange->getNext(pExpCtx->opCtx, _consumerId, _resourceYielder.get());
}
diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h
index 68b3a037047..e30d66698d1 100644
--- a/src/mongo/db/pipeline/document_source_exchange.h
+++ b/src/mongo/db/pipeline/document_source_exchange.h
@@ -191,6 +191,8 @@ private:
class DocumentSourceExchange final : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$_internalExchange"_sd;
+
/**
* Create an Exchange consumer. 'resourceYielder' is so the exchange may temporarily yield
* resources (such as the Session) while waiting for other threads to do
@@ -202,8 +204,6 @@ public:
size_t consumerId,
std::unique_ptr<ResourceYielder> yielder);
- GetNextResult getNext() final;
-
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
return {StreamType::kStreaming,
PositionRequirement::kNone,
@@ -230,8 +230,6 @@ public:
invariant(!source);
}
- GetNextResult getNext(size_t consumerId);
-
size_t getConsumers() const {
return _exchange->getConsumers();
}
@@ -249,6 +247,8 @@ public:
}
private:
+ GetNextResult doGetNext() final;
+
boost::intrusive_ptr<Exchange> _exchange;
const size_t _consumerId;
diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp
index ef4f626e7b6..c6a5c4945fd 100644
--- a/src/mongo/db/pipeline/document_source_exchange_test.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp
@@ -39,7 +39,9 @@
#include "mongo/platform/random.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/system_clock_source.h"
#include "mongo/util/time_support.h"
@@ -56,6 +58,42 @@ public:
}
};
+namespace {
+/**
+ * This class is used for an Exchange consumer to temporarily relinquish control of a mutex
+ * while it's blocked.
+ */
+class MutexYielder : public ResourceYielder {
+public:
+ MutexYielder(stdx::mutex* mutex) : _lock(*mutex, stdx::defer_lock) {}
+
+ void yield(OperationContext* opCtx) override {
+ _lock.unlock();
+ }
+
+ void unyield(OperationContext* opCtx) override {
+ _lock.lock();
+ }
+
+ stdx::unique_lock<stdx::mutex>& getLock() {
+ return _lock;
+ }
+
+private:
+ stdx::unique_lock<stdx::mutex> _lock;
+};
+
+/**
+ * Used to keep track of each client and operation context.
+ */
+struct ThreadInfo {
+ ServiceContext::UniqueClient client;
+ ServiceContext::UniqueOperationContext opCtx;
+ boost::intrusive_ptr<DocumentSourceExchange> documentSourceExchange;
+ MutexYielder* yielder;
+};
+} // namespace
+
class DocumentSourceExchangeTest : public AggregationContextFixture {
protected:
std::unique_ptr<executor::TaskExecutor> _executor;
@@ -109,6 +147,23 @@ protected:
IDLParserErrorContext ctx("internalExchange");
return ExchangeSpec::parse(ctx, spec);
}
+
+ auto createNProducers(size_t nConsumers, boost::intrusive_ptr<Exchange> ex) {
+ std::vector<ThreadInfo> threads;
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ ServiceContext::UniqueClient client =
+ getServiceContext()->makeClient("exchange client");
+ ServiceContext::UniqueOperationContext opCtxOwned =
+ getServiceContext()->makeOperationContext(client.get());
+ OperationContext* opCtx = opCtxOwned.get();
+ threads.emplace_back(ThreadInfo{
+ std::move(client),
+ std::move(opCtxOwned),
+ new DocumentSourceExchange(new ExpressionContext(opCtx, nullptr), ex, idx, nullptr),
+ });
+ }
+ return threads;
+ }
};
TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) {
@@ -150,29 +205,25 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
-
- for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
- }
-
+ std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
for (size_t id = 0; id < nConsumers; ++id) {
- auto handle = _executor->scheduleWork(
- [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
- PseudoRandom prng(getNewSeed());
+ DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get();
+ auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers](
+ const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
- auto input = prods[id]->getNext();
+ auto input = docSourceExchange->getNext();
- size_t docs = 0;
+ size_t docs = 0;
- for (; input.isAdvanced(); input = prods[id]->getNext()) {
- sleepmillis(prng.nextInt32() % 20 + 1);
- ++docs;
- }
- ASSERT_EQ(docs, nDocs / nConsumers);
- });
+ for (; input.isAdvanced(); input = docSourceExchange->getNext()) {
+ sleepmillis(prng.nextInt32() % 20 + 1);
+ ++docs;
+ }
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
handles.emplace_back(std::move(handle.getValue()));
}
@@ -197,38 +248,34 @@ TEST_F(DocumentSourceExchangeTest, ExchangeNConsumerEarlyout) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
-
- for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
- }
-
+ std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
for (size_t id = 0; id < nConsumers; ++id) {
- auto handle = _executor->scheduleWork(
- [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
- PseudoRandom prng(getNewSeed());
+ DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get();
+ auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers](
+ const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
- auto input = prods[id]->getNext();
+ auto input = docSourceExchange->getNext();
- size_t docs = 0;
+ size_t docs = 0;
- for (; input.isAdvanced(); input = prods[id]->getNext()) {
- sleepmillis(prng.nextInt32() % 20 + 1);
- ++docs;
+ for (; input.isAdvanced(); input = docSourceExchange->getNext()) {
+ sleepmillis(prng.nextInt32() % 20 + 1);
+ ++docs;
- // The consumer 1 bails out early wihout consuming all its documents.
- if (id == 1 && docs == 100) {
- // Pretend we have seen all docs.
- docs = nDocs / nConsumers;
+ // The consumer 1 bails out early wihout consuming all its documents.
+ if (id == 1 && docs == 100) {
+ // Pretend we have seen all docs.
+ docs = nDocs / nConsumers;
- prods[id]->dispose();
- break;
- }
+ docSourceExchange->dispose();
+ break;
}
- ASSERT_EQ(docs, nDocs / nConsumers);
- });
+ }
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
handles.emplace_back(std::move(handle.getValue()));
}
@@ -251,20 +298,16 @@ TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
-
- for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
- }
-
+ std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
for (size_t id = 0; id < nConsumers; ++id) {
+ DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get();
auto handle = _executor->scheduleWork(
- [prods, id, nDocs](const executor::TaskExecutor::CallbackArgs& cb) {
+ [docSourceExchange, id, nDocs](const executor::TaskExecutor::CallbackArgs& cb) {
size_t docs = 0;
- for (auto input = prods[id]->getNext(); input.isAdvanced();
- input = prods[id]->getNext()) {
+ for (auto input = docSourceExchange->getNext(); input.isAdvanced();
+ input = docSourceExchange->getNext()) {
++docs;
}
ASSERT_EQ(docs, nDocs);
@@ -302,30 +345,26 @@ TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
-
- for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
- }
-
+ std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
for (size_t id = 0; id < nConsumers; ++id) {
- auto handle = _executor->scheduleWork(
- [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
- size_t docs = 0;
- for (auto input = prods[id]->getNext(); input.isAdvanced();
- input = prods[id]->getNext()) {
- size_t value = input.getDocument()["a"].getInt();
+ DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get();
+ auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers](
+ const executor::TaskExecutor::CallbackArgs& cb) {
+ size_t docs = 0;
+ for (auto input = docSourceExchange->getNext(); input.isAdvanced();
+ input = docSourceExchange->getNext()) {
+ size_t value = input.getDocument()["a"].getInt();
- ASSERT(value >= id * 100);
- ASSERT(value < (id + 1) * 100);
+ ASSERT(value >= id * 100);
+ ASSERT(value < (id + 1) * 100);
- ++docs;
- }
+ ++docs;
+ }
- ASSERT_EQ(docs, nDocs / nConsumers);
- });
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
handles.emplace_back(std::move(handle.getValue()));
}
@@ -368,30 +407,26 @@ TEST_F(DocumentSourceExchangeTest, RangeShardingExchangeNConsumer) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
-
- for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
- }
-
+ std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
for (size_t id = 0; id < nConsumers; ++id) {
- auto handle = _executor->scheduleWork(
- [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
- size_t docs = 0;
- for (auto input = prods[id]->getNext(); input.isAdvanced();
- input = prods[id]->getNext()) {
- size_t value = input.getDocument()["a"].getInt();
+ DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get();
+ auto handle = _executor->scheduleWork([docSourceExchange, id, nDocs, nConsumers](
+ const executor::TaskExecutor::CallbackArgs& cb) {
+ size_t docs = 0;
+ for (auto input = docSourceExchange->getNext(); input.isAdvanced();
+ input = docSourceExchange->getNext()) {
+ size_t value = input.getDocument()["a"].getInt();
- ASSERT(value >= id * 100);
- ASSERT(value < (id + 1) * 100);
+ ASSERT(value >= id * 100);
+ ASSERT(value < (id + 1) * 100);
- ++docs;
- }
+ ++docs;
+ }
- ASSERT_EQ(docs, nDocs / nConsumers);
- });
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
handles.emplace_back(std::move(handle.getValue()));
}
@@ -425,39 +460,35 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
-
- for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
- }
-
+ std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
AtomicWord<size_t> processedDocs{0};
for (size_t id = 0; id < nConsumers; ++id) {
- auto handle = _executor->scheduleWork(
- [prods, id, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) {
- PseudoRandom prng(getNewSeed());
+ DocumentSourceExchange* docSourceExchange = threads[id].documentSourceExchange.get();
+ auto handle = _executor->scheduleWork([docSourceExchange, id, &processedDocs](
+ const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
- auto input = prods[id]->getNext();
+ auto input = docSourceExchange->getNext();
- size_t docs = 0;
- for (; input.isAdvanced(); input = prods[id]->getNext()) {
- size_t value = input.getDocument()["a"].getInt();
+ size_t docs = 0;
+ for (; input.isAdvanced(); input = docSourceExchange->getNext()) {
+ size_t value = input.getDocument()["a"].getInt();
- ASSERT(value >= id * 100);
- ASSERT(value < (id + 1) * 100);
+ ASSERT(value >= id * 100);
+ ASSERT(value < (id + 1) * 100);
- ++docs;
+ ++docs;
- // This helps randomizing thread scheduling forcing different threads to load
- // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
- // reproducibility.
- sleepmillis(prng.nextInt32() % 50 + 1);
- }
- processedDocs.fetchAndAdd(docs);
- });
+ // This helps randomizing thread scheduling forcing different threads to load
+ // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
+ // reproducibility.
+ sleepmillis(prng.nextInt32() % 50 + 1);
+ }
+ processedDocs.fetchAndAdd(docs);
+ });
handles.emplace_back(std::move(handle.getValue()));
}
@@ -497,39 +528,6 @@ TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- /**
- * This class is used for an Exchange consumer to temporarily relinquish control of a mutex
- * while it's blocked.
- */
- class MutexYielder : public ResourceYielder {
- public:
- MutexYielder(stdx::mutex* mutex) : _lock(*mutex, stdx::defer_lock) {}
-
- void yield(OperationContext* opCtx) override {
- _lock.unlock();
- }
-
- void unyield(OperationContext* opCtx) override {
- _lock.lock();
- }
-
- stdx::unique_lock<stdx::mutex>& getLock() {
- return _lock;
- }
-
- private:
- stdx::unique_lock<stdx::mutex> _lock;
- };
-
- /**
- * Used to keep track of each client and operation context.
- */
- struct ThreadInfo {
- ServiceContext::UniqueClient client;
- ServiceContext::UniqueOperationContext opCtx;
- boost::intrusive_ptr<DocumentSourceExchange> documentSourceExchange;
- MutexYielder* yielder;
- };
std::vector<ThreadInfo> threads;
for (size_t idx = 0; idx < nConsumers; ++idx) {
@@ -554,27 +552,27 @@ TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) {
for (size_t id = 0; id < nConsumers; ++id) {
ThreadInfo* threadInfo = &threads[id];
- auto handle = _executor->scheduleWork(
- [threadInfo, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) {
- DocumentSourceExchange* exchange = threadInfo->documentSourceExchange.get();
- const auto getNext = [exchange, threadInfo]() {
- // Will acquire 'artificalGlobalMutex'. Within getNext() it will be released and
- // reacquired by the MutexYielder if the Exchange has to block.
- threadInfo->yielder->getLock().lock();
- auto res = exchange->getNext();
- threadInfo->yielder->getLock().unlock();
- return res;
- };
-
- for (auto input = getNext(); input.isAdvanced(); input = getNext()) {
- // This helps randomizing thread scheduling forcing different threads to load
- // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
- // reproducibility.
- PseudoRandom prng(getNewSeed());
- sleepmillis(prng.nextInt32() % 50 + 1);
- processedDocs.fetchAndAdd(1);
- }
- });
+ auto handle = _executor->scheduleWork([threadInfo, &processedDocs](
+ const executor::TaskExecutor::CallbackArgs& cb) {
+ DocumentSourceExchange* docSourceExchange = threadInfo->documentSourceExchange.get();
+ const auto getNext = [docSourceExchange, threadInfo]() {
+ // Will acquire 'artificalGlobalMutex'. Within getNext() it will be released and
+ // reacquired by the MutexYielder if the Exchange has to block.
+ threadInfo->yielder->getLock().lock();
+ auto res = docSourceExchange->getNext();
+ threadInfo->yielder->getLock().unlock();
+ return res;
+ };
+
+ for (auto input = getNext(); input.isAdvanced(); input = getNext()) {
+ // This helps randomizing thread scheduling forcing different threads to load
+ // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
+ // reproducibility.
+ PseudoRandom prng(getNewSeed());
+ sleepmillis(prng.nextInt32() % 50 + 1);
+ processedDocs.fetchAndAdd(1);
+ }
+ });
handles.emplace_back(std::move(handle.getValue()));
}
@@ -610,34 +608,29 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) {
boost::intrusive_ptr<Exchange> ex =
new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
- std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
-
- for (size_t idx = 0; idx < nConsumers; ++idx) {
- prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx, nullptr));
- }
-
+ std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
-
AtomicWord<size_t> processedDocs{0};
for (size_t id = 0; id < nConsumers; ++id) {
- auto handle = _executor->scheduleWork(
- [prods, id, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) {
- PseudoRandom prng(getNewSeed());
-
- auto input = prods[id]->getNext();
-
- size_t docs = 0;
- for (; input.isAdvanced(); input = prods[id]->getNext()) {
- ++docs;
-
- // This helps randomizing thread scheduling forcing different threads to load
- // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
- // reproducibility.
- sleepmillis(prng.nextInt32() % 50 + 1);
- }
- processedDocs.fetchAndAdd(docs);
- });
+ auto docSourceExchange = threads[id].documentSourceExchange.get();
+ auto handle = _executor->scheduleWork([docSourceExchange, id, &processedDocs](
+ const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
+
+ auto input = docSourceExchange->getNext();
+
+ size_t docs = 0;
+ for (; input.isAdvanced(); input = docSourceExchange->getNext()) {
+ ++docs;
+
+ // This helps randomizing thread scheduling forcing different threads to load
+ // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
+ // reproducibility.
+ sleepmillis(prng.nextInt32() % 50 + 1);
+ }
+ processedDocs.fetchAndAdd(docs);
+ });
handles.emplace_back(std::move(handle.getValue()));
}
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 16d14e1edf4..ff10abe4284 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -57,7 +57,7 @@ using std::vector;
DocumentSourceFacet::DocumentSourceFacet(std::vector<FacetPipeline> facetPipelines,
const intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx),
+ : DocumentSource(kStageName, expCtx),
_teeBuffer(TeeBuffer::create(facetPipelines.size())),
_facets(std::move(facetPipelines)) {
for (size_t facetId = 0; facetId < _facets.size(); ++facetId) {
@@ -179,9 +179,7 @@ void DocumentSourceFacet::doDispose() {
}
}
-DocumentSource::GetNextResult DocumentSourceFacet::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceFacet::doGetNext() {
if (_done) {
return GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 1a2c54a8649..4a6e0aecc5a 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -58,6 +58,7 @@ class NamespaceString;
*/
class DocumentSourceFacet final : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$facet"_sd;
struct FacetPipeline {
FacetPipeline(std::string name, std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
: name(std::move(name)), pipeline(std::move(pipeline)) {}
@@ -105,11 +106,6 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
- * Blocking call. Will consume all input and produces one output document.
- */
- GetNextResult getNext() final;
-
- /**
* Optimizes inner pipelines.
*/
boost::intrusive_ptr<DocumentSource> optimize() final;
@@ -120,7 +116,7 @@ public:
DepsTracker::State getDependencies(DepsTracker* deps) const final;
const char* getSourceName() const final {
- return "$facet";
+ return DocumentSourceFacet::kStageName.rawData();
}
/**
@@ -155,6 +151,10 @@ public:
bool usedDisk() final;
protected:
+ /**
+ * Blocking call. Will consume all input and produces one output document.
+ */
+ GetNextResult doGetNext() final;
void doDispose() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 5e8668b7de9..1ed94365190 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -244,7 +244,7 @@ public:
LookupRequirement::kAllowed};
}
- DocumentSource::GetNextResult getNext() final {
+ DocumentSource::GetNextResult doGetNext() final {
return pSource->getNext();
}
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index 6df14d1e05a..8e1067751b6 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -43,7 +43,6 @@ namespace mongo {
using boost::intrusive_ptr;
constexpr StringData DocumentSourceGeoNear::kKeyFieldName;
-constexpr const char* DocumentSourceGeoNear::kStageName;
REGISTER_DOCUMENT_SOURCE(geoNear,
LiteParsedDocumentSourceDefault::parse,
@@ -234,7 +233,7 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con
}
DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {}
+ : DocumentSource(kStageName, pExpCtx), coordsIsArray(false), spherical(false) {}
boost::optional<DocumentSource::DistributedPlanLogic>
DocumentSourceGeoNear::distributedPlanLogic() {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index cbb490dd807..ae72b3fc027 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -37,7 +37,7 @@ namespace mongo {
class DocumentSourceGeoNear : public DocumentSource {
public:
static constexpr StringData kKeyFieldName = "key"_sd;
- static constexpr auto kStageName = "$geoNear";
+ static constexpr StringData kStageName = "$geoNear"_sd;
/**
* Only exposed for testing.
@@ -46,7 +46,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>&);
const char* getSourceName() const final {
- return kStageName;
+ return DocumentSourceGeoNear::kStageName.rawData();
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -63,9 +63,8 @@ public:
* DocumentSourceGeoNear should always be replaced by a DocumentSourceGeoNearCursor before
* executing a pipeline, so this method should never be called.
*/
- GetNextResult getNext() final {
- // TODO: Replace with a MONGO_UNREACHABLE as part of SERVER-38995.
- uasserted(51048, "DocumentSourceGeoNear's getNext should never be called");
+ GetNextResult doGetNext() final {
+ MONGO_UNREACHABLE;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
@@ -133,7 +132,6 @@ public:
*/
boost::optional<DistributedPlanLogic> distributedPlanLogic() final;
-
private:
explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp b/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp
index 6a369e84a69..24253846166 100644
--- a/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near_cursor.cpp
@@ -49,7 +49,6 @@
#include "mongo/db/query/plan_executor.h"
namespace mongo {
-constexpr const char* DocumentSourceGeoNearCursor::kStageName;
boost::intrusive_ptr<DocumentSourceGeoNearCursor> DocumentSourceGeoNearCursor::create(
Collection* collection,
@@ -81,7 +80,7 @@ DocumentSourceGeoNearCursor::DocumentSourceGeoNearCursor(
}
const char* DocumentSourceGeoNearCursor::getSourceName() const {
- return kStageName;
+ return DocumentSourceGeoNearCursor::kStageName.rawData();
}
Document DocumentSourceGeoNearCursor::transformBSONObjToDocument(const BSONObj& obj) const {
diff --git a/src/mongo/db/pipeline/document_source_geo_near_cursor.h b/src/mongo/db/pipeline/document_source_geo_near_cursor.h
index c23e01dc146..52e8d1cc554 100644
--- a/src/mongo/db/pipeline/document_source_geo_near_cursor.h
+++ b/src/mongo/db/pipeline/document_source_geo_near_cursor.h
@@ -53,7 +53,7 @@ public:
/**
* The name of this stage.
*/
- static constexpr auto kStageName = "$geoNearCursor";
+ static constexpr StringData kStageName = "$geoNearCursor"_sd;
/**
* Create a new DocumentSourceGeoNearCursor. If specified, 'distanceMultiplier' must be
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index c96ac4b4114..ea781c2b1b9 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -84,12 +84,10 @@ REGISTER_DOCUMENT_SOURCE(graphLookup,
DocumentSourceGraphLookUp::createFromBson);
const char* DocumentSourceGraphLookUp::getSourceName() const {
- return "$graphLookup";
+ return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceGraphLookUp::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceGraphLookUp::doGetNext() {
if (_unwind) {
return getNextUnwound();
}
@@ -445,7 +443,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
boost::optional<FieldPath> depthField,
boost::optional<long long> maxDepth,
boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc)
- : DocumentSource(expCtx),
+ : DocumentSource(kStageName, expCtx),
_from(std::move(from)),
_as(std::move(as)),
_connectFromField(std::move(connectFromField)),
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index b36f79677c1..c9eccb39de9 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -39,6 +39,8 @@ namespace mongo {
class DocumentSourceGraphLookUp final : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$graphLookup"_sd;
+
class LiteParsed : public LiteParsedDocumentSourceForeignCollections {
public:
LiteParsed(NamespaceString foreignNss, PrivilegeVector privileges)
@@ -52,8 +54,6 @@ public:
static std::unique_ptr<LiteParsed> liteParse(const AggregationRequest& request,
const BSONElement& spec);
- GetNextResult getNext() final;
-
const char* getSourceName() const final;
const FieldPath& getConnectFromField() const {
@@ -130,6 +130,7 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
protected:
+ GetNextResult doGetNext() final;
void doDispose() final;
/**
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 6cfb08dfaf8..6ec80d8bbe5 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -132,9 +132,7 @@ const char* DocumentSourceGroup::getSourceName() const {
return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceGroup::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() {
if (!_initialized) {
const auto initializationResult = initialize();
if (initializationResult.isPaused()) {
@@ -328,7 +326,7 @@ intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create(
DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& pExpCtx,
boost::optional<size_t> maxMemoryUsageBytes)
- : DocumentSource(pExpCtx),
+ : DocumentSource(kStageName, pExpCtx),
_usedDisk(false),
_doingMerge(false),
_maxMemoryUsageBytes(maxMemoryUsageBytes ? *maxMemoryUsageBytes
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 31e29ea2954..fc4dc8c7515 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -96,7 +96,6 @@ public:
boost::intrusive_ptr<DocumentSource> optimize() final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- GetNextResult getNext() final;
const char* getSourceName() const final;
GetModPathsReturn getModifiedPaths() const final;
StringMap<boost::intrusive_ptr<Expression>> getIdFields() const;
@@ -131,7 +130,6 @@ public:
return constraints;
}
-
/**
* Add an accumulator, which will become a field in each Document that results from grouping.
*/
@@ -179,6 +177,7 @@ public:
const;
protected:
+ GetNextResult doGetNext() final;
void doDispose() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_index_stats.cpp b/src/mongo/db/pipeline/document_source_index_stats.cpp
index c24671624f6..7d02339794d 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_index_stats.cpp
@@ -44,12 +44,10 @@ REGISTER_DOCUMENT_SOURCE(indexStats,
DocumentSourceIndexStats::createFromBson);
const char* DocumentSourceIndexStats::getSourceName() const {
- return "$indexStats";
+ return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceIndexStats::doGetNext() {
if (_indexStatsMap.empty()) {
_indexStatsMap = pExpCtx->mongoProcessInterface->getIndexStats(pExpCtx->opCtx, pExpCtx->ns);
_indexStatsIter = _indexStatsMap.begin();
@@ -71,7 +69,7 @@ DocumentSource::GetNextResult DocumentSourceIndexStats::getNext() {
}
DocumentSourceIndexStats::DocumentSourceIndexStats(const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _processName(getHostNameCachedAndPort()) {}
+ : DocumentSource(kStageName, pExpCtx), _processName(getHostNameCachedAndPort()) {}
intrusive_ptr<DocumentSource> DocumentSourceIndexStats::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 2493970e333..9198f817b33 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -40,6 +40,8 @@ namespace mongo {
*/
class DocumentSourceIndexStats final : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$indexStats"_sd;
+
class LiteParsed final : public LiteParsedDocumentSource {
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
@@ -66,7 +68,6 @@ public:
};
// virtuals from DocumentSource
- GetNextResult getNext() final;
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
@@ -92,6 +93,7 @@ public:
private:
DocumentSourceIndexStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ GetNextResult doGetNext() final;
CollectionIndexUsageMap _indexStatsMap;
CollectionIndexUsageMap::const_iterator _indexStatsIter;
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp
index 13a0c173424..f5729e4570a 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.cpp
@@ -55,8 +55,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalInhibitOptimization::
return new DocumentSourceInternalInhibitOptimization(expCtx);
}
-DocumentSource::GetNextResult DocumentSourceInternalInhibitOptimization::getNext() {
- pExpCtx->checkForInterrupt();
+DocumentSource::GetNextResult DocumentSourceInternalInhibitOptimization::doGetNext() {
return pSource->getNext();
}
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index 75f3e637a7d..2c13583a78a 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -47,7 +47,7 @@ public:
BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
DocumentSourceInternalInhibitOptimization(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx) {}
+ : DocumentSource(kStageName, expCtx) {}
const char* getSourceName() const final {
return kStageName.rawData();
@@ -67,9 +67,8 @@ public:
return boost::none;
}
- GetNextResult getNext() final;
-
private:
+ GetNextResult doGetNext() final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
};
diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp
index a548cb6366c..7beb9441feb 100644
--- a/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.cpp
@@ -46,11 +46,9 @@ namespace mongo {
DocumentSourceInternalShardFilter::DocumentSourceInternalShardFilter(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
std::unique_ptr<ShardFilterer> shardFilterer)
- : DocumentSource(pExpCtx), _shardFilterer(std::move(shardFilterer)) {}
-
-DocumentSource::GetNextResult DocumentSourceInternalShardFilter::getNext() {
- pExpCtx->checkForInterrupt();
+ : DocumentSource(kStageName, pExpCtx), _shardFilterer(std::move(shardFilterer)) {}
+DocumentSource::GetNextResult DocumentSourceInternalShardFilter::doGetNext() {
auto next = pSource->getNext();
invariant(_shardFilterer);
for (; next.isAdvanced(); next = pSource->getNext()) {
diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.h b/src/mongo/db/pipeline/document_source_internal_shard_filter.h
index 8fbfe6b07f1..2a13791b793 100644
--- a/src/mongo/db/pipeline/document_source_internal_shard_filter.h
+++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.h
@@ -63,8 +63,6 @@ public:
ChangeStreamRequirement::kBlacklist);
}
- GetNextResult getNext() override;
-
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
@@ -76,6 +74,8 @@ public:
Pipeline::SourceContainer* container) override;
private:
+ GetNextResult doGetNext() override;
+
std::unique_ptr<ShardFilterer> _shardFilterer;
};
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
index 0eb5a85f0d0..4857fac3d58 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
@@ -81,8 +81,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create
return new DocumentSourceInternalSplitPipeline(expCtx, mergeType);
}
-DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::getNext() {
- pExpCtx->checkForInterrupt();
+DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::doGetNext() {
return pSource->getNext();
}
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index d2d4b14e685..5e1580ce4e0 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -74,12 +74,12 @@ public:
: LookupRequirement::kAllowed};
}
- GetNextResult getNext() final;
-
private:
DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
HostTypeRequirement mergeType)
- : DocumentSource(expCtx), _mergeType(mergeType) {}
+ : DocumentSource(kStageName, expCtx), _mergeType(mergeType) {}
+
+ GetNextResult doGetNext() final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
HostTypeRequirement _mergeType = HostTypeRequirement::kNone;
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index f8b2ca3c0aa..f5930738a41 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -44,7 +44,7 @@ using boost::intrusive_ptr;
DocumentSourceLimit::DocumentSourceLimit(const intrusive_ptr<ExpressionContext>& pExpCtx,
long long limit)
- : DocumentSource(pExpCtx), _limit(limit) {}
+ : DocumentSource(kStageName, pExpCtx), _limit(limit) {}
REGISTER_DOCUMENT_SOURCE(limit,
LiteParsedDocumentSourceDefault::parse,
@@ -70,9 +70,7 @@ Pipeline::SourceContainer::iterator DocumentSourceLimit::doOptimizeAt(
return std::next(itr);
}
-DocumentSource::GetNextResult DocumentSourceLimit::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceLimit::doGetNext() {
if (_nReturned >= _limit) {
return GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index 3cecb165cf0..8bdd83686fe 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -59,7 +59,6 @@ public:
LookupRequirement::kAllowed};
}
- GetNextResult getNext() final;
const char* getSourceName() const final {
return kStageName.rawData();
}
@@ -96,6 +95,7 @@ public:
private:
DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
+ GetNextResult doGetNext() final;
long long _limit;
long long _nReturned = 0;
diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp
index 9195d0aa0aa..535d4ebb1e2 100644
--- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp
+++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp
@@ -42,11 +42,7 @@ REGISTER_TEST_DOCUMENT_SOURCE(listCachedAndActiveUsers,
DocumentSourceListCachedAndActiveUsers::LiteParsed::parse,
DocumentSourceListCachedAndActiveUsers::createFromBson);
-const char* DocumentSourceListCachedAndActiveUsers::kStageName = "$listCachedAndActiveUsers";
-
-DocumentSource::GetNextResult DocumentSourceListCachedAndActiveUsers::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceListCachedAndActiveUsers::doGetNext() {
if (!_users.empty()) {
const auto info = std::move(_users.back());
_users.pop_back();
@@ -75,7 +71,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListCachedAndActiveUsers::cre
DocumentSourceListCachedAndActiveUsers::DocumentSourceListCachedAndActiveUsers(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _users() {
+ : DocumentSource(kStageName, pExpCtx), _users() {
auto authMgr = AuthorizationManager::get(pExpCtx->opCtx->getServiceContext());
_users = authMgr->getUserCacheInfo();
}
diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
index 51dea36c162..3f297e25a49 100644
--- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
+++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
@@ -43,7 +43,7 @@ namespace mongo {
*/
class DocumentSourceListCachedAndActiveUsers final : public DocumentSource {
public:
- static const char* kStageName;
+ static constexpr StringData kStageName = "$listCachedAndActiveUsers"_sd;
class LiteParsed final : public LiteParsedDocumentSource {
public:
@@ -78,10 +78,8 @@ public:
}
};
- GetNextResult getNext() final;
-
const char* getSourceName() const final {
- return kStageName;
+ return kStageName.rawData();
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
@@ -112,6 +110,8 @@ public:
private:
DocumentSourceListCachedAndActiveUsers(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ GetNextResult doGetNext() final;
+
std::vector<AuthorizationManager::CachedUserInfo> _users;
};
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.cpp b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp
index 53beab31a77..b388ee0e688 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.cpp
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp
@@ -41,11 +41,7 @@ REGISTER_DOCUMENT_SOURCE(listLocalSessions,
DocumentSourceListLocalSessions::LiteParsed::parse,
DocumentSourceListLocalSessions::createFromBson);
-const char* DocumentSourceListLocalSessions::kStageName = "$listLocalSessions";
-
-DocumentSource::GetNextResult DocumentSourceListLocalSessions::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceListLocalSessions::doGetNext() {
while (!_ids.empty()) {
const auto& id = _ids.back();
_ids.pop_back();
@@ -75,7 +71,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalSessions::createFrom
DocumentSourceListLocalSessions::DocumentSourceListLocalSessions(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const ListSessionsSpec& spec)
- : DocumentSource(pExpCtx), _spec(spec) {
+ : DocumentSource(kStageName, pExpCtx), _spec(spec) {
const auto& opCtx = pExpCtx->opCtx;
_cache = LogicalSessionCache::get(opCtx);
if (_spec.getAllUsers()) {
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 74403ac9d45..0cc7ba1a929 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -52,14 +52,15 @@ std::vector<SHA256Block> listSessionsUsersToDigests(const std::vector<ListSessio
*/
class DocumentSourceListLocalSessions final : public DocumentSource {
public:
- static const char* kStageName;
+ static constexpr StringData kStageName = "$listLocalSessions"_sd;
class LiteParsed final : public LiteParsedDocumentSource {
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return std::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec));
+ return std::make_unique<LiteParsed>(
+ listSessionsParseSpec(DocumentSourceListLocalSessions::kStageName, spec));
}
explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {}
@@ -82,7 +83,9 @@ public:
void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
uassert(ErrorCodes::InvalidOptions,
- str::stream() << "Aggregation stage " << kStageName << " cannot run with a "
+ str::stream() << "Aggregation stage "
+ << DocumentSourceListLocalSessions::kStageName
+ << " cannot run with a "
<< "readConcern other than 'local', or in a multi-document "
<< "transaction. Current readConcern: " << readConcern.toString(),
readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
@@ -92,10 +95,8 @@ public:
const ListSessionsSpec _spec;
};
- GetNextResult getNext() final;
-
const char* getSourceName() const final {
- return kStageName;
+ return DocumentSourceListLocalSessions::kStageName.rawData();
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
@@ -127,6 +128,8 @@ private:
DocumentSourceListLocalSessions(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
const ListSessionsSpec& spec);
+ GetNextResult doGetNext() final;
+
const ListSessionsSpec _spec;
const LogicalSessionCache* _cache;
std::vector<LogicalSessionId> _ids;
diff --git a/src/mongo/db/pipeline/document_source_list_sessions.cpp b/src/mongo/db/pipeline/document_source_list_sessions.cpp
index d6351375bd9..5eba38b1510 100644
--- a/src/mongo/db/pipeline/document_source_list_sessions.cpp
+++ b/src/mongo/db/pipeline/document_source_list_sessions.cpp
@@ -38,8 +38,6 @@
namespace mongo {
-const char* DocumentSourceListSessions::kStageName = "$listSessions";
-
REGISTER_DOCUMENT_SOURCE(listSessions,
DocumentSourceListSessions::LiteParsed::parse,
DocumentSourceListSessions::createFromBson);
@@ -75,7 +73,6 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceListSessions::createFromBson(
return new DocumentSourceListSessions(query, pExpCtx, spec.getAllUsers(), spec.getUsers());
}
-
Value DocumentSourceListSessions::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
ListSessionsSpec spec;
diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h
index e8068747799..75e5bfd7fee 100644
--- a/src/mongo/db/pipeline/document_source_list_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_sessions.h
@@ -47,13 +47,14 @@ namespace mongo {
*/
class DocumentSourceListSessions final : public DocumentSourceMatch {
public:
- static const char* kStageName;
+ static constexpr StringData kStageName = "$listSessions"_sd;
class LiteParsed final : public LiteParsedDocumentSource {
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return std::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec));
+ return std::make_unique<LiteParsed>(
+ listSessionsParseSpec(DocumentSourceListSessions::kStageName, spec));
}
explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {}
@@ -79,7 +80,7 @@ public:
};
const char* getSourceName() const final {
- return kStageName;
+ return DocumentSourceListSessions::kStageName.rawData();
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index f14b3db0394..0c42c595cfe 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -53,7 +53,7 @@ constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth;
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx),
+ : DocumentSource(kStageName, expCtx),
_fromNs(std::move(fromNs)),
_as(std::move(as)),
_variables(expCtx->variables),
@@ -161,7 +161,7 @@ REGISTER_DOCUMENT_SOURCE(lookup,
DocumentSourceLookUp::createFromBson);
const char* DocumentSourceLookUp::getSourceName() const {
- return "$lookup";
+ return kStageName.rawData();
}
StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
@@ -227,9 +227,7 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const BSONArray& valu
} // namespace
-DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() {
if (_unwindSrc) {
return unwindResult();
}
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d885dc5bcff..b618ec6230f 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -49,6 +49,7 @@ namespace mongo {
class DocumentSourceLookUp final : public DocumentSource {
public:
static constexpr size_t kMaxSubPipelineDepth = 20;
+ static constexpr StringData kStageName = "$lookup"_sd;
struct LetVariable {
LetVariable(std::string name, boost::intrusive_ptr<Expression> expression, Variables::Id id)
@@ -112,7 +113,6 @@ public:
const boost::optional<LiteParsedPipeline> _liteParsedPipeline;
};
- GetNextResult getNext() final;
const char* getSourceName() const final;
void serializeToArray(
std::vector<Value>& array,
@@ -220,6 +220,7 @@ public:
}
protected:
+ GetNextResult doGetNext() final;
void doDispose() final;
/**
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index cb24b7b9ae8..1019454c133 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -51,9 +51,7 @@ Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType
}
} // namespace
-DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::doGetNext() {
auto input = pSource->getNext();
if (!input.isAdvanced()) {
return input;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 83bad0c7ee5..40d50586025 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -93,11 +93,6 @@ public:
return DepsTracker::State::SEE_NEXT;
}
- /**
- * Performs the lookup to retrieve the full document.
- */
- GetNextResult getNext() final;
-
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
if (explain) {
return Value{Document{{kStageName, Document()}}};
@@ -111,7 +106,12 @@ public:
private:
DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx) {}
+ : DocumentSource(kStageName, expCtx) {}
+
+ /**
+ * Performs the lookup to retrieve the full document.
+ */
+ GetNextResult doGetNext() final;
/**
* Uses the "documentKey" field from 'updateOp' to look up the current version of the document.
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index 6ed37705edd..066330fa525 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -58,7 +58,7 @@ REGISTER_DOCUMENT_SOURCE(match,
DocumentSourceMatch::createFromBson);
const char* DocumentSourceMatch::getSourceName() const {
- return "$match";
+ return kStageName.rawData();
}
Value DocumentSourceMatch::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
@@ -80,9 +80,7 @@ intrusive_ptr<DocumentSource> DocumentSourceMatch::optimize() {
return this;
}
-DocumentSource::GetNextResult DocumentSourceMatch::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceMatch::doGetNext() {
// The user facing error should have been generated earlier.
massert(17309, "Should never call getNext on a $match stage with $text clause", !_isTextQuery);
@@ -497,7 +495,7 @@ DepsTracker::State DocumentSourceMatch::getDependencies(DepsTracker* deps) const
DocumentSourceMatch::DocumentSourceMatch(const BSONObj& query,
const intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx) {
+ : DocumentSource(kStageName, expCtx) {
rebuild(query);
}
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 4645a2e1937..c35cdfcaadf 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -40,6 +40,7 @@ namespace mongo {
class DocumentSourceMatch : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$match"_sd;
/**
* Convenience method for creating a $match stage.
*/
@@ -77,8 +78,6 @@ public:
*/
void rebuild(BSONObj filter);
- GetNextResult getNext() override;
-
boost::intrusive_ptr<DocumentSource> optimize() final;
const char* getSourceName() const override;
@@ -170,6 +169,7 @@ public:
}
protected:
+ GetNextResult doGetNext() override;
DocumentSourceMatch(const BSONObj& query,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
diff --git a/src/mongo/db/pipeline/document_source_merge.cpp b/src/mongo/db/pipeline/document_source_merge.cpp
index c4f7e864231..99c80f74ec9 100644
--- a/src/mongo/db/pipeline/document_source_merge.cpp
+++ b/src/mongo/db/pipeline/document_source_merge.cpp
@@ -322,7 +322,7 @@ DocumentSourceMerge::DocumentSourceMerge(NamespaceString outputNs,
boost::optional<std::vector<BSONObj>> pipeline,
std::set<FieldPath> mergeOnFields,
boost::optional<ChunkVersion> targetCollectionVersion)
- : DocumentSourceWriter(std::move(outputNs), expCtx),
+ : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx),
_targetCollectionVersion(targetCollectionVersion),
_descriptor(descriptor),
_pipeline(std::move(pipeline)),
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 738fc802cc4..ca882575585 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -58,11 +58,6 @@ public:
using DocumentSourceQueue::DocumentSourceQueue;
DocumentSourceMock(std::deque<GetNextResult>);
- GetNextResult getNext() override {
- invariant(!isDisposed);
- invariant(!isDetachedFromOpCtx);
- return DocumentSourceQueue::getNext();
- }
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override {
// Unlike the queue, it's okay to serialize this stage for testing purposes.
@@ -100,6 +95,12 @@ public:
bool isOptimized = false;
protected:
+ GetNextResult doGetNext() override {
+ invariant(!isDisposed);
+ invariant(!isDetachedFromOpCtx);
+ return DocumentSourceQueue::doGetNext();
+ }
+
void doDispose() override {
isDisposed = true;
}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index b474083a5da..28241d82893 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -48,7 +48,6 @@ public:
using LiteParsedDocumentSourceForeignCollections::
LiteParsedDocumentSourceForeignCollections;
-
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec);
@@ -95,7 +94,7 @@ public:
private:
DocumentSourceOut(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceWriter(std::move(outputNs), expCtx) {}
+ : DocumentSourceWriter(kStageName.rawData(), std::move(outputNs), expCtx) {}
void initialize() override;
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp b/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp
index dfa460c3f9f..3140a90f5df 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.cpp
@@ -33,8 +33,6 @@
namespace mongo {
-const char* DocumentSourcePlanCacheStats::kStageName = "$planCacheStats";
-
REGISTER_DOCUMENT_SOURCE(planCacheStats,
DocumentSourcePlanCacheStats::LiteParsed::parse,
DocumentSourcePlanCacheStats::createFromBson);
@@ -60,7 +58,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourcePlanCacheStats::createFromBso
DocumentSourcePlanCacheStats::DocumentSourcePlanCacheStats(
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx) {}
+ : DocumentSource(kStageName, expCtx) {}
void DocumentSourcePlanCacheStats::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
@@ -93,7 +91,7 @@ Pipeline::SourceContainer::iterator DocumentSourcePlanCacheStats::doOptimizeAt(
return container->erase(itrToNext);
}
-DocumentSource::GetNextResult DocumentSourcePlanCacheStats::getNext() {
+DocumentSource::GetNextResult DocumentSourcePlanCacheStats::doGetNext() {
if (!_haveRetrievedStats) {
const auto matchExpr = _absorbedMatch ? _absorbedMatch->getMatchExpression() : nullptr;
_results = pExpCtx->mongoProcessInterface->getMatchingPlanCacheEntryStats(
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
index 9e735f835af..f0cf248ae5b 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
@@ -36,7 +36,7 @@ namespace mongo {
class DocumentSourcePlanCacheStats final : public DocumentSource {
public:
- static const char* kStageName;
+ static constexpr StringData kStageName = "$planCacheStats"_sd;
class LiteParsed final : public LiteParsedDocumentSource {
public:
@@ -67,7 +67,8 @@ public:
void assertSupportsReadConcern(const repl::ReadConcernArgs& readConcern) const {
uassert(ErrorCodes::InvalidOptions,
- str::stream() << "Aggregation stage " << kStageName
+ str::stream() << "Aggregation stage "
+ << DocumentSourcePlanCacheStats::kStageName
<< " requires read concern local but found "
<< readConcern.toString(),
readConcern.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
@@ -82,8 +83,6 @@ public:
virtual ~DocumentSourcePlanCacheStats() = default;
- GetNextResult getNext() override;
-
StageConstraints constraints(
Pipeline::SplitState = Pipeline::SplitState::kUnsplit) const override {
StageConstraints constraints{StreamType::kStreaming,
@@ -105,7 +104,7 @@ public:
}
const char* getSourceName() const override {
- return kStageName;
+ return DocumentSourcePlanCacheStats::kStageName.rawData();
}
/**
@@ -122,6 +121,8 @@ public:
private:
DocumentSourcePlanCacheStats(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ GetNextResult doGetNext() final;
+
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override {
MONGO_UNREACHABLE; // Should call serializeToArray instead.
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp
index ad6b4a4b5e7..f37b1a1ced5 100644
--- a/src/mongo/db/pipeline/document_source_project.cpp
+++ b/src/mongo/db/pipeline/document_source_project.cpp
@@ -81,7 +81,7 @@ intrusive_ptr<DocumentSource> DocumentSourceProject::create(
throw;
}
}(),
- DocumentSourceProject::kStageName.rawData(),
+ kStageName,
isIndependentOfAnyCollection));
return project;
}
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp
index 47a77709363..12b8157117c 100644
--- a/src/mongo/db/pipeline/document_source_queue.cpp
+++ b/src/mongo/db/pipeline/document_source_queue.cpp
@@ -40,13 +40,13 @@ boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create(
DocumentSourceQueue::DocumentSourceQueue(std::deque<GetNextResult> results,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx), _queue(std::move(results)) {}
+ : DocumentSource(kStageName, expCtx), _queue(std::move(results)) {}
const char* DocumentSourceQueue::getSourceName() const {
return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceQueue::getNext() {
+DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() {
if (_queue.empty()) {
return GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h
index 4f5a61f5fb8..e6a1fe5e207 100644
--- a/src/mongo/db/pipeline/document_source_queue.h
+++ b/src/mongo/db/pipeline/document_source_queue.h
@@ -52,7 +52,6 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx);
virtual ~DocumentSourceQueue() {}
- GetNextResult getNext() override;
const char* getSourceName() const override;
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override {
@@ -100,6 +99,7 @@ public:
}
protected:
+ GetNextResult doGetNext() override;
// Return documents from front of queue.
std::deque<GetNextResult> _queue;
};
diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp
index 7afc1eea75a..749041c231a 100644
--- a/src/mongo/db/pipeline/document_source_redact.cpp
+++ b/src/mongo/db/pipeline/document_source_redact.cpp
@@ -47,23 +47,21 @@ using std::vector;
DocumentSourceRedact::DocumentSourceRedact(const intrusive_ptr<ExpressionContext>& expCtx,
const intrusive_ptr<Expression>& expression)
- : DocumentSource(expCtx), _expression(expression) {}
+ : DocumentSource(kStageName, expCtx), _expression(expression) {}
REGISTER_DOCUMENT_SOURCE(redact,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceRedact::createFromBson);
const char* DocumentSourceRedact::getSourceName() const {
- return "$redact";
+ return kStageName.rawData();
}
static const Value descendVal = Value("descend"_sd);
static const Value pruneVal = Value("prune"_sd);
static const Value keepVal = Value("keep"_sd);
-DocumentSource::GetNextResult DocumentSourceRedact::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceRedact::doGetNext() {
auto nextInput = pSource->getNext();
for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
auto& variables = pExpCtx->variables;
@@ -192,7 +190,6 @@ intrusive_ptr<DocumentSource> DocumentSourceRedact::createFromBson(
variables.setValue(pruneId, pruneVal);
variables.setValue(keepId, keepVal);
-
return source;
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index 1e9c9784e12..a4409c2e081 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -36,7 +36,7 @@ namespace mongo {
class DocumentSourceRedact final : public DocumentSource {
public:
- GetNextResult getNext() final;
+ static constexpr StringData kStageName = "$redact"_sd;
const char* getSourceName() const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
@@ -75,6 +75,8 @@ private:
DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const boost::intrusive_ptr<Expression>& previsit);
+ GetNextResult doGetNext() final;
+
// These both work over pExpCtx->variables.
boost::optional<Document> redactObject(const Document& root); // redacts CURRENT
Value redactValue(const Value& in, const Document& root);
diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp
index 3fe49d83f0b..4a1a29688c8 100644
--- a/src/mongo/db/pipeline/document_source_replace_root.cpp
+++ b/src/mongo/db/pipeline/document_source_replace_root.cpp
@@ -119,7 +119,7 @@ intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::createFromBson(
newRootExpression,
(stageName == kStageName) ? ReplaceRootTransformation::UserSpecifiedName::kReplaceRoot
: ReplaceRootTransformation::UserSpecifiedName::kReplaceWith),
- kStageName.toString(),
+ kStageName.rawData(),
isIndependentOfAnyCollection);
}
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 072b7a22d1c..ac9346a0d28 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -44,18 +44,16 @@ using boost::intrusive_ptr;
constexpr StringData DocumentSourceSample::kStageName;
DocumentSourceSample::DocumentSourceSample(const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _size(0) {}
+ : DocumentSource(kStageName, pExpCtx), _size(0) {}
REGISTER_DOCUMENT_SOURCE(sample,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSample::createFromBson);
-DocumentSource::GetNextResult DocumentSourceSample::getNext() {
+DocumentSource::GetNextResult DocumentSourceSample::doGetNext() {
if (_size == 0)
return GetNextResult::makeEOF();
- pExpCtx->checkForInterrupt();
-
if (!_sortStage->isPopulated()) {
// Exhaust source stage, add random metadata, and push all into sorter.
PseudoRandom& prng = pExpCtx->opCtx->getClient()->getPrng();
@@ -117,7 +115,6 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::createFromBson(
return sample;
}
-
boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSample::distributedPlanLogic() {
// On the merger we need to merge the pre-sorted documents by their random values, then limit to
// the number we need.
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index 987f8f8ba94..14c49978ee8 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -38,7 +38,6 @@ class DocumentSourceSample final : public DocumentSource {
public:
static constexpr StringData kStageName = "$sample"_sd;
- GetNextResult getNext() final;
const char* getSourceName() const final {
return kStageName.rawData();
}
@@ -70,6 +69,8 @@ public:
private:
explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ GetNextResult doGetNext() final;
+
long long _size;
// Uses a $sort stage to randomly sort the documents.
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
index cddae4ad571..ba2c99cbbfb 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
@@ -50,14 +50,14 @@ DocumentSourceSampleFromRandomCursor::DocumentSourceSampleFromRandomCursor(
long long size,
std::string idField,
long long nDocsInCollection)
- : DocumentSource(pExpCtx),
+ : DocumentSource(kStageName, pExpCtx),
_size(size),
_idField(std::move(idField)),
_seenDocs(pExpCtx->getValueComparator().makeUnorderedValueSet()),
_nDocsInColl(nDocsInCollection) {}
const char* DocumentSourceSampleFromRandomCursor::getSourceName() const {
- return "$sampleFromRandomCursor";
+ return kStageName.rawData();
}
namespace {
@@ -75,9 +75,7 @@ double smallestFromSampleOfUniform(PseudoRandom* prng, size_t N) {
}
} // namespace
-DocumentSource::GetNextResult DocumentSourceSampleFromRandomCursor::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceSampleFromRandomCursor::doGetNext() {
if (_seenDocs.size() >= static_cast<size_t>(_size))
return GetNextResult::makeEOF();
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
index 31f49bec10f..07d8f4cabfd 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
@@ -40,7 +40,7 @@ namespace mongo {
*/
class DocumentSourceSampleFromRandomCursor final : public DocumentSource {
public:
- GetNextResult getNext() final;
+ static constexpr StringData kStageName = "$sampleFromRandomCursor"_sd;
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
@@ -71,6 +71,8 @@ private:
std::string idField,
long long collectionSize);
+ GetNextResult doGetNext() final;
+
/**
* Keep asking for documents from the random cursor until it yields a new document. Errors if a
* a document is encountered without a value for '_idField', or if the random cursor keeps
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
index e6d1f3bd7f8..8cdfba006dd 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
@@ -39,7 +39,7 @@ constexpr StringData DocumentSourceSequentialDocumentCache::kStageName;
DocumentSourceSequentialDocumentCache::DocumentSourceSequentialDocumentCache(
const boost::intrusive_ptr<ExpressionContext>& expCtx, SequentialDocumentCache* cache)
- : DocumentSource(expCtx), _cache(cache) {
+ : DocumentSource(kStageName, expCtx), _cache(cache) {
invariant(_cache);
invariant(!_cache->isAbandoned());
@@ -48,12 +48,10 @@ DocumentSourceSequentialDocumentCache::DocumentSourceSequentialDocumentCache(
}
}
-DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() {
+DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::doGetNext() {
// Either we're reading from the cache, or we have an input source to build the cache from.
invariant(pSource || _cache->isServing());
- pExpCtx->checkForInterrupt();
-
if (_cacheIsEOF) {
return GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index d8d2ba90db8..15381735974 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -47,7 +47,7 @@ public:
static constexpr StringData kStageName = "$sequentialCache"_sd;
const char* getSourceName() const final {
- return kStageName.rawData();
+ return DocumentSourceSequentialDocumentCache::kStageName.rawData();
}
StageConstraints constraints(Pipeline::SplitState pipeState) const {
@@ -68,8 +68,6 @@ public:
return boost::none;
}
- GetNextResult getNext() final;
-
static boost::intrusive_ptr<DocumentSourceSequentialDocumentCache> create(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx, SequentialDocumentCache* cache) {
return new DocumentSourceSequentialDocumentCache(pExpCtx, cache);
@@ -85,6 +83,7 @@ public:
}
protected:
+ GetNextResult doGetNext() final;
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index 4575e9fa02b..633e2e55b1c 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -46,20 +46,18 @@ using boost::intrusive_ptr;
DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransformation(
const intrusive_ptr<ExpressionContext>& pExpCtx,
std::unique_ptr<TransformerInterface> parsedTransform,
- std::string name,
+ const StringData name,
bool isIndependentOfAnyCollection)
- : DocumentSource(pExpCtx),
+ : DocumentSource(name, pExpCtx),
_parsedTransform(std::move(parsedTransform)),
- _name(std::move(name)),
+ _name(name.toString()),
_isIndependentOfAnyCollection(isIndependentOfAnyCollection) {}
const char* DocumentSourceSingleDocumentTransformation::getSourceName() const {
return _name.c_str();
}
-DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceSingleDocumentTransformation::doGetNext() {
// Get the next input document.
auto input = pSource->getNext();
if (!input.isAdvanced()) {
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index f36287b0d37..6d5daa7171a 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -45,12 +45,12 @@ public:
DocumentSourceSingleDocumentTransformation(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
std::unique_ptr<TransformerInterface> parsedTransform,
- std::string name,
+ const StringData name,
bool independentOfAnyCollection);
// virtuals from DocumentSource
const char* getSourceName() const final;
- GetNextResult getNext() final;
+
boost::intrusive_ptr<DocumentSource> optimize() final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
@@ -90,6 +90,7 @@ public:
}
protected:
+ GetNextResult doGetNext() final;
void doDispose() final;
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp
index 143a796cdf6..8a867a27e4b 100644
--- a/src/mongo/db/pipeline/document_source_skip.cpp
+++ b/src/mongo/db/pipeline/document_source_skip.cpp
@@ -45,7 +45,7 @@ using boost::intrusive_ptr;
DocumentSourceSkip::DocumentSourceSkip(const intrusive_ptr<ExpressionContext>& pExpCtx,
long long nToSkip)
- : DocumentSource(pExpCtx), _nToSkip(nToSkip) {}
+ : DocumentSource(kStageName, pExpCtx), _nToSkip(nToSkip) {}
REGISTER_DOCUMENT_SOURCE(skip,
LiteParsedDocumentSourceDefault::parse,
@@ -53,9 +53,7 @@ REGISTER_DOCUMENT_SOURCE(skip,
constexpr StringData DocumentSourceSkip::kStageName;
-DocumentSource::GetNextResult DocumentSourceSkip::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceSkip::doGetNext() {
while (_nSkippedSoFar < _nToSkip) {
// For performance reasons, a streaming stage must not keep references to documents across
// calls to getNext(). Such stages must retrieve a result from their child and then release
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index fdd430ccf6c..46ab8f46041 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -61,8 +61,6 @@ public:
LookupRequirement::kAllowed};
}
- GetNextResult getNext() final;
-
const char* getSourceName() const final {
return kStageName.rawData();
}
@@ -101,6 +99,8 @@ private:
explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
long long nToSkip);
+ GetNextResult doGetNext() final;
+
long long _nToSkip = 0;
long long _nSkippedSoFar = 0;
};
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 09afc8fc2e6..30d71711823 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -104,7 +104,7 @@ DocumentSourceSort::DocumentSourceSort(const boost::intrusive_ptr<ExpressionCont
const BSONObj& sortOrder,
uint64_t limit,
uint64_t maxMemoryUsageBytes)
- : DocumentSource(pExpCtx),
+ : DocumentSource(kStageName, pExpCtx),
_sortExecutor({{sortOrder, pExpCtx},
limit,
maxMemoryUsageBytes,
@@ -122,9 +122,7 @@ REGISTER_DOCUMENT_SOURCE(sort,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSort::createFromBson);
-DocumentSource::GetNextResult DocumentSourceSort::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceSort::doGetNext() {
if (!_populated) {
const auto populationResult = populate();
if (populationResult.isPaused()) {
@@ -215,7 +213,6 @@ DepsTracker::State DocumentSourceSort::getDependencies(DepsTracker* deps) const
return DepsTracker::State::SEE_NEXT;
}
-
intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(15973, "the $sort key specification must be an object", elem.type() == Object);
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 114c8f7445b..48b7fbc1095 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -44,8 +44,6 @@ class DocumentSourceSort final : public DocumentSource {
public:
static constexpr StringData kStageName = "$sort"_sd;
- GetNextResult getNext() final;
-
const char* getSourceName() const final {
return kStageName.rawData();
}
@@ -135,6 +133,7 @@ public:
}
protected:
+ GetNextResult doGetNext() final;
/**
* Attempts to absorb a subsequent $limit stage so that it can perform a top-k sort.
*/
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.cpp b/src/mongo/db/pipeline/document_source_tee_consumer.cpp
index c63aa579b08..59910b8f5ca 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.cpp
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.cpp
@@ -45,7 +45,7 @@ using boost::intrusive_ptr;
DocumentSourceTeeConsumer::DocumentSourceTeeConsumer(const intrusive_ptr<ExpressionContext>& expCtx,
size_t facetId,
const intrusive_ptr<TeeBuffer>& bufferSource)
- : DocumentSource(expCtx), _facetId(facetId), _bufferSource(bufferSource) {}
+ : DocumentSource(kStageName, expCtx), _facetId(facetId), _bufferSource(bufferSource) {}
boost::intrusive_ptr<DocumentSourceTeeConsumer> DocumentSourceTeeConsumer::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -54,8 +54,7 @@ boost::intrusive_ptr<DocumentSourceTeeConsumer> DocumentSourceTeeConsumer::creat
return new DocumentSourceTeeConsumer(expCtx, facetId, bufferSource);
}
-DocumentSource::GetNextResult DocumentSourceTeeConsumer::getNext() {
- pExpCtx->checkForInterrupt();
+DocumentSource::GetNextResult DocumentSourceTeeConsumer::doGetNext() {
return _bufferSource->getNext(_facetId);
}
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
index e9054678296..94078c9bcb0 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.h
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -49,6 +49,7 @@ class Value;
*/
class DocumentSourceTeeConsumer : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$teeConsumer"_sd;
static boost::intrusive_ptr<DocumentSourceTeeConsumer> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
size_t facetId,
@@ -68,8 +69,6 @@ public:
return boost::none;
}
- GetNextResult getNext() final;
-
/**
* Returns SEE_NEXT, since it requires no fields, and changes nothing about the documents.
*/
@@ -80,6 +79,7 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
protected:
+ GetNextResult doGetNext() final;
void doDispose() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h
index ba535906ea6..89a91501d22 100644
--- a/src/mongo/db/pipeline/document_source_test_optimizations.h
+++ b/src/mongo/db/pipeline/document_source_test_optimizations.h
@@ -39,9 +39,12 @@ namespace mongo {
*/
class DocumentSourceTestOptimizations : public DocumentSource {
public:
- DocumentSourceTestOptimizations() : DocumentSource(new ExpressionContextForTest()) {}
+ static constexpr StringData kStageName = "$_internalTestOptimizations"_sd;
+ DocumentSourceTestOptimizations()
+ : DocumentSource(DocumentSourceTestOptimizations::kStageName,
+ new ExpressionContextForTest()) {}
virtual ~DocumentSourceTestOptimizations() = default;
- virtual GetNextResult getNext() override {
+ virtual GetNextResult doGetNext() override {
MONGO_UNREACHABLE;
}
virtual StageConstraints constraints(Pipeline::SplitState) const override {
diff --git a/src/mongo/db/pipeline/document_source_unwind.cpp b/src/mongo/db/pipeline/document_source_unwind.cpp
index 870394a277c..b1426d30de3 100644
--- a/src/mongo/db/pipeline/document_source_unwind.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind.cpp
@@ -160,7 +160,7 @@ DocumentSourceUnwind::DocumentSourceUnwind(const intrusive_ptr<ExpressionContext
const FieldPath& fieldPath,
bool preserveNullAndEmptyArrays,
const boost::optional<FieldPath>& indexPath)
- : DocumentSource(pExpCtx),
+ : DocumentSource(kStageName, pExpCtx),
_unwindPath(fieldPath),
_preserveNullAndEmptyArrays(preserveNullAndEmptyArrays),
_indexPath(indexPath),
@@ -171,7 +171,7 @@ REGISTER_DOCUMENT_SOURCE(unwind,
DocumentSourceUnwind::createFromBson);
const char* DocumentSourceUnwind::getSourceName() const {
- return "$unwind";
+ return kStageName.rawData();
}
intrusive_ptr<DocumentSourceUnwind> DocumentSourceUnwind::create(
@@ -187,9 +187,7 @@ intrusive_ptr<DocumentSourceUnwind> DocumentSourceUnwind::create(
return source;
}
-DocumentSource::GetNextResult DocumentSourceUnwind::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceUnwind::doGetNext() {
auto nextOut = _unwinder->getNext();
while (nextOut.isEOF()) {
// No more elements in array currently being unwound. This will loop if the input
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index 4cef6983564..c3254179e88 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -36,9 +36,9 @@ namespace mongo {
class DocumentSourceUnwind final : public DocumentSource {
public:
- // virtuals from DocumentSource
- GetNextResult getNext() final;
+ static constexpr StringData kStageName = "$unwind"_sd;
+ // virtuals from DocumentSource
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
@@ -97,6 +97,8 @@ private:
bool includeNullIfEmptyOrMissing,
const boost::optional<FieldPath>& includeArrayIndex);
+ GetNextResult doGetNext() final;
+
// Configuration state.
const FieldPath _unwindPath;
// Documents that have a nullish value, or an empty array for the field '_unwindPath', will pass
diff --git a/src/mongo/db/pipeline/document_source_writer.h b/src/mongo/db/pipeline/document_source_writer.h
index ada2fc72a53..dd5dd0c37a5 100644
--- a/src/mongo/db/pipeline/document_source_writer.h
+++ b/src/mongo/db/pipeline/document_source_writer.h
@@ -89,9 +89,10 @@ public:
using BatchObject = B;
using BatchedObjects = std::vector<BatchObject>;
- DocumentSourceWriter(NamespaceString outputNs,
+ DocumentSourceWriter(const char* stageName,
+ NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(expCtx),
+ : DocumentSource(stageName, expCtx),
_outputNs(std::move(outputNs)),
_writeConcern(expCtx->opCtx->getWriteConcern()) {}
@@ -119,9 +120,8 @@ public:
return _outputNs;
}
- GetNextResult getNext() final override;
-
protected:
+ GetNextResult doGetNext() final override;
/**
* Prepares the stage to be able to write incoming batches.
*/
@@ -165,9 +165,7 @@ private:
};
template <typename B>
-DocumentSource::GetNextResult DocumentSourceWriter<B>::getNext() {
- pExpCtx->checkForInterrupt();
-
+DocumentSource::GetNextResult DocumentSourceWriter<B>::doGetNext() {
if (_done) {
return GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index fb4f3d988ee..3dc8dc79602 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -35,6 +35,7 @@
#include <boost/intrusive_ptr.hpp>
+#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/dependencies.h"
@@ -252,7 +253,6 @@ public:
*/
DepsTracker getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const;
-
const SourceContainer& getSources() const {
return _sources;
}
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index d48c6ca804a..cbdb2fc033d 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -50,7 +50,7 @@ DocumentSourceMergeCursors::DocumentSourceMergeCursors(
AsyncResultsMergerParams armParams,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<BSONObj> ownedParamsSpec)
- : DocumentSource(expCtx),
+ : DocumentSource(kStageName, expCtx),
_armParamsObj(std::move(ownedParamsSpec)),
_executor(std::move(executor)),
_armParams(std::move(armParams)) {}
@@ -95,7 +95,7 @@ std::unique_ptr<RouterStageMerge> DocumentSourceMergeCursors::convertToRouterSta
return std::make_unique<RouterStageMerge>(pExpCtx->opCtx, _executor, std::move(*_armParams));
}
-DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
+DocumentSource::GetNextResult DocumentSourceMergeCursors::doGetNext() {
if (!_blockingResultsMerger) {
populateMerger();
}
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index f8e25299260..f979c13a00c 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -101,8 +101,6 @@ public:
return boost::none;
}
- GetNextResult getNext() final;
-
std::size_t getNumRemotes() const;
/**
@@ -148,6 +146,7 @@ public:
}
protected:
+ GetNextResult doGetNext() final;
void doDispose() final;
private:
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp
index 00a8971454c..d94a99518b7 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.cpp
+++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp
@@ -66,13 +66,13 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
std::vector<ShardId>&& shardsWithCursors,
BSONObj cmdToRunOnNewShards)
- : DocumentSource(expCtx),
+ : DocumentSource(kStageName, expCtx),
_executor(std::move(executor)),
_mergeCursors(mergeCursors),
_shardsWithCursors(std::move(shardsWithCursors)),
_cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
-DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::getNext() {
+DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() {
auto childResult = pSource->getNext();
while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) {
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h
index e8f2473b432..0b41fde92d1 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.h
+++ b/src/mongo/s/query/document_source_update_on_add_shard.h
@@ -47,6 +47,8 @@ namespace mongo {
*/
class DocumentSourceUpdateOnAddShard final : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$_internalUpdateOnAddShard"_sd;
+
/**
* Creates a new stage which will establish a new cursor and add it to the cursors being merged
* by 'mergeCursorsStage' whenever a new shard is detected by a change stream.
@@ -79,8 +81,6 @@ public:
return boost::none;
}
- GetNextResult getNext() final;
-
private:
DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&,
std::shared_ptr<executor::TaskExecutor> executor,
@@ -88,6 +88,8 @@ private:
std::vector<ShardId>&& shardsWithCursors,
BSONObj cmdToRunOnNewShards);
+ GetNextResult doGetNext() final;
+
/**
* Establish the new cursors and tell the RouterStageMerge about them.
*/