From 9f0897f74297a4ae415a3b5ebf482ff78afc2058 Mon Sep 17 00:00:00 2001 From: Bernard Gorman Date: Mon, 11 Apr 2022 17:09:06 +0000 Subject: SERVER-65282 Add $_generateV2ResumeTokens parameter to aggregate command --- src/mongo/db/commands/run_aggregate.cpp | 8 ++++++++ src/mongo/db/pipeline/aggregate_command.idl | 7 +++++++ .../db/pipeline/change_stream_event_transform.cpp | 19 ++++++++++++++----- src/mongo/db/pipeline/change_stream_event_transform.h | 9 ++++++--- .../db/pipeline/change_stream_helpers_legacy.cpp | 5 +++-- .../db/pipeline/document_source_change_stream.cpp | 9 ++++++--- src/mongo/db/pipeline/document_source_change_stream.h | 4 +++- ...document_source_change_stream_check_invalidate.cpp | 2 +- ...cument_source_change_stream_check_resumability.cpp | 2 +- ...urce_change_stream_ensure_resume_token_present.cpp | 2 +- ...nt_source_change_stream_handle_topology_change.cpp | 4 ++-- .../document_source_change_stream_oplog_match.cpp | 2 +- .../pipeline/document_source_change_stream_test.cpp | 2 +- .../document_source_change_stream_transform.cpp | 3 ++- .../document_source_check_resume_token_test.cpp | 4 +++- src/mongo/db/pipeline/expression_context.cpp | 3 +++ src/mongo/db/pipeline/expression_context.h | 3 +++ src/mongo/db/pipeline/plan_executor_pipeline.cpp | 3 ++- src/mongo/db/pipeline/resume_token.cpp | 7 +++---- src/mongo/db/pipeline/resume_token.h | 2 +- src/mongo/db/pipeline/sharded_agg_helpers.cpp | 17 +++++++++++++++++ src/mongo/db/query/cursor_response_test.cpp | 4 +++- src/mongo/s/query/async_results_merger_test.cpp | 5 ++++- src/mongo/s/query/cluster_aggregate.cpp | 8 ++++++++ 24 files changed, 103 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index e21bcfbffdf..127898b875e 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -442,6 +442,14 @@ boost::intrusive_ptr makeExpressionContext( expCtx->collationMatchesDefault = collationMatchesDefault; expCtx->forPerShardCursor = request.getPassthroughToShard().has_value(); + // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0 + // we only expect this to occur during testing. + // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false. + if (request.getGenerateV2ResumeTokens()) { + uassert(6528200, "Invalid request for v2 resume tokens", getTestCommandsEnabled()); + expCtx->changeStreamTokenVersion = 2; + } + return expCtx; } diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index 0e69d47c6ba..b84513a30e6 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -261,6 +261,13 @@ commands: cpp_name: passthroughToShard optional: true unstable: true + # TODO SERVER-65370: after 6.0, $_generateV2ResumeTokens should be assumed true if absent. + # TODO SERVER-65369: $_generateV2ResumeTokens can be removed after 7.0. + $_generateV2ResumeTokens: + description: "Internal parameter to signal whether v2 resume tokens should be generated." + type: optionalBool + cpp_name: generateV2ResumeTokens + unstable: false encryptionInformation: description: "Encryption Information schema and other tokens for CRUD commands" type: EncryptionInformation diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 1cd6fe39c74..1731a99f328 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -75,9 +75,12 @@ void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocum } // namespace ChangeStreamEventTransformation::ChangeStreamEventTransformation( + const boost::intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec) - : _changeStreamSpec(spec) { - _resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); + : _changeStreamSpec(spec), _expCtx(expCtx) { + // Extract the resume token from the spec and store it. + _resumeToken = + DocumentSourceChangeStream::resolveResumeTokenFromSpec(_expCtx, _changeStreamSpec); // Determine whether the user requested a point-in-time pre-image, which will affect this // stage's output. @@ -104,7 +107,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal, // If we have a resume token, we need to match the version with which it was generated until we // have surpassed it, at which point we can begin generating tokens with our default version. - auto version = (clusterTime > _resumeToken.clusterTime) ? ResumeTokenData::kDefaultTokenVersion + auto version = (clusterTime > _resumeToken.clusterTime) ? _expCtx->changeStreamTokenVersion : _resumeToken.version; // Construct and return the final resume token. @@ -114,7 +117,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal, ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation( const boost::intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec) - : ChangeStreamEventTransformation(spec) { + : ChangeStreamEventTransformation(expCtx, spec) { _documentKeyCache = std::make_unique(expCtx, _resumeToken); } @@ -471,6 +474,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum return doc.freeze(); } +ChangeStreamViewDefinitionEventTransformation::ChangeStreamViewDefinitionEventTransformation( + const boost::intrusive_ptr& expCtx, + const DocumentSourceChangeStreamSpec& spec) + : ChangeStreamEventTransformation(expCtx, spec) {} + std::set ChangeStreamViewDefinitionEventTransformation::getFieldNameDependencies() const { return std::set{repl::OplogEntry::kOpTypeFieldName.toString(), @@ -541,7 +549,8 @@ ChangeStreamEventTransformer::ChangeStreamEventTransformer( const boost::intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec) { _defaultEventBuilder = std::make_unique(expCtx, spec); - _viewNsEventBuilder = std::make_unique(spec); + _viewNsEventBuilder = + std::make_unique(expCtx, spec); _isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) == DocumentSourceChangeStream::ChangeStreamType::kSingleCollection; } diff --git a/src/mongo/db/pipeline/change_stream_event_transform.h b/src/mongo/db/pipeline/change_stream_event_transform.h index 8bb24eaf2bd..a884fe1c672 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.h +++ b/src/mongo/db/pipeline/change_stream_event_transform.h @@ -40,7 +40,8 @@ namespace mongo { */ class ChangeStreamEventTransformation { public: - ChangeStreamEventTransformation(const DocumentSourceChangeStreamSpec& spec); + ChangeStreamEventTransformation(const boost::intrusive_ptr& expCtx, + const DocumentSourceChangeStreamSpec& spec); virtual ~ChangeStreamEventTransformation() {} @@ -64,6 +65,7 @@ protected: Value opDescription) const; const DocumentSourceChangeStreamSpec _changeStreamSpec; + boost::intrusive_ptr _expCtx; ResumeTokenData _resumeToken; // Set to true if the pre-image should be included in the output documents. @@ -94,8 +96,9 @@ private: */ class ChangeStreamViewDefinitionEventTransformation final : public ChangeStreamEventTransformation { public: - ChangeStreamViewDefinitionEventTransformation(const DocumentSourceChangeStreamSpec& spec) - : ChangeStreamEventTransformation(spec) {} + ChangeStreamViewDefinitionEventTransformation( + const boost::intrusive_ptr& expCtx, + const DocumentSourceChangeStreamSpec& spec); Document applyTransformation(const Document& fromDoc) const override; diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 07678a0235a..af7cd71034f 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -64,8 +64,9 @@ std::list> buildPipeline( if (!userRequestedResumePoint) { // Make sure we update the 'resumeAfter' in the 'spec' so that we serialize the // correct resume token when sending it to the shards. - spec.setResumeAfter(ResumeToken::makeHighWaterMarkToken( - DocumentSourceChangeStream::getStartTimeForNewStream(expCtx))); + auto clusterTime = DocumentSourceChangeStream::getStartTimeForNewStream(expCtx); + spec.setResumeAfter( + ResumeToken::makeHighWaterMarkToken(clusterTime, expCtx->changeStreamTokenVersion)); } // Unfold the $changeStream into its constituent stages and add them to the pipeline. diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 860fdce238c..4fc0041cc0e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -227,13 +227,16 @@ std::string DocumentSourceChangeStream::regexEscapeNsForChangeStream(StringData } ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec( + const boost::intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec) { if (spec.getStartAfter()) { return spec.getStartAfter()->getData(); } else if (spec.getResumeAfter()) { return spec.getResumeAfter()->getData(); } else if (spec.getStartAtOperationTime()) { - return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime()).getData(); + return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime(), + expCtx->changeStreamTokenVersion) + .getData(); } tasserted(5666901, "Expected one of 'startAfter', 'resumeAfter' or 'startAtOperationTime' to be " @@ -292,7 +295,7 @@ std::list> DocumentSourceChangeStream::_bui } // Obtain the resume token from the spec. This will be used when building the pipeline. - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); // Unfold the $changeStream into its constituent stages and add them to the pipeline. stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec)); @@ -441,7 +444,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( !spec.getResumeAfter() || !spec.getStartAfter()); auto resumeToken = (spec.getResumeAfter() || spec.getStartAfter()) - ? resolveResumeTokenFromSpec(spec) + ? resolveResumeTokenFromSpec(expCtx, spec) : boost::optional(); uassert(40674, diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index aebb966cf72..60013e64444 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -287,7 +287,9 @@ public: * returns the equivalent high-watermark token. This method should only ever be called on a spec * where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated. */ - static ResumeTokenData resolveResumeTokenFromSpec(const DocumentSourceChangeStreamSpec& spec); + static ResumeTokenData resolveResumeTokenFromSpec( + const boost::intrusive_ptr& expCtx, + const DocumentSourceChangeStreamSpec& spec); /** * For a change stream with no resume information supplied by the user, returns the clusterTime diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp index a3c6f46f0e2..8121fedb3e7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp @@ -71,7 +71,7 @@ DocumentSourceChangeStreamCheckInvalidate::create( const DocumentSourceChangeStreamSpec& spec) { // If resuming from an "invalidate" using "startAfter", pass along the resume token data to // DSCSCheckInvalidate to signify that another invalidate should not be generated. - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); return new DocumentSourceChangeStreamCheckInvalidate( expCtx, boost::make_optional(resumeToken.fromInvalidate, std::move(resumeToken))); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp index cc3787a44ba..3861b21693d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp @@ -130,7 +130,7 @@ DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResu intrusive_ptr DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec) { - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(resumeToken)); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp index d100b6ace48..15fcb30d58f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp @@ -45,7 +45,7 @@ boost::intrusive_ptr DocumentSourceChangeStreamEnsureResumeTokenPresent::create( const boost::intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec) { - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); tassert(5666902, "Expected non-high-water-mark resume token", !ResumeToken::isHighWaterMarkToken(resumeToken)); diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp index 8b3463f015b..2f0772a29bc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp @@ -174,8 +174,8 @@ BSONObj DocumentSourceChangeStreamHandleTopologyChange::createUpdatedCommandForN Timestamp shardAddedTime) { // We must start the new cursor from the moment at which the shard became visible. const auto newShardAddedTime = LogicalTime{shardAddedTime}; - auto resumeTokenForNewShard = - ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp()); + auto resumeTokenForNewShard = ResumeToken::makeHighWaterMarkToken( + newShardAddedTime.addTicks(1).asTimestamp(), pExpCtx->changeStreamTokenVersion); // Create a new shard command object containing the new resume token. auto shardCommand = replaceResumeTokenInCommand(resumeTokenForNewShard.toDocument()); diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp index 85a59f2db68..c11cb0a1aeb 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp @@ -104,7 +104,7 @@ DocumentSourceChangeStreamOplogMatch::DocumentSourceChangeStreamOplogMatch( boost::intrusive_ptr DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr& expCtx, const DocumentSourceChangeStreamSpec& spec) { - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); return make_intrusive(resumeToken.clusterTime, expCtx); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 5f26ba73267..774e7a2dd23 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -4515,7 +4515,7 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) { auto oplogAfterResumeTime = makeAnOplogEntry(afterResumeTs, documentKey); // Create a v2 high water mark token which sorts immediately before 'firstOplogAtResumeTime'. - ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs).getData(); + ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs, 2).getData(); resumeToken.version = 2; auto expCtx = getExpCtxRaw(); expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("unittests"); diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index cf31f61b536..766150a6e54 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -74,7 +74,8 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) { // Extract the resume token or high-water-mark from the spec. - auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); + auto tokenData = + DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, _changeStreamSpec); // Set the initialPostBatchResumeToken on the expression context. expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON(); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 5a225a35fca..28603f10a30 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -385,7 +385,9 @@ protected: } intrusive_ptr createDSCheckResumability( Timestamp ts) { - return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); + return createDSCheckResumability( + ResumeToken::makeHighWaterMarkToken(ts, ResumeTokenData::kDefaultTokenVersion) + .getData()); } }; diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 38f7680e2ce..52036933e6f 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -218,6 +218,9 @@ intrusive_ptr ExpressionContext::copyWith( expCtx->exprDeprectedForApiV1 = exprDeprectedForApiV1; expCtx->initialPostBatchResumeToken = initialPostBatchResumeToken.getOwned(); + expCtx->changeStreamTokenVersion = changeStreamTokenVersion; + expCtx->changeStreamSpec = changeStreamSpec; + expCtx->originalAggregateCommand = originalAggregateCommand.getOwned(); // Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 833a2da9ef1..2fe36d51ab7 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -472,6 +472,9 @@ public: // If present, the spec associated with the current change stream pipeline. boost::optional changeStreamSpec; + // The resume token version that should be generated by a change stream. + int changeStreamTokenVersion = ResumeTokenData::kDefaultTokenVersion; + // True if the expression context is the original one for a given pipeline. // False if another context is created for the same pipeline. Used to disable duplicate // expression counting. diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index 9bd24748396..0b26a7db813 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -174,7 +174,8 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional // high-water-mark token at the current clusterTime. auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get()); if (highWaterMark > _latestOplogTimestamp) { - auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark); + auto token = ResumeToken::makeHighWaterMarkToken( + highWaterMark, _pipeline->getContext()->changeStreamTokenVersion); _postBatchResumeToken = token.toDocument().toBson(); _latestOplogTimestamp = highWaterMark; _setSpeculativeReadTimestamp(); diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index d1a54e4cb52..cc6d3631fd3 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -50,9 +50,9 @@ namespace { // Helper function for makeHighWaterMarkToken and isHighWaterMarkToken. ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime, int version) { ResumeTokenData tokenData; + tokenData.version = version; tokenData.clusterTime = clusterTime; tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken; - tokenData.version = version; return tokenData; } } // namespace @@ -297,9 +297,8 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) { return ResumeToken(resumeDoc); } -ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime) { - return ResumeToken( - makeHighWaterMarkResumeTokenData(clusterTime, ResumeTokenData::kDefaultTokenVersion)); +ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, int version) { + return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, version)); } bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) { diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 1c5380b7f12..a157c97fed9 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -142,7 +142,7 @@ public: /** * Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey. */ - static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime); + static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime, int version); /** * Returns true if the given token data represents a valid high-water-mark resume token; that diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index f44a6526c00..f63f6c61bc6 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -113,6 +113,12 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptrinMongos) { + aggReq.setGenerateV2ResumeTokens(expCtx->changeStreamTokenVersion == 2); + } + SimpleCursorOptions cursor; cursor.setBatchSize(0); aggReq.setCursor(cursor); @@ -145,6 +151,17 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, cmdForShards[AggregateCommandRequest::kCollationFieldName] = Value(collationObj); } + // We explicitly set $_generateV2ResumeTokens to false, if not already set, to indicate that the + // shards should NOT produce v2 resume tokens for change streams; instead, they should continue + // generating v1 tokens. This facilitates upgrade between 6.0 (which produces v1 by default) + // and 7.0 (which will produce v2 by default, but will be capable of generating v1) by ensuring + // that a 6.0 mongoS on a mixed 6.0/7.0 cluster will see only v1 tokens in the stream. + // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed. + const auto& v2FieldName = AggregateCommandRequest::kGenerateV2ResumeTokensFieldName; + if (auto cmdObj = cmdForShards.peek(); expCtx->inMongos && cmdObj[v2FieldName].missing()) { + cmdForShards[v2FieldName] = Value(false); + } + // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. if (explainVerbosity) { diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index 183c27e05d6..56498b71024 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -395,7 +395,9 @@ TEST(CursorResponseTest, addToBSONSubsequentResponse) { TEST(CursorResponseTest, serializePostBatchResumeToken) { std::vector batch = {BSON("_id" << 1), BSON("_id" << 2)}; auto postBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson(); + ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2), ResumeTokenData::kDefaultTokenVersion) + .toDocument() + .toBson(); CursorResponse response( NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken); auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse); diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index d089ac691c9..80600272e63 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -54,7 +54,10 @@ LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) { } BSONObj makePostBatchResumeToken(Timestamp clusterTime) { - auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime).toDocument().toBson(); + auto pbrt = + ResumeToken::makeHighWaterMarkToken(clusterTime, ResumeTokenData::kDefaultTokenVersion) + .toDocument() + .toBson(); invariant(pbrt.firstElement().type() == BSONType::String); return pbrt; } diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 4bcd150206b..b3df71582da 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -133,6 +133,14 @@ boost::intrusive_ptr makeExpressionContext( mergeCtx->inMongos = true; + // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0 + // we only expect this to occur during testing. + // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false. + if (request.getGenerateV2ResumeTokens()) { + uassert(6528201, "Invalid request for v2 resume tokens", getTestCommandsEnabled()); + mergeCtx->changeStreamTokenVersion = 2; + } + // Serialize the 'AggregateCommandRequest' and save it so that the original command can be // reconstructed for dispatch to a new shard, which is sometimes necessary for change streams // pipelines. -- cgit v1.2.1