summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorMindaugas Malinauskas <mindaugas.malinauskas@mongodb.com>2021-02-03 10:52:08 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-10 13:01:50 +0000
commit159a29e1e69948106f5bc601042b1baecf6fe982 (patch)
treef9ba9d588363c4f680f5764617eac266e7df4b33 /src/mongo/db/pipeline
parentbfbe088aabd4716348eb17602f669613f9ce6b45 (diff)
downloadmongo-159a29e1e69948106f5bc601042b1baecf6fe982.tar.gz
SERVER-53534 Support including postBatchResumeToken in cursor response for non-change streams aggregations
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/aggregation_request_helper.cpp39
-rw-r--r--src/mongo/db/pipeline/aggregation_request_helper.h7
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h10
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp14
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h6
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp78
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.h40
9 files changed, 193 insertions, 40 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.cpp b/src/mongo/db/pipeline/aggregation_request_helper.cpp
index 971d1afb75e..d710e73597a 100644
--- a/src/mongo/db/pipeline/aggregation_request_helper.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_helper.cpp
@@ -49,7 +49,9 @@ namespace aggregation_request_helper {
/**
* Validate the aggregate command object.
*/
-void validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> explainVerbosity);
+void validate(const BSONObj& cmdObj,
+ const NamespaceString& nss,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity);
AggregateCommand parseFromBSON(const std::string& dbName,
const BSONObj& cmdObj,
@@ -109,8 +111,7 @@ AggregateCommand parseFromBSON(NamespaceString nss,
request.setExplain(explainVerbosity);
}
- validate(cmdObj, explainVerbosity);
-
+ validate(cmdObj, nss, explainVerbosity);
return request;
}
@@ -148,7 +149,9 @@ Document serializeToCommandDoc(const AggregateCommand& request) {
return Document(request.toBSON(BSONObj()).getOwned());
}
-void validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+void validate(const BSONObj& cmdObj,
+ const NamespaceString& nss,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
bool hasAllowDiskUseElem = cmdObj.hasField(AggregateCommand::kAllowDiskUseFieldName);
bool hasCursorElem = cmdObj.hasField(AggregateCommand::kCursorFieldName);
bool hasExplainElem = cmdObj.hasField(AggregateCommand::kExplainFieldName);
@@ -178,6 +181,19 @@ void validate(const BSONObj& cmdObj, boost::optional<ExplainOptions::Verbosity>
str::stream() << "The '" << AggregateCommand::kAllowDiskUseFieldName
<< "' option is not permitted in read-only mode.",
(!hasAllowDiskUseElem || !storageGlobalParams.readOnly));
+
+ auto requestReshardingResumeTokenElem =
+ cmdObj[AggregateCommand::kRequestReshardingResumeTokenFieldName];
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << AggregateCommand::kRequestReshardingResumeTokenFieldName
+ << " must be a boolean type",
+ !requestReshardingResumeTokenElem || requestReshardingResumeTokenElem.isBoolean());
+ bool hasRequestReshardingResumeToken =
+ requestReshardingResumeTokenElem && requestReshardingResumeTokenElem.boolean();
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << AggregateCommand::kRequestReshardingResumeTokenFieldName
+ << " must only be set for the oplog namespace, not " << nss,
+ !hasRequestReshardingResumeToken || nss.isOplog());
}
void validateRequestForAPIVersion(const OperationContext* opCtx, const AggregateCommand& request) {
@@ -205,6 +221,21 @@ void validateRequestForAPIVersion(const OperationContext* opCtx, const Aggregate
}
}
+PlanExecutorPipeline::ResumableScanType getResumableScanType(const AggregateCommand& request,
+ bool isChangeStream) {
+ // $changeStream cannot be run on the oplog, and $_requestReshardingResumeToken can only be run
+ // on the oplog. An aggregation request with both should therefore never reach this point.
+ tassert(5353400,
+ "$changeStream can't be combined with _requestReshardingResumeToken: true",
+ !(isChangeStream && request.getRequestReshardingResumeToken()));
+ if (isChangeStream) {
+ return PlanExecutorPipeline::ResumableScanType::kChangeStream;
+ }
+ if (request.getRequestReshardingResumeToken()) {
+ return PlanExecutorPipeline::ResumableScanType::kOplogScan;
+ }
+ return PlanExecutorPipeline::ResumableScanType::kNone;
+}
} // namespace aggregation_request_helper
// Custom serializers/deserializers for AggregateCommand.
diff --git a/src/mongo/db/pipeline/aggregation_request_helper.h b/src/mongo/db/pipeline/aggregation_request_helper.h
index c4cb3adbc48..2b2ec150e39 100644
--- a/src/mongo/db/pipeline/aggregation_request_helper.h
+++ b/src/mongo/db/pipeline/aggregation_request_helper.h
@@ -38,6 +38,7 @@
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/exchange_spec_gen.h"
#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/idl/basic_types_gen.h"
@@ -119,6 +120,12 @@ BSONObj serializeToCommandObj(const AggregateCommand& request);
*/
void validateRequestForAPIVersion(const OperationContext* opCtx, const AggregateCommand& request);
+/**
+ * Returns the type of resumable scan required by this aggregation, if applicable. Otherwise returns
+ * ResumableScanType::kNone.
+ */
+PlanExecutorPipeline::ResumableScanType getResumableScanType(const AggregateCommand& request,
+ bool isChangeStream);
} // namespace aggregation_request_helper
/**
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 3adb9219d20..cd776dc2d40 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -58,7 +58,7 @@ const Document kDefaultCursorOptionDocument{
TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
// Using oplog namespace so that validation of $_requestReshardingResumeToken succeeds.
- NamespaceString nss("oplog.rs");
+ NamespaceString nss("local.oplog.rs");
BSONObj inputBson = fromjson(
"{aggregate: 'oplog.rs', pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: "
"true, fromMongos: true, "
@@ -489,10 +489,19 @@ TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConce
.getStatus());
}
+TEST(AggregationRequestTest, ShouldRejectRequestReshardingResumeTokenIfNonBooleanType) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson(
+ "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: 'yes', $db: 'a', "
+ "cursor: {}}");
+ ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
+}
+
TEST(AggregationRequestTest, ShouldRejectRequestReshardingResumeTokenIfNonOplogNss) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson(
- "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: true, $db: 'a'}");
+ "{aggregate: 'collection', pipeline: [], $_requestReshardingResumeToken: true, $db: 'a', "
+ "cursor: {}}");
ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
}
@@ -561,14 +570,14 @@ TEST(AggregationRequestTest, ParseFromBSONOverloadsShouldProduceIdenticalRequest
TEST(AggregationRequestTest, ShouldRejectExchangeNotObject) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
- fromjson("{aggregate: 'collection', pipeline: [], exchage: '42', $db: 'a'}");
+ fromjson("{aggregate: 'collection', pipeline: [], exchange: '42', $db: 'a', cursor: {}}");
ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
}
TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
- fromjson("{aggregate: 'collection', pipeline: [], exchage: {}, $db: 'a'}");
+ fromjson("{aggregate: 'collection', pipeline: [], exchange: {}, $db: 'a', cursor: {}}");
ASSERT_NOT_OK(aggregation_request_helper::parseFromBSONForTests(nss, inputBson).getStatus());
}
@@ -583,7 +592,8 @@ TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) {
TEST(AggregationRequestTest, ShouldRejectInvalidCollectionUUID) {
NamespaceString nss("a.collection");
const BSONObj inputBSON = fromjson(
- "{aggregate: 'collection', pipeline: [{$match: {}}], collectionUUID: 2, $db: 'a'}");
+ "{aggregate: 'collection', pipeline: [{$match: {}}], collectionUUID: 2, $db: 'a', cursor: "
+ "{}}");
ASSERT_EQUALS(
aggregation_request_helper::parseFromBSONForTests(nss, inputBSON).getStatus().code(),
ErrorCodes::TypeMismatch);
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index b16ae484d25..42390d5a3a9 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/query/find_common.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/resharding/resume_token_gen.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
@@ -164,9 +165,10 @@ void DocumentSourceCursor::loadBatch() {
invariant(state == PlanExecutor::IS_EOF);
- // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
- // the PlanExecutor alive.
- if (pExpCtx->isTailableAwaitData()) {
+ // Keep the inner PlanExecutor alive if the cursor is tailable, since more results may
+ // become available in the future, or if we are tracking the latest oplog timestamp, since
+ // we will need to retrieve the last timestamp the executor observed before hitting EOF.
+ if (_trackOplogTS || pExpCtx->isTailableAwaitData()) {
_exec->saveState();
return;
}
@@ -176,8 +178,8 @@ void DocumentSourceCursor::loadBatch() {
throw;
}
- // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we must
- // hold a collection lock to destroy '_exec'.
+ // If we got here, there won't be any more documents and we no longer need our PlanExecutor, so
+ // destroy it.
cleanupExecutor();
}
@@ -278,6 +280,13 @@ void DocumentSourceCursor::cleanupExecutor() {
}
}
+BSONObj DocumentSourceCursor::getPostBatchResumeToken() const {
+ if (_trackOplogTS) {
+ return ResumeTokenOplogTimestamp{getLatestOplogTimestamp()}.toBSON();
+ }
+ return BSONObj{};
+}
+
DocumentSourceCursor::~DocumentSourceCursor() {
if (pExpCtx->explain) {
invariant(_exec->isDisposed()); // _exec should have at least been disposed.
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 2d3a5332b10..27889466373 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -99,6 +99,12 @@ public:
return _latestOplogTimestamp;
}
+ /**
+ * Returns a postBatchResumeToken compatible with resharding oplog sync, if available.
+ * Otherwise, returns an empty object.
+ */
+ BSONObj getPostBatchResumeToken() const;
+
const std::string& getPlanSummaryStr() const {
return _planSummary;
}
@@ -246,8 +252,8 @@ private:
// True if we are tracking the latest observed oplog timestamp, false otherwise.
bool _trackOplogTS = false;
- // If we are tailing the oplog and tracking the latest observed oplog time, this is the latest
- // timestamp seen in the collection. Otherwise, this is a null timestamp.
+ // If we are tracking the latest observed oplog time, this is the latest timestamp seen in the
+ // oplog. Otherwise, this is a null timestamp.
Timestamp _latestOplogTimestamp;
// Specific stats for $cursor stage.
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index bf2923130ed..84d4905070c 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -589,9 +589,11 @@ PipelineD::buildInnerQueryExecutorGeneric(const CollectionPtr& collection,
? DocumentSourceCursor::CursorType::kEmptyDocuments
: DocumentSourceCursor::CursorType::kRegular;
- // If this is a change stream pipeline, make sure that we tell DSCursor to track the oplog time.
+ // If this is a change stream pipeline or a resharding resume token has been requested, make
+ // sure that we tell DSCursor to track the oplog time.
const bool trackOplogTS =
- (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage());
+ (pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage()) ||
+ (aggRequest && aggRequest->getRequestReshardingResumeToken());
auto attachExecutorCallback =
[cursorType, trackOplogTS](const CollectionPtr& collection,
@@ -805,4 +807,12 @@ Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) {
}
return Timestamp();
}
+
+BSONObj PipelineD::getPostBatchResumeToken(const Pipeline* pipeline) {
+ if (auto docSourceCursor =
+ dynamic_cast<DocumentSourceCursor*>(pipeline->_sources.front().get())) {
+ return docSourceCursor->getPostBatchResumeToken();
+ }
+ return BSONObj{};
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 1bfc2b77922..37c70d88f70 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -122,6 +122,12 @@ public:
static Timestamp getLatestOplogTimestamp(const Pipeline* pipeline);
/**
+ * Retrieves postBatchResumeToken from the 'pipeline' if it is available. Returns an empty
+ * object otherwise.
+ */
+ static BSONObj getPostBatchResumeToken(const Pipeline* pipeline);
+
+ /**
* Resolves the collator to either the user-specified collation or, if none was specified, to
* the collection-default collation.
*/
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 3fcdc71cf61..01d31be0ff8 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -40,11 +40,11 @@ namespace mongo {
PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- bool isChangeStream)
+ ResumableScanType resumableScanType)
: _expCtx(std::move(expCtx)),
_pipeline(std::move(pipeline)),
_planExplainer{_pipeline.get()},
- _isChangeStream(isChangeStream) {
+ _resumableScanType{resumableScanType} {
// Pipeline plan executors must always have an ExpressionContext.
invariant(_expCtx);
@@ -53,13 +53,9 @@ PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContex
// again when it is destroyed.
_pipeline.get_deleter().dismissDisposal();
- if (_isChangeStream) {
- // Set _postBatchResumeToken to the initial PBRT that was added to the expression context
- // during pipeline construction, and use it to obtain the starting time for
- // _latestOplogTimestamp.
- invariant(!_expCtx->initialPostBatchResumeToken.isEmpty());
- _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned();
- _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+ if (ResumableScanType::kNone != resumableScanType) {
+ // For a resumable scan, set the initial _latestOplogTimestamp and _postBatchResumeToken.
+ _initializeResumableScanState();
}
}
@@ -121,18 +117,35 @@ boost::optional<Document> PlanExecutorPipeline::_getNext() {
_pipelineIsEof = true;
}
- if (_isChangeStream) {
- _performChangeStreamsAccounting(nextDoc);
+ if (ResumableScanType::kNone != _resumableScanType) {
+ _updateResumableScanState(nextDoc);
}
return nextDoc;
}
-void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document> doc) {
- invariant(_isChangeStream);
+void PlanExecutorPipeline::_updateResumableScanState(const boost::optional<Document>& document) {
+ switch (_resumableScanType) {
+ case ResumableScanType::kChangeStream:
+ _performChangeStreamsAccounting(document);
+ break;
+ case ResumableScanType::kOplogScan:
+ _performResumableOplogScanAccounting();
+ break;
+ case ResumableScanType::kNone:
+ break;
+ default:
+ MONGO_UNREACHABLE_TASSERT(5353402);
+ }
+}
+
+void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document>& doc) {
+ tassert(5353405,
+ "expected _resumableScanType == kChangeStream",
+ ResumableScanType::kChangeStream == _resumableScanType);
if (doc) {
// While we have more results to return, we track both the timestamp and the resume token of
// the latest event observed in the oplog, the latter via its sort key metadata field.
- _validateResumeToken(*doc);
+ _validateChangeStreamsResumeToken(*doc);
_latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
_postBatchResumeToken = doc->metadata().getSortKey().getDocument().toBson();
_setSpeculativeReadTimestamp();
@@ -152,7 +165,7 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional
}
}
-void PlanExecutorPipeline::_validateResumeToken(const Document& event) const {
+void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& event) const {
// If we are producing output to be merged on mongoS, then no stages can have modified the _id.
if (_expCtx->needsMerge) {
return;
@@ -175,6 +188,17 @@ void PlanExecutorPipeline::_validateResumeToken(const Document& event) const {
idField.binaryEqual(resumeToken.getDocument().toBson()));
}
+void PlanExecutorPipeline::_performResumableOplogScanAccounting() {
+ tassert(5353404,
+ "expected _resumableScanType == kOplogScan",
+ ResumableScanType::kOplogScan == _resumableScanType);
+
+ // Update values of latest oplog timestamp and postBatchResumeToken.
+ _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ _postBatchResumeToken = PipelineD::getPostBatchResumeToken(_pipeline.get());
+ _setSpeculativeReadTimestamp();
+}
+
void PlanExecutorPipeline::_setSpeculativeReadTimestamp() {
repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
repl::SpeculativeMajorityReadInfo::get(_expCtx->opCtx);
@@ -183,6 +207,30 @@ void PlanExecutorPipeline::_setSpeculativeReadTimestamp() {
}
}
+void PlanExecutorPipeline::_initializeResumableScanState() {
+ switch (_resumableScanType) {
+ case ResumableScanType::kChangeStream:
+ // Set _postBatchResumeToken to the initial PBRT that was added to the expression
+ // context during pipeline construction, and use it to obtain the starting time for
+ // _latestOplogTimestamp.
+ tassert(5353403,
+ "expected initialPostBatchResumeToken to be not empty",
+ !_expCtx->initialPostBatchResumeToken.isEmpty());
+ _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned();
+ _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+ break;
+ case ResumableScanType::kOplogScan:
+ // Initialize the oplog timestamp and postBatchResumeToken here in case the request has
+ // batchSize 0, in which case the PBRT of the first batch would be empty.
+ _performResumableOplogScanAccounting();
+ break;
+ case ResumableScanType::kNone:
+ break;
+ default:
+ MONGO_UNREACHABLE_TASSERT(5353401);
+ }
+}
+
void PlanExecutorPipeline::markAsKilled(Status killStatus) {
invariant(!killStatus.isOK());
// If killed multiple times, only retain the first status.
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
index af3513315ed..7fbb0d677b4 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.h
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -43,9 +43,18 @@ namespace mongo {
*/
class PlanExecutorPipeline final : public PlanExecutor {
public:
+ /**
+ * Determines the type of resumable scan being run by the PlanExecutorPipeline.
+ */
+ enum class ResumableScanType {
+ kNone, // No resuming. This is the default.
+ kChangeStream, // For change stream pipelines.
+ kOplogScan // For non-changestream resumable oplog scans.
+ };
+
PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- bool isChangeStream);
+ ResumableScanType resumableScanType);
CanonicalQuery* getCanonicalQuery() const override {
return nullptr;
@@ -148,15 +157,27 @@ private:
boost::optional<Document> _getNext();
/**
+ * For a change stream or resumable oplog scan, updates the scan state based on the latest
+ * document returned by the underlying pipeline.
+ */
+ void _updateResumableScanState(const boost::optional<Document>& document);
+
+ /**
* If this is a change stream, advance the cluster time and post batch resume token based on the
* latest document returned by the underlying pipeline.
*/
- void _performChangeStreamsAccounting(const boost::optional<Document>);
+ void _performChangeStreamsAccounting(const boost::optional<Document>&);
/**
* Verifies that the docs's resume token has not been modified.
*/
- void _validateResumeToken(const Document& event) const;
+ void _validateChangeStreamsResumeToken(const Document& event) const;
+
+ /**
+ * For a non-changestream resumable oplog scan, updates the latest oplog timestamp and
+ * postBatchResumeToken value from the underlying pipeline.
+ */
+ void _performResumableOplogScanAccounting();
/**
* Set the speculative majority read timestamp if we have scanned up to a certain oplog
@@ -164,14 +185,17 @@ private:
*/
void _setSpeculativeReadTimestamp();
+ /**
+ * For a change stream or resumable oplog scan, initializes the scan state.
+ */
+ void _initializeResumableScanState();
+
boost::intrusive_ptr<ExpressionContext> _expCtx;
std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
PlanExplainerPipeline _planExplainer;
- const bool _isChangeStream;
-
std::queue<BSONObj> _stash;
// If _killStatus has a non-OK value, then we have been killed and the value represents the
@@ -182,8 +206,10 @@ private:
// pipeline has indicated end-of-stream.
bool _pipelineIsEof = false;
- // If '_pipeline' is a change stream, these track the latest timestamp seen while scanning the
- // oplog, as well as the most recent PBRT.
+ const ResumableScanType _resumableScanType{ResumableScanType::kNone};
+
+ // If '_pipeline' is a change stream or other resumable scan type, these track the latest
+ // timestamp seen while scanning the oplog, as well as the most recent PBRT.
Timestamp _latestOplogTimestamp;
BSONObj _postBatchResumeToken;
};