summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/resume_token.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/resume_token.cpp')
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp41
1 files changed, 29 insertions, 12 deletions
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index 5d3a5e78d4c..cc612288fa9 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -37,6 +37,7 @@
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/exec/document_value/value_comparator.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/storage/key_string.h"
#include "mongo/util/hex.h"
@@ -61,7 +62,7 @@ bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
return clusterTime == other.clusterTime && version == other.version &&
tokenType == other.tokenType && txnOpIndex == other.txnOpIndex &&
fromInvalidate == other.fromInvalidate && uuid == other.uuid &&
- (Value::compare(this->documentKey, other.documentKey, nullptr) == 0);
+ (Value::compare(this->eventIdentifier, other.eventIdentifier, nullptr) == 0);
}
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
@@ -75,7 +76,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate);
}
out << ", uuid: " << tokenData.uuid;
- out << ", documentKey: " << tokenData.documentKey << "}";
+ out << ", eventIdentifier: " << tokenData.eventIdentifier << "}";
return out;
}
@@ -96,7 +97,7 @@ ResumeToken::ResumeToken(const Document& resumeDoc) {
}
// We encode the resume token as a KeyString with the sequence:
-// clusterTime, version, txnOpIndex, fromInvalidate, uuid, documentKey Only the clusterTime,
+// clusterTime, version, txnOpIndex, fromInvalidate, uuid, eventIdentifier Only the clusterTime,
// version, txnOpIndex, and fromInvalidate are required.
ResumeToken::ResumeToken(const ResumeTokenData& data) {
BSONObjBuilder builder;
@@ -110,13 +111,13 @@ ResumeToken::ResumeToken(const ResumeTokenData& data) {
builder.appendBool("", data.fromInvalidate);
}
uassert(50788,
- "Unexpected resume token with a documentKey but no UUID",
- data.uuid || data.documentKey.missing());
+ "Unexpected resume token with a eventIdentifier but no UUID",
+ data.uuid || data.eventIdentifier.missing());
if (data.uuid) {
data.uuid->appendToBuilder(&builder, "");
}
- data.documentKey.addToBsonObj(&builder, "");
+ data.eventIdentifier.addToBsonObj(&builder, "");
auto keyObj = builder.obj();
KeyString::Builder encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj()));
_hexKeyString = hexblob::encode(encodedToken.getBuffer(), encodedToken.getSize());
@@ -130,9 +131,10 @@ bool ResumeToken::operator==(const ResumeToken& other) const {
// '_hexKeyString' is enough to determine equality. The type bits are used to unambiguously
// re-construct the original data, but we do not expect any two resume tokens to have the same
// data and different type bits, since that would imply they have (1) the same timestamp and (2)
- // the same documentKey (possibly different types). This should not be possible because
- // documents with the same documentKey should be on the same shard and therefore should have
- // different timestamps.
+ // the same eventIdentifier fields and values, but with different types. Change events with the
+ // same eventIdentifier are either (1) on the same shard in the case of CRUD events, which
+ // implies that they must have different timestamps; or (2) refer to the same logical event on
+ // different shards, in the case of non-CRUD events.
return _hexKeyString == other._hexKeyString;
}
@@ -208,15 +210,15 @@ ResumeTokenData ResumeToken::getData() const {
result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean());
}
- // The UUID and documentKey are not required.
+ // The UUID and eventIdentifier are not required.
if (!i.more()) {
return result;
}
- // The UUID comes first, then the documentKey.
+ // The UUID comes first, then the eventIdentifier.
result.uuid = uassertStatusOK(UUID::parse(i.next()));
if (i.more()) {
- result.documentKey = Value(i.next());
+ result.eventIdentifier = Value(i.next());
}
uassert(40646, "invalid oversized resume token", !i.more());
@@ -239,4 +241,19 @@ bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) {
return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime, tokenData.uuid);
}
+Value ResumeToken::makeEventIdentifier(StringData opType, Value documentKey, Value opDescription) {
+ tassert(6280100,
+ "both documentKey and operationDescription cannot be present for an event",
+ documentKey.missing() || opDescription.missing());
+
+ // For classic change events, the eventIdentifier is always the documentKey, even if missing.
+ if (change_stream_legacy::kClassicOperationTypes.count(opType)) {
+ return documentKey;
+ }
+
+ // For an expanded event, the eventIdentifier is its operation type and description.
+ return Value(
+ Document{{"operationType"_sd, opType}, {"operationDescription"_sd, opDescription}});
+}
+
} // namespace mongo