diff options
author | Catalin Sumanaru <catalin.sumanaru@mongodb.com> | 2022-06-28 09:47:33 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-28 10:57:26 +0000 |
commit | 62d6f21de4e554c1b4656e36337af08e896567fa (patch) | |
tree | 5717a14a3a34bb4dc767f8928f1cb3261158eda6 /src/mongo | |
parent | 2b66cd3adf6c22e4b4c173f6dc3cdb918645dc84 (diff) | |
download | mongo-62d6f21de4e554c1b4656e36337af08e896567fa.tar.gz |
SERVER-65257 Expose only the all arguments constructor for 'ResumeTokenData' class
Diffstat (limited to 'src/mongo')
8 files changed, 113 insertions, 94 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index f41ae425423..5e2cdafaa66 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -46,6 +46,7 @@ namespace mongo { namespace { constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; constexpr auto checkValueTypeOrMissing = &DocumentSourceChangeStream::checkValueTypeOrMissing; +constexpr auto resolveResumeToken = &DocumentSourceChangeStream::resolveResumeTokenFromSpec; Document copyDocExceptFields(const Document& source, const std::set<StringData>& fieldNames) { MutableDocument doc(source); @@ -93,11 +94,7 @@ NamespaceString createNamespaceStringFromOplogEntry(Value tid, StringData ns) { ChangeStreamEventTransformation::ChangeStreamEventTransformation( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) - : _changeStreamSpec(spec), _expCtx(expCtx) { - // Extract the resume token from the spec and store it. - _resumeToken = - DocumentSourceChangeStream::resolveResumeTokenFromSpec(_expCtx, _changeStreamSpec); - + : _changeStreamSpec(spec), _expCtx(expCtx), _resumeToken(resolveResumeToken(expCtx, spec)) { // Determine whether the user requested a point-in-time pre-image, which will affect this // stage's output. _preImageRequested = diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.cpp b/src/mongo/db/pipeline/change_stream_test_helpers.cpp index 9ee4bf2b33a..81b7ad860cb 100644 --- a/src/mongo/db/pipeline/change_stream_test_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_test_helpers.cpp @@ -75,13 +75,13 @@ Document makeResumeTokenWithEventId(Timestamp ts, ImplicitValue eventIdentifier, ResumeTokenData::FromInvalidate fromInvalidate, size_t txnOpIndex) { - ResumeTokenData tokenData; - tokenData.clusterTime = ts; - tokenData.eventIdentifier = eventIdentifier; - tokenData.fromInvalidate = fromInvalidate; - tokenData.txnOpIndex = txnOpIndex; - if (!uuid.missing()) - tokenData.uuid = uuid.getUuid(); + auto optionalUuid = uuid.missing() ? boost::none : boost::make_optional(uuid.getUuid()); + ResumeTokenData tokenData{ts, + ResumeTokenData::kDefaultTokenVersion, + txnOpIndex, + optionalUuid, + eventIdentifier, + fromInvalidate}; return ResumeToken(tokenData).toDocument(); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp index 8aa0d00deb9..ba6b8e751a8 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp @@ -70,12 +70,19 @@ public: Document makeResumeToken(ImplicitValue id = Value()) { const Timestamp ts(100, 1); if (id.missing()) { - ResumeTokenData tokenData; - tokenData.clusterTime = ts; + ResumeTokenData tokenData(ts, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 0, + /* uuid */ boost::none, + /* eventIdentifier */ Value()); return ResumeToken(tokenData).toDocument(); } - return ResumeToken(ResumeTokenData(ts, 0, 0, testUuid(), Value(Document{{"_id", id}}))) - .toDocument(); + ResumeTokenData tokenData(ts, + /* version */ 0, + /* txnOpIndex */ 0, + testUuid(), + /* eventIdentifier */ Value(Document{{"_id", id}})); + return ResumeToken(tokenData).toDocument(); } DocumentSourceChangeStreamSpec getSpec( diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 5d250101fa2..e93e68aa0ca 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -651,11 +651,13 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new // stream after the old stream is invalidated. - ResumeTokenData invalidateToken; - invalidateToken.clusterTime = resumeTimestamp; - invalidateToken.uuid = uuids[0]; - invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); + auto eventIdentifier = Value{Document{{"operationType", "drop"_sd}}}; + ResumeTokenData invalidateToken{resumeTimestamp, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 0, + uuids[0], + std::move(eventIdentifier), + ResumeTokenData::kFromInvalidate}; auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Add three documents which each have the invalidate resume token. We expect to see this in the @@ -692,11 +694,13 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv // Create a resume token representing an 'invalidate' event, and use it to seed the stage. A // resume token with {fromInvalidate:true} can only be used with startAfter, to start a new // stream after the old stream is invalidated. - ResumeTokenData invalidateToken; - invalidateToken.clusterTime = resumeTimestamp; - invalidateToken.uuid = uuids[0]; - invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; - invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); + auto eventIdentifier = Value{Document{{"operationType", "drop"_sd}}}; + ResumeTokenData invalidateToken{resumeTimestamp, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 0, + uuids[0], + eventIdentifier, + ResumeTokenData::kFromInvalidate}; auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Create a second invalidate token with the same clusterTime but a different UUID. diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index cc6d3631fd3..c17eb5d009a 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -46,17 +46,6 @@ namespace mongo { constexpr StringData ResumeToken::kDataFieldName; constexpr StringData ResumeToken::kTypeBitsFieldName; -namespace { -// Helper function for makeHighWaterMarkToken and isHighWaterMarkToken. -ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime, int version) { - ResumeTokenData tokenData; - tokenData.version = version; - tokenData.clusterTime = clusterTime; - tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken; - return tokenData; -} -} // namespace - ResumeTokenData::ResumeTokenData(Timestamp clusterTimeIn, int versionIn, size_t txnOpIndexIn, @@ -297,12 +286,20 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) { return ResumeToken(resumeDoc); } +ResumeTokenData ResumeToken::makeHighWaterMarkTokenData(Timestamp clusterTime, int version) { + ResumeTokenData tokenData; + tokenData.version = version; + tokenData.clusterTime = clusterTime; + tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken; + return tokenData; +} + ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, int version) { - return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, version)); + return ResumeToken(makeHighWaterMarkTokenData(clusterTime, version)); } bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) { - return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime, tokenData.version); + return tokenData == makeHighWaterMarkTokenData(tokenData.clusterTime, tokenData.version); } } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 589cf458ee7..e626bb43b2b 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -62,9 +62,6 @@ struct ResumeTokenData { kEventToken = 128, // Token refers to an actual event in the stream. }; - ResumeTokenData(){}; - - // TODO SERVER-65257: force all callers to go through this constructor. ResumeTokenData(Timestamp clusterTimeIn, int versionIn, size_t txnOpIndexIn, @@ -78,10 +75,14 @@ struct ResumeTokenData { int versionIn, size_t txnOpIndexIn, const boost::optional<UUID>& uuidIn, - Value eventIdentifierIn) + Value eventIdentifierIn, + FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate, + TokenType tokenType = TokenType::kEventToken) : clusterTime(clusterTimeIn), version(versionIn), + tokenType(tokenType), txnOpIndex(txnOpIndexIn), + fromInvalidate(fromInvalidate), uuid(uuidIn), eventIdentifier(std::move(eventIdentifierIn)){}; @@ -107,6 +108,12 @@ struct ResumeTokenData { // The eventIdentifier can be either be a document key for CRUD operations, or a more // descriptive operation details for non-CRUD operations. Value eventIdentifier; + +private: + // This private constructor should only ever be used internally or by the ResumeToken class. + ResumeTokenData() = default; + + friend class ResumeToken; }; std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData); @@ -184,6 +191,9 @@ public: } private: + // Helper function for makeHighWaterMarkToken and isHighWaterMarkToken. + static ResumeTokenData makeHighWaterMarkTokenData(Timestamp clusterTime, int version); + explicit ResumeToken(const Document& resumeData); // This is the hex-encoded string encoding all the pieces of the resume token. diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp index 8592e701665..dcfcf284486 100644 --- a/src/mongo/db/pipeline/resume_token_test.cpp +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -59,12 +59,12 @@ TEST(ResumeToken, EncodesFullTokenFromData) { } TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) { - Timestamp ts(1001, 3); - - ResumeTokenData resumeTokenDataIn; - resumeTokenDataIn.clusterTime = ts; - resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); - + ResumeTokenData resumeTokenDataIn{ + Timestamp{1001, 3}, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpindex */ 0, + /* uuid */ boost::none, + /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})}; ResumeToken token(resumeTokenDataIn); ResumeTokenData tokenData = token.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); @@ -90,11 +90,12 @@ TEST(ResumeToken, ShouldRoundTripThroughHexEncoding) { } TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) { - Timestamp ts(1001, 3); - - ResumeTokenData resumeTokenDataIn; - resumeTokenDataIn.clusterTime = ts; - resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); + ResumeTokenData resumeTokenDataIn{ + Timestamp{1001, 3}, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 0, + /* uuid */ boost::none, + /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})}; // Test serialization/parsing through Document. auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); @@ -108,15 +109,14 @@ TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) { } TEST(ResumeToken, NonDocumentKeyResumeTokenRoundTripsThroughHexEncoding) { - Timestamp ts(1001, 3); - - ResumeTokenData resumeTokenDataIn; - resumeTokenDataIn.clusterTime = ts; - resumeTokenDataIn.uuid = UUID::gen(); - - resumeTokenDataIn.eventIdentifier = Value(BSON("operationType" - << "create" - << "operationDescription" << BSONObj())); + auto eventIdentifier = Value(BSON("operationType" + << "create" + << "operationDescription" << BSONObj())); + ResumeTokenData resumeTokenDataIn{Timestamp{1001, 3}, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 0, + UUID::gen(), + eventIdentifier}; // Test serialization/parsing through Document. auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); @@ -244,10 +244,11 @@ TEST(ResumeToken, FailsToParseForInvalidTokenFormats) { AssertionException); // Valid data field, but wrong type typeBits. - Timestamp ts(1010, 4); - ResumeTokenData tokenData; - tokenData.clusterTime = ts; - tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); + ResumeTokenData tokenData{Timestamp{1010, 4}, + ResumeTokenData::kDefaultTokenVersion, + /* version */ 0, + /* uuid */ boost::none, + /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})}; auto goodTokenDocBinData = ResumeToken(tokenData).toDocument(); auto goodData = goodTokenDocBinData["_data"].getStringData(); ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}), @@ -260,11 +261,11 @@ TEST(ResumeToken, FailsToParseForInvalidTokenFormats) { } TEST(ResumeToken, FailsToDecodeInvalidKeyString) { - Timestamp ts(1010, 4); - - ResumeTokenData tokenData; - tokenData.clusterTime = ts; - tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); + ResumeTokenData tokenData{Timestamp{1010, 4}, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 0, + /* uuid */ boost::none, + /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})}; auto goodTokenDocBinData = ResumeToken(tokenData).toDocument(); auto goodData = goodTokenDocBinData["_data"].getStringData(); @@ -314,13 +315,14 @@ TEST(ResumeToken, FailsToDecodeInvalidKeyString) { TEST(ResumeToken, WrongVersionToken) { Timestamp ts(1001, 3); - - ResumeTokenData resumeTokenDataIn; - resumeTokenDataIn.clusterTime = ts; - resumeTokenDataIn.version = 0; - resumeTokenDataIn.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate; - const auto eventIdentifier = + auto eventIdentifier = Value(Document{{"operationType", "insert"_sd}, {"documentKey", Document{{"_id", 1}}}}); + ResumeTokenData resumeTokenDataIn{ts, + /* version */ 0, + /* txnOpIndex */ 0, + UUID::gen(), + eventIdentifier, + ResumeTokenData::FromInvalidate::kFromInvalidate}; // This one with version 0 should succeed. Version 0 cannot encode the fromInvalidate bool, so // we expect it to be set to the default 'kNotFromInvalidate' after serialization. @@ -337,8 +339,6 @@ TEST(ResumeToken, WrongVersionToken) { ASSERT_EQ(resumeTokenDataIn, tokenData); resumeTokenDataIn.version = 2; - resumeTokenDataIn.uuid = UUID::gen(); - resumeTokenDataIn.eventIdentifier = eventIdentifier; rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); tokenData = rtToken.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); @@ -350,10 +350,13 @@ TEST(ResumeToken, WrongVersionToken) { // For version 0, the 'tokenType' field is not encoded. We expect it to default from the value // 'kHighWaterMarkToken' back to 'kEventToken' after serialization. - resumeTokenDataIn = {}; - resumeTokenDataIn.version = 0; - resumeTokenDataIn.tokenType = ResumeTokenData::kHighWaterMarkToken; - + resumeTokenDataIn = ResumeTokenData{ts, + /* version */ 0, + /* txnOpIndex */ 0, + /* uuid */ boost::none, + /* eventIdentifier */ Value{}, + ResumeTokenData::kNotFromInvalidate, + ResumeTokenData::TokenType::kHighWaterMarkToken}; rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); tokenData = rtToken.getData(); ASSERT_NE(resumeTokenDataIn, tokenData); @@ -383,12 +386,12 @@ TEST(ResumeToken, WrongVersionToken) { } TEST(ResumeToken, InvalidTxnOpIndex) { - Timestamp ts(1001, 3); - - ResumeTokenData resumeTokenDataIn; - resumeTokenDataIn.clusterTime = ts; - resumeTokenDataIn.txnOpIndex = 1234; - resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); + ResumeTokenData resumeTokenDataIn{ + Timestamp{1001, 3}, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 1234, + /* uuid */ boost::none, + /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})}; // Should round trip with a non-negative txnOpIndex. auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 4ba891f55f5..e20c06538f9 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -63,10 +63,11 @@ BSONObj makePostBatchResumeToken(Timestamp clusterTime) { } BSONObj makeResumeToken(Timestamp clusterTime, UUID uuid, BSONObj docKey) { - ResumeTokenData data; - data.clusterTime = clusterTime; - data.uuid = uuid; - data.eventIdentifier = Value(Document{docKey}); + ResumeTokenData data(clusterTime, + ResumeTokenData::kDefaultTokenVersion, + /* txnOpIndex */ 0, + uuid, + /* eventIdentifier */ Value(Document{docKey})); return ResumeToken(data).toDocument().toBson(); } |