summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2022-04-11 17:09:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-11 23:28:08 +0000
commit9f0897f74297a4ae415a3b5ebf482ff78afc2058 (patch)
tree9829721a2bcded15f410b23a8af5962bfd5e6a2d /src
parentbbb9ba057afddf16a714c3c39688bee9fe8ccb56 (diff)
downloadmongo-9f0897f74297a4ae415a3b5ebf482ff78afc2058.tar.gz
SERVER-65282 Add $_generateV2ResumeTokens parameter to aggregate command
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp8
-rw-r--r--src/mongo/db/pipeline/aggregate_command.idl7
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp19
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.h9
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp4
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp3
-rw-r--r--src/mongo/db/pipeline/expression_context.h3
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp3
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp7
-rw-r--r--src/mongo/db/pipeline/resume_token.h2
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp17
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp4
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp5
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp8
24 files changed, 103 insertions, 31 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index e21bcfbffdf..127898b875e 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -442,6 +442,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
expCtx->collationMatchesDefault = collationMatchesDefault;
expCtx->forPerShardCursor = request.getPassthroughToShard().has_value();
+ // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0
+ // we only expect this to occur during testing.
+ // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false.
+ if (request.getGenerateV2ResumeTokens()) {
+ uassert(6528200, "Invalid request for v2 resume tokens", getTestCommandsEnabled());
+ expCtx->changeStreamTokenVersion = 2;
+ }
+
return expCtx;
}
diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl
index 0e69d47c6ba..b84513a30e6 100644
--- a/src/mongo/db/pipeline/aggregate_command.idl
+++ b/src/mongo/db/pipeline/aggregate_command.idl
@@ -261,6 +261,13 @@ commands:
cpp_name: passthroughToShard
optional: true
unstable: true
+ # TODO SERVER-65370: after 6.0, $_generateV2ResumeTokens should be assumed true if absent.
+ # TODO SERVER-65369: $_generateV2ResumeTokens can be removed after 7.0.
+ $_generateV2ResumeTokens:
+ description: "Internal parameter to signal whether v2 resume tokens should be generated."
+ type: optionalBool
+ cpp_name: generateV2ResumeTokens
+ unstable: false
encryptionInformation:
description: "Encryption Information schema and other tokens for CRUD commands"
type: EncryptionInformation
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 1cd6fe39c74..1731a99f328 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -75,9 +75,12 @@ void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocum
} // namespace
ChangeStreamEventTransformation::ChangeStreamEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec)
- : _changeStreamSpec(spec) {
- _resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
+ : _changeStreamSpec(spec), _expCtx(expCtx) {
+ // Extract the resume token from the spec and store it.
+ _resumeToken =
+ DocumentSourceChangeStream::resolveResumeTokenFromSpec(_expCtx, _changeStreamSpec);
// Determine whether the user requested a point-in-time pre-image, which will affect this
// stage's output.
@@ -104,7 +107,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal,
// If we have a resume token, we need to match the version with which it was generated until we
// have surpassed it, at which point we can begin generating tokens with our default version.
- auto version = (clusterTime > _resumeToken.clusterTime) ? ResumeTokenData::kDefaultTokenVersion
+ auto version = (clusterTime > _resumeToken.clusterTime) ? _expCtx->changeStreamTokenVersion
: _resumeToken.version;
// Construct and return the final resume token.
@@ -114,7 +117,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal,
ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec)
- : ChangeStreamEventTransformation(spec) {
+ : ChangeStreamEventTransformation(expCtx, spec) {
_documentKeyCache =
std::make_unique<change_stream_legacy::DocumentKeyCache>(expCtx, _resumeToken);
}
@@ -471,6 +474,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
return doc.freeze();
}
+ChangeStreamViewDefinitionEventTransformation::ChangeStreamViewDefinitionEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec)
+ : ChangeStreamEventTransformation(expCtx, spec) {}
+
std::set<std::string> ChangeStreamViewDefinitionEventTransformation::getFieldNameDependencies()
const {
return std::set<std::string>{repl::OplogEntry::kOpTypeFieldName.toString(),
@@ -541,7 +549,8 @@ ChangeStreamEventTransformer::ChangeStreamEventTransformer(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
_defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(expCtx, spec);
- _viewNsEventBuilder = std::make_unique<ChangeStreamViewDefinitionEventTransformation>(spec);
+ _viewNsEventBuilder =
+ std::make_unique<ChangeStreamViewDefinitionEventTransformation>(expCtx, spec);
_isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) ==
DocumentSourceChangeStream::ChangeStreamType::kSingleCollection;
}
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.h b/src/mongo/db/pipeline/change_stream_event_transform.h
index 8bb24eaf2bd..a884fe1c672 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.h
+++ b/src/mongo/db/pipeline/change_stream_event_transform.h
@@ -40,7 +40,8 @@ namespace mongo {
*/
class ChangeStreamEventTransformation {
public:
- ChangeStreamEventTransformation(const DocumentSourceChangeStreamSpec& spec);
+ ChangeStreamEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
virtual ~ChangeStreamEventTransformation() {}
@@ -64,6 +65,7 @@ protected:
Value opDescription) const;
const DocumentSourceChangeStreamSpec _changeStreamSpec;
+ boost::intrusive_ptr<ExpressionContext> _expCtx;
ResumeTokenData _resumeToken;
// Set to true if the pre-image should be included in the output documents.
@@ -94,8 +96,9 @@ private:
*/
class ChangeStreamViewDefinitionEventTransformation final : public ChangeStreamEventTransformation {
public:
- ChangeStreamViewDefinitionEventTransformation(const DocumentSourceChangeStreamSpec& spec)
- : ChangeStreamEventTransformation(spec) {}
+ ChangeStreamViewDefinitionEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
Document applyTransformation(const Document& fromDoc) const override;
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 07678a0235a..af7cd71034f 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -64,8 +64,9 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
if (!userRequestedResumePoint) {
// Make sure we update the 'resumeAfter' in the 'spec' so that we serialize the
// correct resume token when sending it to the shards.
- spec.setResumeAfter(ResumeToken::makeHighWaterMarkToken(
- DocumentSourceChangeStream::getStartTimeForNewStream(expCtx)));
+ auto clusterTime = DocumentSourceChangeStream::getStartTimeForNewStream(expCtx);
+ spec.setResumeAfter(
+ ResumeToken::makeHighWaterMarkToken(clusterTime, expCtx->changeStreamTokenVersion));
}
// Unfold the $changeStream into its constituent stages and add them to the pipeline.
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 860fdce238c..4fc0041cc0e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -227,13 +227,16 @@ std::string DocumentSourceChangeStream::regexEscapeNsForChangeStream(StringData
}
ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
if (spec.getStartAfter()) {
return spec.getStartAfter()->getData();
} else if (spec.getResumeAfter()) {
return spec.getResumeAfter()->getData();
} else if (spec.getStartAtOperationTime()) {
- return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime()).getData();
+ return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime(),
+ expCtx->changeStreamTokenVersion)
+ .getData();
}
tasserted(5666901,
"Expected one of 'startAfter', 'resumeAfter' or 'startAtOperationTime' to be "
@@ -292,7 +295,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
}
// Obtain the resume token from the spec. This will be used when building the pipeline.
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
// Unfold the $changeStream into its constituent stages and add them to the pipeline.
stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec));
@@ -441,7 +444,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
!spec.getResumeAfter() || !spec.getStartAfter());
auto resumeToken = (spec.getResumeAfter() || spec.getStartAfter())
- ? resolveResumeTokenFromSpec(spec)
+ ? resolveResumeTokenFromSpec(expCtx, spec)
: boost::optional<ResumeTokenData>();
uassert(40674,
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index aebb966cf72..60013e64444 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -287,7 +287,9 @@ public:
* returns the equivalent high-watermark token. This method should only ever be called on a spec
* where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated.
*/
- static ResumeTokenData resolveResumeTokenFromSpec(const DocumentSourceChangeStreamSpec& spec);
+ static ResumeTokenData resolveResumeTokenFromSpec(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
/**
* For a change stream with no resume information supplied by the user, returns the clusterTime
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
index a3c6f46f0e2..8121fedb3e7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
@@ -71,7 +71,7 @@ DocumentSourceChangeStreamCheckInvalidate::create(
const DocumentSourceChangeStreamSpec& spec) {
// If resuming from an "invalidate" using "startAfter", pass along the resume token data to
// DSCSCheckInvalidate to signify that another invalidate should not be generated.
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
return new DocumentSourceChangeStreamCheckInvalidate(
expCtx, boost::make_optional(resumeToken.fromInvalidate, std::move(resumeToken)));
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index cc3787a44ba..3861b21693d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -130,7 +130,7 @@ DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResu
intrusive_ptr<DocumentSourceChangeStreamCheckResumability>
DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(resumeToken));
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
index d100b6ace48..15fcb30d58f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
@@ -45,7 +45,7 @@ boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
DocumentSourceChangeStreamEnsureResumeTokenPresent::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
tassert(5666902,
"Expected non-high-water-mark resume token",
!ResumeToken::isHighWaterMarkToken(resumeToken));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
index 8b3463f015b..2f0772a29bc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
@@ -174,8 +174,8 @@ BSONObj DocumentSourceChangeStreamHandleTopologyChange::createUpdatedCommandForN
Timestamp shardAddedTime) {
// We must start the new cursor from the moment at which the shard became visible.
const auto newShardAddedTime = LogicalTime{shardAddedTime};
- auto resumeTokenForNewShard =
- ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp());
+ auto resumeTokenForNewShard = ResumeToken::makeHighWaterMarkToken(
+ newShardAddedTime.addTicks(1).asTimestamp(), pExpCtx->changeStreamTokenVersion);
// Create a new shard command object containing the new resume token.
auto shardCommand = replaceResumeTokenInCommand(resumeTokenForNewShard.toDocument());
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
index 85a59f2db68..c11cb0a1aeb 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
@@ -104,7 +104,7 @@ DocumentSourceChangeStreamOplogMatch::DocumentSourceChangeStreamOplogMatch(
boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch>
DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
return make_intrusive<DocumentSourceChangeStreamOplogMatch>(resumeToken.clusterTime, expCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 5f26ba73267..774e7a2dd23 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -4515,7 +4515,7 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) {
auto oplogAfterResumeTime = makeAnOplogEntry(afterResumeTs, documentKey);
// Create a v2 high water mark token which sorts immediately before 'firstOplogAtResumeTime'.
- ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs).getData();
+ ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs, 2).getData();
resumeToken.version = 2;
auto expCtx = getExpCtxRaw();
expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("unittests");
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index cf31f61b536..766150a6e54 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -74,7 +74,8 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
_isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) {
// Extract the resume token or high-water-mark from the spec.
- auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
+ auto tokenData =
+ DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, _changeStreamSpec);
// Set the initialPostBatchResumeToken on the expression context.
expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 5a225a35fca..28603f10a30 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -385,7 +385,9 @@ protected:
}
intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability(
Timestamp ts) {
- return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
+ return createDSCheckResumability(
+ ResumeToken::makeHighWaterMarkToken(ts, ResumeTokenData::kDefaultTokenVersion)
+ .getData());
}
};
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 38f7680e2ce..52036933e6f 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -218,6 +218,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
expCtx->exprDeprectedForApiV1 = exprDeprectedForApiV1;
expCtx->initialPostBatchResumeToken = initialPostBatchResumeToken.getOwned();
+ expCtx->changeStreamTokenVersion = changeStreamTokenVersion;
+ expCtx->changeStreamSpec = changeStreamSpec;
+
expCtx->originalAggregateCommand = originalAggregateCommand.getOwned();
// Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 833a2da9ef1..2fe36d51ab7 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -472,6 +472,9 @@ public:
// If present, the spec associated with the current change stream pipeline.
boost::optional<DocumentSourceChangeStreamSpec> changeStreamSpec;
+ // The resume token version that should be generated by a change stream.
+ int changeStreamTokenVersion = ResumeTokenData::kDefaultTokenVersion;
+
// True if the expression context is the original one for a given pipeline.
// False if another context is created for the same pipeline. Used to disable duplicate
// expression counting.
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 9bd24748396..0b26a7db813 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -174,7 +174,8 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional
// high-water-mark token at the current clusterTime.
auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
if (highWaterMark > _latestOplogTimestamp) {
- auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
+ auto token = ResumeToken::makeHighWaterMarkToken(
+ highWaterMark, _pipeline->getContext()->changeStreamTokenVersion);
_postBatchResumeToken = token.toDocument().toBson();
_latestOplogTimestamp = highWaterMark;
_setSpeculativeReadTimestamp();
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index d1a54e4cb52..cc6d3631fd3 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -50,9 +50,9 @@ namespace {
// Helper function for makeHighWaterMarkToken and isHighWaterMarkToken.
ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime, int version) {
ResumeTokenData tokenData;
+ tokenData.version = version;
tokenData.clusterTime = clusterTime;
tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken;
- tokenData.version = version;
return tokenData;
}
} // namespace
@@ -297,9 +297,8 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
-ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime) {
- return ResumeToken(
- makeHighWaterMarkResumeTokenData(clusterTime, ResumeTokenData::kDefaultTokenVersion));
+ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, int version) {
+ return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, version));
}
bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) {
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 1c5380b7f12..a157c97fed9 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -142,7 +142,7 @@ public:
/**
* Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey.
*/
- static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime);
+ static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime, int version);
/**
* Returns true if the given token data represents a valid high-water-mark resume token; that
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index f44a6526c00..f63f6c61bc6 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -113,6 +113,12 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
<< DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))});
aggReq.setFromMongos(true);
aggReq.setNeedsMerge(true);
+
+ // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed.
+ if (expCtx->inMongos) {
+ aggReq.setGenerateV2ResumeTokens(expCtx->changeStreamTokenVersion == 2);
+ }
+
SimpleCursorOptions cursor;
cursor.setBatchSize(0);
aggReq.setCursor(cursor);
@@ -145,6 +151,17 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
cmdForShards[AggregateCommandRequest::kCollationFieldName] = Value(collationObj);
}
+ // We explicitly set $_generateV2ResumeTokens to false, if not already set, to indicate that the
+ // shards should NOT produce v2 resume tokens for change streams; instead, they should continue
+ // generating v1 tokens. This facilitates upgrade between 6.0 (which produces v1 by default)
+ // and 7.0 (which will produce v2 by default, but will be capable of generating v1) by ensuring
+ // that a 6.0 mongoS on a mixed 6.0/7.0 cluster will see only v1 tokens in the stream.
+ // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed.
+ const auto& v2FieldName = AggregateCommandRequest::kGenerateV2ResumeTokensFieldName;
+ if (auto cmdObj = cmdForShards.peek(); expCtx->inMongos && cmdObj[v2FieldName].missing()) {
+ cmdForShards[v2FieldName] = Value(false);
+ }
+
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (explainVerbosity) {
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 183c27e05d6..56498b71024 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -395,7 +395,9 @@ TEST(CursorResponseTest, addToBSONSubsequentResponse) {
TEST(CursorResponseTest, serializePostBatchResumeToken) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
auto postBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson();
+ ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2), ResumeTokenData::kDefaultTokenVersion)
+ .toDocument()
+ .toBson();
CursorResponse response(
NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken);
auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index d089ac691c9..80600272e63 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -54,7 +54,10 @@ LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) {
}
BSONObj makePostBatchResumeToken(Timestamp clusterTime) {
- auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime).toDocument().toBson();
+ auto pbrt =
+ ResumeToken::makeHighWaterMarkToken(clusterTime, ResumeTokenData::kDefaultTokenVersion)
+ .toDocument()
+ .toBson();
invariant(pbrt.firstElement().type() == BSONType::String);
return pbrt;
}
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 4bcd150206b..b3df71582da 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -133,6 +133,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
mergeCtx->inMongos = true;
+ // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0
+ // we only expect this to occur during testing.
+ // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false.
+ if (request.getGenerateV2ResumeTokens()) {
+ uassert(6528201, "Invalid request for v2 resume tokens", getTestCommandsEnabled());
+ mergeCtx->changeStreamTokenVersion = 2;
+ }
+
// Serialize the 'AggregateCommandRequest' and save it so that the original command can be
// reconstructed for dispatch to a new shard, which is sometimes necessary for change streams
// pipelines.