summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorCatalin Sumanaru <catalin.sumanaru@mongodb.com>2022-06-28 09:47:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-28 10:57:26 +0000
commit62d6f21de4e554c1b4656e36337af08e896567fa (patch)
tree5717a14a3a34bb4dc767f8928f1cb3261158eda6 /src/mongo
parent2b66cd3adf6c22e4b4c173f6dc3cdb918645dc84 (diff)
downloadmongo-62d6f21de4e554c1b4656e36337af08e896567fa.tar.gz
SERVER-65257 Expose only the all arguments constructor for 'ResumeTokenData' class
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp7
-rw-r--r--src/mongo/db/pipeline/change_stream_test_helpers.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp15
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp24
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp23
-rw-r--r--src/mongo/db/pipeline/resume_token.h18
-rw-r--r--src/mongo/db/pipeline/resume_token_test.cpp97
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp9
8 files changed, 113 insertions, 94 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index f41ae425423..5e2cdafaa66 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -46,6 +46,7 @@ namespace mongo {
namespace {
constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
constexpr auto checkValueTypeOrMissing = &DocumentSourceChangeStream::checkValueTypeOrMissing;
+constexpr auto resolveResumeToken = &DocumentSourceChangeStream::resolveResumeTokenFromSpec;
Document copyDocExceptFields(const Document& source, const std::set<StringData>& fieldNames) {
MutableDocument doc(source);
@@ -93,11 +94,7 @@ NamespaceString createNamespaceStringFromOplogEntry(Value tid, StringData ns) {
ChangeStreamEventTransformation::ChangeStreamEventTransformation(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec)
- : _changeStreamSpec(spec), _expCtx(expCtx) {
- // Extract the resume token from the spec and store it.
- _resumeToken =
- DocumentSourceChangeStream::resolveResumeTokenFromSpec(_expCtx, _changeStreamSpec);
-
+ : _changeStreamSpec(spec), _expCtx(expCtx), _resumeToken(resolveResumeToken(expCtx, spec)) {
// Determine whether the user requested a point-in-time pre-image, which will affect this
// stage's output.
_preImageRequested =
diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.cpp b/src/mongo/db/pipeline/change_stream_test_helpers.cpp
index 9ee4bf2b33a..81b7ad860cb 100644
--- a/src/mongo/db/pipeline/change_stream_test_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_test_helpers.cpp
@@ -75,13 +75,13 @@ Document makeResumeTokenWithEventId(Timestamp ts,
ImplicitValue eventIdentifier,
ResumeTokenData::FromInvalidate fromInvalidate,
size_t txnOpIndex) {
- ResumeTokenData tokenData;
- tokenData.clusterTime = ts;
- tokenData.eventIdentifier = eventIdentifier;
- tokenData.fromInvalidate = fromInvalidate;
- tokenData.txnOpIndex = txnOpIndex;
- if (!uuid.missing())
- tokenData.uuid = uuid.getUuid();
+ auto optionalUuid = uuid.missing() ? boost::none : boost::make_optional(uuid.getUuid());
+ ResumeTokenData tokenData{ts,
+ ResumeTokenData::kDefaultTokenVersion,
+ txnOpIndex,
+ optionalUuid,
+ eventIdentifier,
+ fromInvalidate};
return ResumeToken(tokenData).toDocument();
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
index 8aa0d00deb9..ba6b8e751a8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
@@ -70,12 +70,19 @@ public:
Document makeResumeToken(ImplicitValue id = Value()) {
const Timestamp ts(100, 1);
if (id.missing()) {
- ResumeTokenData tokenData;
- tokenData.clusterTime = ts;
+ ResumeTokenData tokenData(ts,
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 0,
+ /* uuid */ boost::none,
+ /* eventIdentifier */ Value());
return ResumeToken(tokenData).toDocument();
}
- return ResumeToken(ResumeTokenData(ts, 0, 0, testUuid(), Value(Document{{"_id", id}})))
- .toDocument();
+ ResumeTokenData tokenData(ts,
+ /* version */ 0,
+ /* txnOpIndex */ 0,
+ testUuid(),
+ /* eventIdentifier */ Value(Document{{"_id", id}}));
+ return ResumeToken(tokenData).toDocument();
}
DocumentSourceChangeStreamSpec getSpec(
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 5d250101fa2..e93e68aa0ca 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -651,11 +651,13 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn
// Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
// resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
// stream after the old stream is invalidated.
- ResumeTokenData invalidateToken;
- invalidateToken.clusterTime = resumeTimestamp;
- invalidateToken.uuid = uuids[0];
- invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
+ auto eventIdentifier = Value{Document{{"operationType", "drop"_sd}}};
+ ResumeTokenData invalidateToken{resumeTimestamp,
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 0,
+ uuids[0],
+ std::move(eventIdentifier),
+ ResumeTokenData::kFromInvalidate};
auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Add three documents which each have the invalidate resume token. We expect to see this in the
@@ -692,11 +694,13 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
// Create a resume token representing an 'invalidate' event, and use it to seed the stage. A
// resume token with {fromInvalidate:true} can only be used with startAfter, to start a new
// stream after the old stream is invalidated.
- ResumeTokenData invalidateToken;
- invalidateToken.clusterTime = resumeTimestamp;
- invalidateToken.uuid = uuids[0];
- invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
- invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
+ auto eventIdentifier = Value{Document{{"operationType", "drop"_sd}}};
+ ResumeTokenData invalidateToken{resumeTimestamp,
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 0,
+ uuids[0],
+ eventIdentifier,
+ ResumeTokenData::kFromInvalidate};
auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Create a second invalidate token with the same clusterTime but a different UUID.
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index cc6d3631fd3..c17eb5d009a 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -46,17 +46,6 @@ namespace mongo {
constexpr StringData ResumeToken::kDataFieldName;
constexpr StringData ResumeToken::kTypeBitsFieldName;
-namespace {
-// Helper function for makeHighWaterMarkToken and isHighWaterMarkToken.
-ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime, int version) {
- ResumeTokenData tokenData;
- tokenData.version = version;
- tokenData.clusterTime = clusterTime;
- tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken;
- return tokenData;
-}
-} // namespace
-
ResumeTokenData::ResumeTokenData(Timestamp clusterTimeIn,
int versionIn,
size_t txnOpIndexIn,
@@ -297,12 +286,20 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
+ResumeTokenData ResumeToken::makeHighWaterMarkTokenData(Timestamp clusterTime, int version) {
+ ResumeTokenData tokenData;
+ tokenData.version = version;
+ tokenData.clusterTime = clusterTime;
+ tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken;
+ return tokenData;
+}
+
ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, int version) {
- return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, version));
+ return ResumeToken(makeHighWaterMarkTokenData(clusterTime, version));
}
bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) {
- return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime, tokenData.version);
+ return tokenData == makeHighWaterMarkTokenData(tokenData.clusterTime, tokenData.version);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 589cf458ee7..e626bb43b2b 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -62,9 +62,6 @@ struct ResumeTokenData {
kEventToken = 128, // Token refers to an actual event in the stream.
};
- ResumeTokenData(){};
-
- // TODO SERVER-65257: force all callers to go through this constructor.
ResumeTokenData(Timestamp clusterTimeIn,
int versionIn,
size_t txnOpIndexIn,
@@ -78,10 +75,14 @@ struct ResumeTokenData {
int versionIn,
size_t txnOpIndexIn,
const boost::optional<UUID>& uuidIn,
- Value eventIdentifierIn)
+ Value eventIdentifierIn,
+ FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate,
+ TokenType tokenType = TokenType::kEventToken)
: clusterTime(clusterTimeIn),
version(versionIn),
+ tokenType(tokenType),
txnOpIndex(txnOpIndexIn),
+ fromInvalidate(fromInvalidate),
uuid(uuidIn),
eventIdentifier(std::move(eventIdentifierIn)){};
@@ -107,6 +108,12 @@ struct ResumeTokenData {
// The eventIdentifier can be either be a document key for CRUD operations, or a more
// descriptive operation details for non-CRUD operations.
Value eventIdentifier;
+
+private:
+ // This private constructor should only ever be used internally or by the ResumeToken class.
+ ResumeTokenData() = default;
+
+ friend class ResumeToken;
};
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);
@@ -184,6 +191,9 @@ public:
}
private:
+ // Helper function for makeHighWaterMarkToken and isHighWaterMarkToken.
+ static ResumeTokenData makeHighWaterMarkTokenData(Timestamp clusterTime, int version);
+
explicit ResumeToken(const Document& resumeData);
// This is the hex-encoded string encoding all the pieces of the resume token.
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
index 8592e701665..dcfcf284486 100644
--- a/src/mongo/db/pipeline/resume_token_test.cpp
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -59,12 +59,12 @@ TEST(ResumeToken, EncodesFullTokenFromData) {
}
TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) {
- Timestamp ts(1001, 3);
-
- ResumeTokenData resumeTokenDataIn;
- resumeTokenDataIn.clusterTime = ts;
- resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
-
+ ResumeTokenData resumeTokenDataIn{
+ Timestamp{1001, 3},
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpindex */ 0,
+ /* uuid */ boost::none,
+ /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})};
ResumeToken token(resumeTokenDataIn);
ResumeTokenData tokenData = token.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
@@ -90,11 +90,12 @@ TEST(ResumeToken, ShouldRoundTripThroughHexEncoding) {
}
TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) {
- Timestamp ts(1001, 3);
-
- ResumeTokenData resumeTokenDataIn;
- resumeTokenDataIn.clusterTime = ts;
- resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
+ ResumeTokenData resumeTokenDataIn{
+ Timestamp{1001, 3},
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 0,
+ /* uuid */ boost::none,
+ /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})};
// Test serialization/parsing through Document.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
@@ -108,15 +109,14 @@ TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) {
}
TEST(ResumeToken, NonDocumentKeyResumeTokenRoundTripsThroughHexEncoding) {
- Timestamp ts(1001, 3);
-
- ResumeTokenData resumeTokenDataIn;
- resumeTokenDataIn.clusterTime = ts;
- resumeTokenDataIn.uuid = UUID::gen();
-
- resumeTokenDataIn.eventIdentifier = Value(BSON("operationType"
- << "create"
- << "operationDescription" << BSONObj()));
+ auto eventIdentifier = Value(BSON("operationType"
+ << "create"
+ << "operationDescription" << BSONObj()));
+ ResumeTokenData resumeTokenDataIn{Timestamp{1001, 3},
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 0,
+ UUID::gen(),
+ eventIdentifier};
// Test serialization/parsing through Document.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
@@ -244,10 +244,11 @@ TEST(ResumeToken, FailsToParseForInvalidTokenFormats) {
AssertionException);
// Valid data field, but wrong type typeBits.
- Timestamp ts(1010, 4);
- ResumeTokenData tokenData;
- tokenData.clusterTime = ts;
- tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
+ ResumeTokenData tokenData{Timestamp{1010, 4},
+ ResumeTokenData::kDefaultTokenVersion,
+ /* version */ 0,
+ /* uuid */ boost::none,
+ /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})};
auto goodTokenDocBinData = ResumeToken(tokenData).toDocument();
auto goodData = goodTokenDocBinData["_data"].getStringData();
ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}),
@@ -260,11 +261,11 @@ TEST(ResumeToken, FailsToParseForInvalidTokenFormats) {
}
TEST(ResumeToken, FailsToDecodeInvalidKeyString) {
- Timestamp ts(1010, 4);
-
- ResumeTokenData tokenData;
- tokenData.clusterTime = ts;
- tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
+ ResumeTokenData tokenData{Timestamp{1010, 4},
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 0,
+ /* uuid */ boost::none,
+ /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})};
auto goodTokenDocBinData = ResumeToken(tokenData).toDocument();
auto goodData = goodTokenDocBinData["_data"].getStringData();
@@ -314,13 +315,14 @@ TEST(ResumeToken, FailsToDecodeInvalidKeyString) {
TEST(ResumeToken, WrongVersionToken) {
Timestamp ts(1001, 3);
-
- ResumeTokenData resumeTokenDataIn;
- resumeTokenDataIn.clusterTime = ts;
- resumeTokenDataIn.version = 0;
- resumeTokenDataIn.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
- const auto eventIdentifier =
+ auto eventIdentifier =
Value(Document{{"operationType", "insert"_sd}, {"documentKey", Document{{"_id", 1}}}});
+ ResumeTokenData resumeTokenDataIn{ts,
+ /* version */ 0,
+ /* txnOpIndex */ 0,
+ UUID::gen(),
+ eventIdentifier,
+ ResumeTokenData::FromInvalidate::kFromInvalidate};
// This one with version 0 should succeed. Version 0 cannot encode the fromInvalidate bool, so
// we expect it to be set to the default 'kNotFromInvalidate' after serialization.
@@ -337,8 +339,6 @@ TEST(ResumeToken, WrongVersionToken) {
ASSERT_EQ(resumeTokenDataIn, tokenData);
resumeTokenDataIn.version = 2;
- resumeTokenDataIn.uuid = UUID::gen();
- resumeTokenDataIn.eventIdentifier = eventIdentifier;
rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
tokenData = rtToken.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
@@ -350,10 +350,13 @@ TEST(ResumeToken, WrongVersionToken) {
// For version 0, the 'tokenType' field is not encoded. We expect it to default from the value
// 'kHighWaterMarkToken' back to 'kEventToken' after serialization.
- resumeTokenDataIn = {};
- resumeTokenDataIn.version = 0;
- resumeTokenDataIn.tokenType = ResumeTokenData::kHighWaterMarkToken;
-
+ resumeTokenDataIn = ResumeTokenData{ts,
+ /* version */ 0,
+ /* txnOpIndex */ 0,
+ /* uuid */ boost::none,
+ /* eventIdentifier */ Value{},
+ ResumeTokenData::kNotFromInvalidate,
+ ResumeTokenData::TokenType::kHighWaterMarkToken};
rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
tokenData = rtToken.getData();
ASSERT_NE(resumeTokenDataIn, tokenData);
@@ -383,12 +386,12 @@ TEST(ResumeToken, WrongVersionToken) {
}
TEST(ResumeToken, InvalidTxnOpIndex) {
- Timestamp ts(1001, 3);
-
- ResumeTokenData resumeTokenDataIn;
- resumeTokenDataIn.clusterTime = ts;
- resumeTokenDataIn.txnOpIndex = 1234;
- resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
+ ResumeTokenData resumeTokenDataIn{
+ Timestamp{1001, 3},
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 1234,
+ /* uuid */ boost::none,
+ /* eventIdentifier */ Value(Document{{"operationType", "drop"_sd}})};
// Should round trip with a non-negative txnOpIndex.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 4ba891f55f5..e20c06538f9 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -63,10 +63,11 @@ BSONObj makePostBatchResumeToken(Timestamp clusterTime) {
}
BSONObj makeResumeToken(Timestamp clusterTime, UUID uuid, BSONObj docKey) {
- ResumeTokenData data;
- data.clusterTime = clusterTime;
- data.uuid = uuid;
- data.eventIdentifier = Value(Document{docKey});
+ ResumeTokenData data(clusterTime,
+ ResumeTokenData::kDefaultTokenVersion,
+ /* txnOpIndex */ 0,
+ uuid,
+ /* eventIdentifier */ Value(Document{docKey}));
return ResumeToken(data).toDocument().toBson();
}