author     Mindaugas Malinauskas <mindaugas.malinauskas@mongodb.com>   2021-02-03 10:52:08 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>            2021-03-10 13:01:50 +0000
commit     159a29e1e69948106f5bc601042b1baecf6fe982 (patch)
tree       f9ba9d588363c4f680f5764617eac266e7df4b33 /src/mongo/db/pipeline
parent     bfbe088aabd4716348eb17602f669613f9ce6b45 (diff)
download   mongo-159a29e1e69948106f5bc601042b1baecf6fe982.tar.gz
SERVER-53534 Support including postBatchResumeToken in cursor response for non-change stream aggregations
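For orientation before the diff: this commit teaches the aggregation layer to accept a `$_requestReshardingResumeToken` flag on oplog aggregations and to surface a postBatchResumeToken (PBRT) in the resulting cursor batches. Below is a minimal sketch of building such a request, mirroring the command shapes used in `aggregation_request_test.cpp` further down; the pipeline contents and the helper function name are illustrative assumptions, not part of the commit:

```cpp
#include "mongo/bson/json.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"

namespace mongo {

// Hypothetical helper: parse an oplog aggregation that opts into resharding
// resume tokens. With this commit in place, cursor batches produced for such
// a request can carry a postBatchResumeToken even though the pipeline is not
// a $changeStream.
StatusWith<AggregateCommand> parseReshardingOplogRequest() {
    NamespaceString nss("local.oplog.rs");  // the flag is only valid on the oplog
    const BSONObj cmd = fromjson(
        "{aggregate: 'oplog.rs', pipeline: [{$match: {op: {$ne: 'n'}}}], "
        "$_requestReshardingResumeToken: true, cursor: {}, $db: 'local'}");
    return aggregation_request_helper::parseFromBSONForTests(nss, cmd);
}

}  // namespace mongo
```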
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--  src/mongo/db/pipeline/aggregation_request_helper.cpp  |  39
-rw-r--r--  src/mongo/db/pipeline/aggregation_request_helper.h    |   7
-rw-r--r--  src/mongo/db/pipeline/aggregation_request_test.cpp    |  20
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp      |  19
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.h        |  10
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                  |  14
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h                    |   6
-rw-r--r--  src/mongo/db/pipeline/plan_executor_pipeline.cpp      |  78
-rw-r--r--  src/mongo/db/pipeline/plan_executor_pipeline.h        |  40
9 files changed, 193 insertions, 40 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.cpp b/src/mongo/db/pipeline/aggregation_request_helper.cpp
index 971d1afb75e..d710e73597a 100644
--- a/src/mongo/db/pipeline/aggregation_request_helper.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_helper.cpp
@@ -49,7 +49,9 @@ namespace aggregation_request_helper {
 /**
  * Validate the aggregate command object.
  */
-void validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> explainVerbosity);
+void validate(const BSONObj& cmdObj,
+              const NamespaceString& nss,
+              boost::optional<ExplainOptions::Verbosity> explainVerbosity);
 
 AggregateCommand parseFromBSON(const std::string& dbName,
                                const BSONObj& cmdObj,
@@ -109,8 +111,7 @@ AggregateCommand parseFromBSON(NamespaceString nss,
         request.setExplain(explainVerbosity);
     }
 
-    validate(cmdObj, explainVerbosity);
-
+    validate(cmdObj, nss, explainVerbosity);
     return request;
 }
 
@@ -148,7 +149,9 @@ Document serializeToCommandDoc(const AggregateCommand& request) {
     return Document(request.toBSON(BSONObj()).getOwned());
 }
 
-void validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+void validate(const BSONObj& cmdObj,
+              const NamespaceString& nss,
+              boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
     bool hasAllowDiskUseElem = cmdObj.hasField(AggregateCommand::kAllowDiskUseFieldName);
     bool hasCursorElem = cmdObj.hasField(AggregateCommand::kCursorFieldName);
     bool hasExplainElem = cmdObj.hasField(AggregateCommand::kExplainFieldName);
@@ -178,6 +181,19 @@ void validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity>
             str::stream() << "The '" << AggregateCommand::kAllowDiskUseFieldName
                           << "' option is not permitted in read-only mode.",
             (!hasAllowDiskUseElem || !storageGlobalParams.readOnly));
+
+    auto requestReshardingResumeTokenElem =
+        cmdObj[AggregateCommand::kRequestReshardingResumeTokenFieldName];
+    uassert(ErrorCodes::FailedToParse,
+            str::stream() << AggregateCommand::kRequestReshardingResumeTokenFieldName
+                          << " must be a boolean type",
+            !requestReshardingResumeTokenElem || requestReshardingResumeTokenElem.isBoolean());
+    bool hasRequestReshardingResumeToken =
+        requestReshardingResumeTokenElem && requestReshardingResumeTokenElem.boolean();
+    uassert(ErrorCodes::FailedToParse,
+            str::stream() << AggregateCommand::kRequestReshardingResumeTokenFieldName
+                          << " must only be set for the oplog namespace, not " << nss,
+            !hasRequestReshardingResumeToken || nss.isOplog());
 }
 
 void validateRequestForAPIVersion(const OperationContext* opCtx, const AggregateCommand& request) {
@@ -205,6 +221,21 @@ void validateRequestForAPIVersion(const OperationContext* opCtx, const Aggregate
     }
 }
 
+PlanExecutorPipeline::ResumableScanType getResumableScanType(const AggregateCommand& request,
+                                                             bool isChangeStream) {
+    // $changeStream cannot be run on the oplog, and $_requestReshardingResumeToken can only be run
+    // on the oplog. An aggregation request with both should therefore never reach this point.
+    tassert(5353400,
+            "$changeStream can't be combined with _requestReshardingResumeToken: true",
+            !(isChangeStream && request.getRequestReshardingResumeToken()));
+    if (isChangeStream) {
+        return PlanExecutorPipeline::ResumableScanType::kChangeStream;
+    }
+    if (request.getRequestReshardingResumeToken()) {
+        return PlanExecutorPipeline::ResumableScanType::kOplogScan;
+    }
+    return PlanExecutorPipeline::ResumableScanType::kNone;
+}
 }  // namespace aggregation_request_helper
 
 // Custom serializers/deserializers for AggregateCommand.
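The new `getResumableScanType()` helper gives callers a single mapping from request flags to scan type. A hypothetical call-site sketch follows; the executor construction actually happens in factory code outside `src/mongo/db/pipeline`, so the surrounding variable names are assumptions:

```cpp
// kChangeStream for $changeStream pipelines, kOplogScan when
// $_requestReshardingResumeToken is set on an oplog aggregation, else kNone.
auto scanType = aggregation_request_helper::getResumableScanType(request, isChangeStream);

// The scan type then replaces the old 'isChangeStream' bool in the
// PlanExecutorPipeline constructor (see plan_executor_pipeline.h below).
auto exec = std::make_unique<PlanExecutorPipeline>(expCtx, std::move(pipeline), scanType);
```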
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.h b/src/mongo/db/pipeline/aggregation_request_helper.h
index c4cb3adbc48..2b2ec150e39 100644
--- a/src/mongo/db/pipeline/aggregation_request_helper.h
+++ b/src/mongo/db/pipeline/aggregation_request_helper.h
@@ -38,6 +38,7 @@
 #include "mongo/db/pipeline/aggregate_command_gen.h"
 #include "mongo/db/pipeline/exchange_spec_gen.h"
 #include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
 #include "mongo/db/query/explain_options.h"
 #include "mongo/db/write_concern_options.h"
 #include "mongo/idl/basic_types_gen.h"
@@ -119,6 +120,12 @@ BSONObj serializeToCommandObj(const AggregateCommand& request);
  */
 void validateRequestForAPIVersion(const OperationContext* opCtx, const AggregateCommand& request);
 
+/**
+ * Returns the type of resumable scan required by this aggregation, if applicable. Otherwise returns
+ * ResumableScanType::kNone.
+ */
+PlanExecutorPipeline::ResumableScanType getResumableScanType(const AggregateCommand& request,
+                                                             bool isChangeStream);
 }  // namespace aggregation_request_helper
 
 /**
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 3adb9219d20..cd776dc2d40 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -58,7 +58,7 @@ const Document kDefaultCursorOptionDocument{
 
 TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
     // Using oplog namespace so that validation of $_requestReshardingResumeToken succeeds.
-    NamespaceString nss("oplog.rs");
+    NamespaceString nss("local.oplog.rs");
     BSONObj inputBson = fromjson(
         "{aggregate: 'oplog.rs', pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: "
         "true, fromMongos: true, "
@@ -489,10 +489,19 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce
             .getStatus());
 }
 
+TEST(AggregationRequestTest, ShouldRejectRequestReshardingResumeTokenIfNonBooleanType) {
+    NamespaceString nss("a.collection");
+    const BSONObj inputBson = fromjson(
+        "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: 'yes', $db: 'a', "
+        "cursor: {}}");
+    ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
+}
+
 TEST(AggregationRequestTest, ShouldRejectRequestReshardingResumeTokenIfNonOplogNss) {
     NamespaceString nss("a.collection");
     const BSONObj inputBson = fromjson(
-        "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: true, $db: 'a'}");
+        "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: true, $db: 'a', "
+        "cursor: {}}");
     ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
 }
 
@@ -561,14 +570,14 @@ TEST(AggregationRequestTest, ParseFromBSONOverloadsShouldProduceIdenticalRequest
 TEST(AggregationRequestTest, ShouldRejectExchangeNotObject) {
     NamespaceString nss("a.collection");
     const BSONObj inputBson =
-        fromjson("{aggregate: 'collection', pipeline: [], exchage: '42', $db: 'a'}");
+        fromjson("{aggregate: 'collection', pipeline: [], exchange: '42', $db: 'a', cursor: {}}");
     ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
 }
 
 TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) {
     NamespaceString nss("a.collection");
     const BSONObj inputBson =
-        fromjson("{aggregate: 'collection', pipeline: [], exchage: {}, $db: 'a'}");
+        fromjson("{aggregate: 'collection', pipeline: [], exchange: {}, $db: 'a', cursor: {}}");
     ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
 }
 
@@ -583,7 +592,8 @@ TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) {
 TEST(AggregationRequestTest, ShouldRejectInvalidCollectionUUID) {
     NamespaceString nss("a.collection");
     const BSONObj inputBSON = fromjson(
-        "{aggregate: 'collection', pipeline: [{$match: {}}], collectionUUID: 2, $db: 'a'}");
+        "{aggregate: 'collection', pipeline: [{$match: {}}], collectionUUID: 2, $db: 'a', cursor: "
+        "{}}");
     ASSERT_EQUALS(
         aggregation_request_helper::parseFromBSONForTests(nss, inputBSON).getStatus().code(),
         ErrorCodes::TypeMismatch);
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index b16ae484d25..42390d5a3a9 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -41,6 +41,7 @@
 #include "mongo/db/query/find_common.h"
 #include "mongo/db/storage/storage_options.h"
 #include "mongo/logv2/log.h"
+#include "mongo/s/resharding/resume_token_gen.h"
 #include "mongo/util/fail_point.h"
 #include "mongo/util/scopeguard.h"
 
@@ -164,9 +165,10 @@ void DocumentSourceCursor::loadBatch() {
 
         invariant(state == PlanExecutor::IS_EOF);
 
-        // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
-        // the PlanExecutor alive.
-        if (pExpCtx->isTailableAwaitData()) {
+        // Keep the inner PlanExecutor alive if the cursor is tailable, since more results may
+        // become available in the future, or if we are tracking the latest oplog timestamp, since
+        // we will need to retrieve the last timestamp the executor observed before hitting EOF.
+        if (_trackOplogTS || pExpCtx->isTailableAwaitData()) {
             _exec->saveState();
             return;
         }
@@ -176,8 +178,8 @@ void DocumentSourceCursor::loadBatch() {
         throw;
     }
 
-    // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we must
-    // hold a collection lock to destroy '_exec'.
+    // If we got here, there won't be any more documents and we no longer need our PlanExecutor, so
+    // destroy it.
     cleanupExecutor();
 }
 
@@ -278,6 +280,13 @@ void DocumentSourceCursor::cleanupExecutor() {
     }
 }
 
+BSONObj DocumentSourceCursor::getPostBatchResumeToken() const {
+    if (_trackOplogTS) {
+        return ResumeTokenOplogTimestamp{getLatestOplogTimestamp()}.toBSON();
+    }
+    return BSONObj{};
+}
+
 DocumentSourceCursor::~DocumentSourceCursor() {
     if (pExpCtx->explain) {
         invariant(_exec->isDisposed());  // _exec should have at least been disposed.
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 2d3a5332b10..27889466373 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -99,6 +99,12 @@ public:
         return _latestOplogTimestamp;
     }
 
+    /**
+     * Returns a postBatchResumeToken compatible with resharding oplog sync, if available.
+     * Otherwise, returns an empty object.
+     */
+    BSONObj getPostBatchResumeToken() const;
+
     const std::string& getPlanSummaryStr() const {
         return _planSummary;
     }
@@ -246,8 +252,8 @@ private:
     // True if we are tracking the latest observed oplog timestamp, false otherwise.
     bool _trackOplogTS = false;
 
-    // If we are tailing the oplog and tracking the latest observed oplog time, this is the latest
-    // timestamp seen in the collection. Otherwise, this is a null timestamp.
+    // If we are tracking the latest observed oplog time, this is the latest timestamp seen in the
+    // oplog. Otherwise, this is a null timestamp.
     Timestamp _latestOplogTimestamp;
 
     // Specific stats for $cursor stage.
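For reference, the token emitted by `DocumentSourceCursor::getPostBatchResumeToken()` just wraps the latest oplog timestamp the cursor has observed. A minimal sketch, assuming the IDL-generated `ResumeTokenOplogTimestamp` from `resume_token_gen.h` serializes that timestamp under a `ts` field:

```cpp
#include "mongo/s/resharding/resume_token_gen.h"

namespace mongo {

// Illustrative: produce the resharding PBRT for a given oplog timestamp.
// A consumer would resume its oplog scan strictly after this point.
BSONObj makeReshardingPbrt(Timestamp lastSeen) {
    return ResumeTokenOplogTimestamp{lastSeen}.toBSON();  // e.g. {ts: Timestamp(...)}
}

}  // namespace mongo
```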
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index bf2923130ed..84d4905070c 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -589,9 +589,11 @@ PipelineD::buildInnerQueryExecutorGeneric(const CollectionPtr& collection,
         ? DocumentSourceCursor::CursorType::kEmptyDocuments
         : DocumentSourceCursor::CursorType::kRegular;
 
-    // If this is a change stream pipeline, make sure that we tell DSCursor to track the oplog time.
+    // If this is a change stream pipeline or a resharding resume token has been requested, make
+    // sure that we tell DSCursor to track the oplog time.
     const bool trackOplogTS =
-        (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage());
+        (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) ||
+        (aggRequest && aggRequest->getRequestReshardingResumeToken());
 
     auto attachExecutorCallback =
         [cursorType, trackOplogTS](const CollectionPtr& collection,
@@ -805,4 +807,12 @@ Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) {
     }
     return Timestamp();
 }
+
+BSONObj PipelineD::getPostBatchResumeToken(const Pipeline* pipeline) {
+    if (auto docSourceCursor =
+            dynamic_cast<DocumentSourceCursor*>(pipeline->_sources.front().get())) {
+        return docSourceCursor->getPostBatchResumeToken();
+    }
+    return BSONObj{};
+}
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 1bfc2b77922..37c70d88f70 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -122,6 +122,12 @@ public:
     static Timestamp getLatestOplogTimestamp(const Pipeline* pipeline);
 
     /**
+     * Retrieves postBatchResumeToken from the 'pipeline' if it is available. Returns an empty
+     * object otherwise.
+     */
+    static BSONObj getPostBatchResumeToken(const Pipeline* pipeline);
+
+    /**
      * Resolves the collator to either the user-specified collation or, if none was specified, to
      * the collection-default collation.
      */
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 3fcdc71cf61..01d31be0ff8 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -40,11 +40,11 @@ namespace mongo {
 
 PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
                                            std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
-                                           bool isChangeStream)
+                                           ResumableScanType resumableScanType)
     : _expCtx(std::move(expCtx)),
      _pipeline(std::move(pipeline)),
       _planExplainer{_pipeline.get()},
-      _isChangeStream(isChangeStream) {
+      _resumableScanType{resumableScanType} {
     // Pipeline plan executors must always have an ExpressionContext.
     invariant(_expCtx);
 
@@ -53,13 +53,9 @@ PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContex
     // again when it is destroyed.
     _pipeline.get_deleter().dismissDisposal();
 
-    if (_isChangeStream) {
-        // Set _postBatchResumeToken to the initial PBRT that was added to the expression context
-        // during pipeline construction, and use it to obtain the starting time for
-        // _latestOplogTimestamp.
-        invariant(!_expCtx->initialPostBatchResumeToken.isEmpty());
-        _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned();
-        _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+    if (ResumableScanType::kNone != resumableScanType) {
+        // For a resumable scan, set the initial _latestOplogTimestamp and _postBatchResumeToken.
+        _initializeResumableScanState();
     }
 }
 
@@ -121,18 +117,35 @@ boost::optional<Document> PlanExecutorPipeline::_getNext() {
         _pipelineIsEof = true;
     }
 
-    if (_isChangeStream) {
-        _performChangeStreamsAccounting(nextDoc);
+    if (ResumableScanType::kNone != _resumableScanType) {
+        _updateResumableScanState(nextDoc);
     }
 
     return nextDoc;
 }
 
-void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document> doc) {
-    invariant(_isChangeStream);
+void PlanExecutorPipeline::_updateResumableScanState(const boost::optional<Document>& document) {
+    switch (_resumableScanType) {
+        case ResumableScanType::kChangeStream:
+            _performChangeStreamsAccounting(document);
+            break;
+        case ResumableScanType::kOplogScan:
+            _performResumableOplogScanAccounting();
+            break;
+        case ResumableScanType::kNone:
+            break;
+        default:
+            MONGO_UNREACHABLE_TASSERT(5353402);
+    }
+}
+
+void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document>& doc) {
+    tassert(5353405,
+            "expected _resumableScanType == kChangeStream",
+            ResumableScanType::kChangeStream == _resumableScanType);
     if (doc) {
         // While we have more results to return, we track both the timestamp and the resume token of
         // the latest event observed in the oplog, the latter via its sort key metadata field.
-        _validateResumeToken(*doc);
+        _validateChangeStreamsResumeToken(*doc);
         _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
         _postBatchResumeToken = doc->metadata().getSortKey().getDocument().toBson();
         _setSpeculativeReadTimestamp();
@@ -152,7 +165,7 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional
     }
 }
 
-void PlanExecutorPipeline::_validateResumeToken(const Document& event) const {
+void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& event) const {
     // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
     if (_expCtx->needsMerge) {
         return;
@@ -175,6 +188,17 @@ void PlanExecutorPipeline::_validateResumeToken(const Document& event) const {
             idField.binaryEqual(resumeToken.getDocument().toBson()));
 }
 
+void PlanExecutorPipeline::_performResumableOplogScanAccounting() {
+    tassert(5353404,
+            "expected _resumableScanType == kOplogScan",
+            ResumableScanType::kOplogScan == _resumableScanType);
+
+    // Update values of latest oplog timestamp and postBatchResumeToken.
+    _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+    _postBatchResumeToken = PipelineD::getPostBatchResumeToken(_pipeline.get());
+    _setSpeculativeReadTimestamp();
+}
+
 void PlanExecutorPipeline::_setSpeculativeReadTimestamp() {
     repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
         repl::SpeculativeMajorityReadInfo::get(_expCtx->opCtx);
@@ -183,6 +207,30 @@ void PlanExecutorPipeline::_setSpeculativeReadTimestamp() {
     }
 }
 
+void PlanExecutorPipeline::_initializeResumableScanState() {
+    switch (_resumableScanType) {
+        case ResumableScanType::kChangeStream:
+            // Set _postBatchResumeToken to the initial PBRT that was added to the expression
+            // context during pipeline construction, and use it to obtain the starting time for
+            // _latestOplogTimestamp.
+            tassert(5353403,
+                    "expected initialPostBatchResumeToken to be not empty",
+                    !_expCtx->initialPostBatchResumeToken.isEmpty());
+            _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned();
+            _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+            break;
+        case ResumableScanType::kOplogScan:
+            // Initialize the oplog timestamp and postBatchResumeToken here in case the request has
+            // batchSize 0, in which case the PBRT of the first batch would be empty.
+            _performResumableOplogScanAccounting();
+            break;
+        case ResumableScanType::kNone:
+            break;
+        default:
+            MONGO_UNREACHABLE_TASSERT(5353401);
+    }
+}
+
 void PlanExecutorPipeline::markAsKilled(Status killStatus) {
     invariant(!killStatus.isOK());
 
     // If killed multiple times, only retain the first status.
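Note the comment about batchSize 0 in `_initializeResumableScanState()`: because the constructor seeds the oplog-scan state, a PBRT is available even before any document has been pulled. A hypothetical sketch of the consuming side, assuming the executor exposes its PBRT through `PlanExecutor::getPostBatchResumeToken()` and the reply is assembled with a `CursorResponseBuilder` (the real plumbing lives in the command layer, outside this diff):

```cpp
// With {cursor: {batchSize: 0}}, the first reply is built before any
// getNext() call. The PBRT is nevertheless valid, because the
// PlanExecutorPipeline constructor already ran _initializeResumableScanState(),
// which for kOplogScan calls _performResumableOplogScanAccounting().
void appendEmptyFirstBatch(PlanExecutor* exec, CursorResponseBuilder* reply) {
    reply->setPostBatchResumeToken(exec->getPostBatchResumeToken());
}
```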
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
index af3513315ed..7fbb0d677b4 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.h
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -43,9 +43,18 @@ namespace mongo {
  */
 class PlanExecutorPipeline final : public PlanExecutor {
 public:
+    /**
+     * Determines the type of resumable scan being run by the PlanExecutorPipeline.
+     */
+    enum class ResumableScanType {
+        kNone,          // No resuming. This is the default.
+        kChangeStream,  // For change stream pipelines.
+        kOplogScan      // For non-changestream resumable oplog scans.
+    };
+
     PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
                          std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
-                         bool isChangeStream);
+                         ResumableScanType resumableScanType);
 
     CanonicalQuery* getCanonicalQuery() const override {
         return nullptr;
@@ -148,15 +157,27 @@ private:
     boost::optional<Document> _getNext();
 
     /**
+     * For a change stream or resumable oplog scan, updates the scan state based on the latest
+     * document returned by the underlying pipeline.
+     */
+    void _updateResumableScanState(const boost::optional<Document>& document);
+
+    /**
      * If this is a change stream, advance the cluster time and post batch resume token based on the
      * latest document returned by the underlying pipeline.
      */
-    void _performChangeStreamsAccounting(const boost::optional<Document>);
+    void _performChangeStreamsAccounting(const boost::optional<Document>&);
 
     /**
     * Verifies that the docs's resume token has not been modified.
      */
-    void _validateResumeToken(const Document& event) const;
+    void _validateChangeStreamsResumeToken(const Document& event) const;
+
+    /**
+     * For a non-changestream resumable oplog scan, updates the latest oplog timestamp and
+     * postBatchResumeToken value from the underlying pipeline.
+     */
+    void _performResumableOplogScanAccounting();
 
     /**
      * Set the speculative majority read timestamp if we have scanned up to a certain oplog
@@ -164,14 +185,17 @@ private:
      * timestamp.
      */
     void _setSpeculativeReadTimestamp();
 
+    /**
+     * For a change stream or resumable oplog scan, initializes the scan state.
+     */
+    void _initializeResumableScanState();
+
     boost::intrusive_ptr<ExpressionContext> _expCtx;
 
     std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
 
     PlanExplainerPipeline _planExplainer;
 
-    const bool _isChangeStream;
-
     std::queue<BSONObj> _stash;
 
     // If _killStatus has a non-OK value, then we have been killed and the value represents the
@@ -182,8 +206,10 @@ private:
     // pipeline has indicated end-of-stream.
     bool _pipelineIsEof = false;
 
-    // If '_pipeline' is a change stream, these track the latest timestamp seen while scanning the
-    // oplog, as well as the most recent PBRT.
+    const ResumableScanType _resumableScanType{ResumableScanType::kNone};
+
+    // If '_pipeline' is a change stream or other resumable scan type, these track the latest
+    // timestamp seen while scanning the oplog, as well as the most recent PBRT.
     Timestamp _latestOplogTimestamp;
     BSONObj _postBatchResumeToken;
 };
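Finally, a note on the migration itself: call sites that previously passed a bool now pass the enum, and each switch over `ResumableScanType` keeps a `MONGO_UNREACHABLE_TASSERT` default so that a future scan type cannot be silently ignored. A sketch of an updated change-stream call site (the surrounding variables are assumptions):

```cpp
// Before: PlanExecutorPipeline(expCtx, std::move(pipeline), true /* isChangeStream */);
// After:
auto exec = std::make_unique<PlanExecutorPipeline>(
    expCtx, std::move(pipeline), PlanExecutorPipeline::ResumableScanType::kChangeStream);
```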