author    Charlie Swanson <charlie.swanson@mongodb.com>  2018-04-09 14:12:05 -0400
committer Charlie Swanson <charlie.swanson@mongodb.com>  2018-04-13 16:18:35 -0400
commit    a820491e9402a52d7575157a9897306d49129370
tree      61ed25fdec3912a8fc1407bcb52380b110b697fa
parent    4b894b4a55467c38bb7910317af00793b493de37
SERVER-34313 Use hex-encoded string for resume token
-rw-r--r--  jstests/multiVersion/change_streams_feature_compatibility_version.js   275
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp                 27
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h                     8
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp              2
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp         7
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.h           3
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp              2
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token_test.cpp         4
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp   4
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_test.cpp                     1
-rw-r--r--  src/mongo/db/pipeline/document_sources.idl                                 6
-rw-r--r--  src/mongo/db/pipeline/resume_token.cpp                                   184
-rw-r--r--  src/mongo/db/pipeline/resume_token.h                                      91
-rw-r--r--  src/mongo/db/pipeline/resume_token_test.cpp                              297
-rw-r--r--  src/mongo/util/hex.h                                                      23
15 files changed, 602 insertions, 332 deletions
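
Editor's note: the substance of this change is that a resume token's _data payload is now a hex-encoded string of the KeyString-encoded (clusterTime, UUID, documentKey) tuple rather than raw BinData, so tokens order correctly under a plain string comparison (BinData sorts by length before contents, per the comment in resume_token.h below). A minimal sketch of the new encoding path, mirroring the ResumeToken constructor in the resume_token.cpp hunk below; the free function name is illustrative and is not part of the patch:

// Sketch only: hex-encodes a KeyString-serialized resume token, as this patch does.
#include <boost/optional.hpp>

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/storage/key_string.h"
#include "mongo/util/hex.h"
#include "mongo/util/uuid.h"

namespace mongo {
std::string encodeResumeTokenAsHex(Timestamp clusterTime,
                                   const boost::optional<UUID>& uuid,
                                   const Value& documentKey) {
    // New field order: clusterTime, then UUID, then documentKey (the legacy BinData
    // format used clusterTime, documentKey, UUID).
    BSONObjBuilder builder;
    builder.append("", clusterTime);
    if (uuid)
        uuid->appendToBuilder(&builder, "");
    documentKey.addToBsonObj(&builder, "");

    // KeyString-encode the object, then hex-encode the resulting bytes. The KeyString
    // type bits travel separately in the token's _typeBits field.
    KeyString encoded(KeyString::Version::V1, builder.obj(), Ordering::make(BSONObj()));
    return toHex(encoded.getBuffer(), encoded.getSize());
}
}  // namespace mongo
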
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js
index 64b7d0c698d..470b534aafa 100644
--- a/jstests/multiVersion/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js
@@ -3,12 +3,13 @@
(function() {
"use strict";
- load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
const rst = new ReplSetTest({
nodes: 2,
- nodeOpts: {binVersion: "latest"},
+ nodeOptions: {binVersion: "last-stable"},
});
if (!startSetIfSupportsReadMajority(rst)) {
@@ -19,66 +20,216 @@
rst.initiate();
- const testDB = rst.getPrimary().getDB(jsTestName());
- const adminDB = rst.getPrimary().getDB("admin");
-
- // Test both whole-db change streams and cluster-wide change streams.
- const testCases = [{db: testDB, spec: {}}, {db: adminDB, spec: {allChangesForCluster: true}}];
-
- for (let testCase of testCases) {
- const coll = testDB[jsTestName()];
- coll.drop();
-
- // Explicitly set feature compatibility version 4.0.
- let res = adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"});
- assert.commandWorked(res);
- const startAtTime = res.$clusterTime.clusterTime;
-
- // Open and test a change stream using 4.0 features.
- const cst = new ChangeStreamTest(testCase.db);
-
- const multiCollectionCursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: testCase.spec}], collection: 1});
- const startAtSpec =
- Object.assign({}, testCase.spec, {startAtClusterTime: {ts: startAtTime}});
- const startAtClusterTimeCursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: startAtSpec}], collection: 1});
-
- assert.writeOK(coll.insert({_id: 0}));
- let change = cst.getOneChange(multiCollectionCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 0);
-
- change = cst.getOneChange(startAtClusterTimeCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 0);
-
- // Set the feature compatibility version to 3.6.
- assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"}));
-
- // An already created change stream should continue to work.
- assert.writeOK(coll.insert({_id: 1}));
- change = cst.getOneChange(multiCollectionCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 1);
-
- change = cst.getOneChange(startAtClusterTimeCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 1);
-
- // Creating a new change stream with a 4.0 feature should fail.
- assert.commandFailedWithCode(
- testCase.db.runCommand(
- {aggregate: 1, pipeline: [{$changeStream: testCase.spec}], cursor: {}}),
- ErrorCodes.QueryFeatureNotAllowed);
-
- assert.commandFailedWithCode(testDB.runCommand({
- aggregate: coll.getName(),
- pipeline: [{$changeStream: {startAtClusterTime: {ts: startAtTime}}}],
- cursor: {}
- }),
- ErrorCodes.QueryFeatureNotAllowed);
+ function assertResumeTokenUsesStringFormat(resumeToken) {
+ assert.neq(resumeToken._data, undefined);
+ assert.eq(typeof resumeToken._data, "string", tojson(resumeToken));
}
+ function assertResumeTokenUsesBinDataFormat(resumeToken) {
+ assert.neq(resumeToken._data, undefined);
+ assert(resumeToken._data instanceof BinData, tojson(resumeToken));
+ }
+
+ let testDB = rst.getPrimary().getDB(jsTestName());
+ let adminDB = rst.getPrimary().getDB("admin");
+ let coll = testDB[jsTestName()];
+
+ let cst = new ChangeStreamTest(testDB);
+ let adminCST = new ChangeStreamTest(adminDB);
+
+ // We can't open a change stream on a non-existent database on last-stable, so we insert a dummy
+ // document to create the database.
+ // TODO BACKPORT-34138 Remove this check once the change has been backported.
+ assert.writeOK(testDB.dummy.insert({_id: "dummy"}));
+
+ // Open a change stream against a 3.6 binary. We will use the resume token from this stream to
+ // resume the stream once the set has been upgraded.
+ let streamOnOldVersion =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: coll.getName()});
+ assert.writeOK(coll.insert({_id: 0}));
+
+ let change = cst.getOneChange(streamOnOldVersion);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 0);
+
+ // Extract the resume token and test that it is using the old resume token format using
+ // BinData.
+ const resumeTokenFromLastStable = change._id;
+ assertResumeTokenUsesBinDataFormat(resumeTokenFromLastStable);
+
+ // Test that new query features are banned on 3.6.
+ const failedResponse = assert.commandFailedWithCode(
+ testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}}),
+ ErrorCodes.InvalidNamespace);
+ assert.commandFailedWithCode(
+ adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}),
+ 40415);
+
+ assert.commandFailedWithCode(testDB.runCommand({
+ aggregate: coll.getName(),
+ pipeline:
+ [{$changeStream: {startAtClusterTime: {ts: failedResponse.$clusterTime.clusterTime}}}],
+ cursor: {}
+ }),
+ 40415);
+
+ // Upgrade the set to the new binary version, but keep the feature compatibility version at
+ // 3.6.
+ rst.upgradeSet({binVersion: "latest"});
+ testDB = rst.getPrimary().getDB(jsTestName());
+ adminDB = rst.getPrimary().getDB("admin");
+ coll = testDB[jsTestName()];
+ cst = new ChangeStreamTest(testDB);
+ adminCST = new ChangeStreamTest(adminDB);
+
+ // Test that we can resume the stream on the new binaries.
+ streamOnOldVersion = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromLastStable}}],
+ collection: coll.getName()
+ });
+
+ assert.writeOK(coll.insert({_id: 1}));
+
+ change = cst.getOneChange(streamOnOldVersion);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 1);
+
+ // Test that the stream is still using the old resume token format using BinData.
+ assertResumeTokenUsesBinDataFormat(change._id);
+
+ // Explicitly set feature compatibility version 4.0. Remember the cluster time from that
+ // response to use later.
+ const startTime =
+ assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"}))
+ .$clusterTime.clusterTime;
+
+ // Test that we can now use 4.0 features to open a stream.
+ const wholeDbCursor =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+ const wholeClusterCursor = adminCST.startWatchingChanges(
+ {pipeline: [{$changeStream: {allChangesForCluster: true}}], collection: 1});
+ const cursorStartedWithTime = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}],
+ collection: coll.getName()
+ });
+
+ assert.writeOK(coll.insert({_id: 2}));
+
+ // Test that the stream opened in FCV 3.6 continues to work and still generates tokens in the
+ // old format.
+ change = cst.getOneChange(streamOnOldVersion);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 2);
+ assertResumeTokenUsesBinDataFormat(change._id);
+
+ // Test all the newly created streams can see an insert.
+ change = cst.getOneChange(wholeDbCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 2);
+
+ change = adminCST.getOneChange(wholeClusterCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 2);
+
+ change = cst.getOneChange(cursorStartedWithTime);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 2);
+
+ // Test that the resume token is using the new format and has type string while FCV is 4.0.
+ const resumeToken = change._id;
+ assertResumeTokenUsesStringFormat(resumeToken);
+
+ // Test that we can resume with a resume token in either format, against the entire DB, the
+ // entire cluster, or the single collection.
+ assert.doesNotThrow(() => cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromLastStable}}],
+ collection: 1
+ }));
+ assert.doesNotThrow(() => adminCST.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {allChangesForCluster: true, resumeAfter: resumeTokenFromLastStable}}],
+ collection: 1
+ }));
+ assert.doesNotThrow(() => cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromLastStable}}],
+ collection: coll.getName()
+ }));
+ assert.doesNotThrow(
+ () => cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {resumeAfter: resumeToken}}], collection: 1}));
+ assert.doesNotThrow(() => adminCST.startWatchingChanges({
+ pipeline: [{$changeStream: {allChangesForCluster: true, resumeAfter: resumeToken}}],
+ collection: 1
+ }));
+ assert.doesNotThrow(
+ () => cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {resumeAfter: resumeToken}}], collection: coll.getName()}));
+
+ // Set the feature compatibility version to 3.6.
+ assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"}));
+
+ // Test that existing streams continue, but still generate resume tokens in the new format.
+ assert.writeOK(coll.insert({_id: 3}));
+ change = cst.getOneChange(wholeDbCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 3);
+ assertResumeTokenUsesStringFormat(change._id);
+
+ change = adminCST.getOneChange(wholeClusterCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 3);
+ assertResumeTokenUsesStringFormat(change._id);
+
+ change = cst.getOneChange(cursorStartedWithTime);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 3);
+ assertResumeTokenUsesStringFormat(change._id);
+
+ // Creating a new change stream with a 4.0 feature should fail.
+ assert.commandFailedWithCode(
+ testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}}),
+ ErrorCodes.QueryFeatureNotAllowed);
+ assert.commandFailedWithCode(
+ adminDB.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}),
+ ErrorCodes.QueryFeatureNotAllowed);
+
+ assert.commandFailedWithCode(testDB.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}],
+ cursor: {}
+ }),
+ ErrorCodes.QueryFeatureNotAllowed);
+
+ // Test that resuming a change stream opened on FCV 4.0 continues to work, though a
+ // whole-db or whole-cluster cursor cannot be resumed.
+ assert.commandFailedWithCode(
+ testDB.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}),
+ ErrorCodes.QueryFeatureNotAllowed);
+ assert.commandFailedWithCode(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {allChangesForCluster: true, resumeAfter: resumeToken}}],
+ cursor: {}
+ }),
+ ErrorCodes.QueryFeatureNotAllowed);
+ let resumedOnFCV36With40BinaryResumeToken;
+ assert.doesNotThrow(() => {
+ resumedOnFCV36With40BinaryResumeToken = coll.watch([], {resumeAfter: resumeToken});
+ });
+ assert.soon(() => resumedOnFCV36With40BinaryResumeToken.hasNext());
+ change = resumedOnFCV36With40BinaryResumeToken.next();
+ assertResumeTokenUsesBinDataFormat(change._id);
+
+ // Test that resuming a change stream with the original resume token still works.
+ let resumedWith36Token;
+ assert.doesNotThrow(() => {
+ resumedWith36Token = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
+ });
+ assert.soon(() => resumedWith36Token.hasNext());
+ change = resumedWith36Token.next();
+ assertResumeTokenUsesBinDataFormat(change._id);
+
rst.stopSet();
}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 6fe2428c088..70b69b14cda 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -371,6 +371,7 @@ namespace {
*/
void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv,
intrusive_ptr<DocumentSource>* resumeStageOut,
boost::optional<Timestamp>* startFromOut) {
if (!expCtx->inMongos) {
@@ -419,8 +420,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
!(*resumeStageOut) || (!resumeAfterClusterTime && !startAtClusterTime));
if (resumeAfterClusterTime) {
- if (serverGlobalParams.featureCompatibility.getVersion() >=
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) {
+ if (fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) {
warning() << "The '$_resumeAfterClusterTime' option is deprecated, please use "
"'startAtClusterTime' instead.";
}
@@ -434,8 +434,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
"feature compatibility version. See "
<< feature_compatibility_version_documentation::kCompatibilityLink
<< " for more information.",
- serverGlobalParams.featureCompatibility.getVersion() >=
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40);
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40);
uassert(50573,
str::stream()
<< "Do not specify both "
@@ -466,12 +465,13 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
elem.embeddedObject());
+ const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
// Make sure that it is legal to run this $changeStream before proceeding.
- DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec);
+ DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec, fcv);
boost::optional<Timestamp> startFrom;
intrusive_ptr<DocumentSource> resumeStage = nullptr;
- parseResumeOptions(expCtx, spec, &resumeStage, &startFrom);
+ parseResumeOptions(expCtx, spec, fcv, &resumeStage, &startFrom);
auto fullDocOption = spec.getFullDocument();
uassert(40575,
@@ -495,7 +495,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
buildMatchFilter(expCtx, *startFrom, startFromInclusive), expCtx));
}
- stages.push_back(createTransformationStage(elem.embeddedObject(), expCtx));
+ stages.push_back(createTransformationStage(expCtx, elem.embeddedObject(), fcv));
if (resumeStage) {
stages.push_back(resumeStage);
}
@@ -536,7 +536,9 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or
}
void DocumentSourceChangeStream::assertIsLegalSpecification(
- const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) {
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv) {
// Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in
// test mode.
// TODO SERVER-34283: remove once whole-database $changeStream is feature-complete.
@@ -552,8 +554,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
<< feature_compatibility_version_documentation::kCompatibilityLink
<< " for more information.",
!expCtx->ns.isCollectionlessAggregateNS() ||
- serverGlobalParams.featureCompatibility.getVersion() >=
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40);
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40);
// If 'allChangesForCluster' is true, the stream must be opened on the 'admin' database with
// {aggregate: 1}.
@@ -580,11 +581,13 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
}
intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage(
- BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ BSONObj changeStreamSpec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv) {
// Mark the transformation stage as independent of any collection if the change stream is
// watching all collections in the database.
const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS();
return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform(
- expCtx, changeStreamSpec, isIndependentOfAnyCollection));
+ expCtx, changeStreamSpec, fcv, isIndependentOfAnyCollection));
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index bdd384da4b9..ca4b579b165 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -33,6 +33,7 @@
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_sources_gen.h"
#include "mongo/db/pipeline/field_path.h"
+#include "mongo/db/pipeline/resume_token.h"
namespace mongo {
@@ -161,7 +162,9 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
static boost::intrusive_ptr<DocumentSource> createTransformationStage(
- BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ BSONObj changeStreamSpec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv);
/**
* Given a BSON object containing an aggregation command with a $changeStream stage, and a
@@ -185,7 +188,8 @@ private:
// For instance, whether it is permitted to run given the current FCV, whether the namespace is
// valid for the options specified in the spec, etc.
static void assertIsLegalSpecification(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceChangeStreamSpec& spec);
+ const DocumentSourceChangeStreamSpec& spec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv);
// It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
// instead.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 45f2c3373d4..cdbbf4d8b86 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -184,7 +184,7 @@ public:
tokenData.documentKey = docKey;
if (!uuid.missing())
tokenData.uuid = uuid.getUuid();
- return ResumeToken(tokenData).toDocument();
+ return ResumeToken(tokenData).toDocument(ResumeToken::SerializationFormat::kHexString);
}
/**
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 16301b7fcd5..50620f898bd 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -86,9 +86,14 @@ bool isOpTypeRelevant(const Document& d) {
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONObj changeStreamSpec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv,
bool isIndependentOfAnyCollection)
: DocumentSource(expCtx),
_changeStreamSpec(changeStreamSpec.getOwned()),
+ _resumeTokenFormat(
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40
+ ? ResumeToken::SerializationFormat::kHexString
+ : ResumeToken::SerializationFormat::kBinData),
_isIndependentOfAnyCollection(isIndependentOfAnyCollection) {
if (expCtx->ns.isCollectionlessAggregateNS()) {
@@ -296,7 +301,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
}
doc.addField(DocumentSourceChangeStream::kIdField,
- Value(ResumeToken(resumeTokenData).toDocument()));
+ Value(ResumeToken(resumeTokenData).toDocument(_resumeTokenFormat)));
doc.addField(DocumentSourceChangeStream::kOperationTypeField, Value(operationType));
doc.addField(DocumentSourceChangeStream::kClusterTimeField, Value(resumeTokenData.clusterTime));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 61e6bfb25eb..5ae0ea3fc6f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -40,6 +40,7 @@ class DocumentSourceChangeStreamTransform : public DocumentSource {
public:
DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONObj changeStreamSpec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv,
bool isIndependentOfAnyCollection);
Document applyTransformation(const Document& input);
DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
@@ -111,6 +112,8 @@ private:
BSONObj _changeStreamSpec;
+ ResumeToken::SerializationFormat _resumeTokenFormat;
+
// Map of collection UUID to document key fields.
std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index a81d3083c8f..6b5e386051b 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -181,7 +181,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext()
uassert(40585,
str::stream()
<< "resume of change stream was not possible, as the resume token was not found. "
- << ResumeToken::parse(documentFromResumedStream["_id"].getDocument()).toBSON(),
+ << documentFromResumedStream["_id"].getDocument().toString(),
_resumeStatus != ResumeStatus::kCannotResume);
// If we reach this point, then we've seen the resume token.
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 530d6b13129..c216c81b52d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -66,7 +66,9 @@ protected:
*/
void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
_mock->queue.push_back(
- Document{{"_id", ResumeToken(ResumeTokenData(ts, Value(docKey), uuid)).toDocument()}});
+ Document{{"_id",
+ ResumeToken(ResumeTokenData(ts, Value(docKey), uuid))
+ .toDocument(ResumeToken::SerializationFormat::kHexString)}});
}
/**
* Pushes a document with a resume token corresponding to the given timestamp, _id string, and
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index b90ca65f594..15d236fd409 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -68,10 +68,10 @@ public:
if (id.missing()) {
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
- return ResumeToken(tokenData).toDocument();
+ return ResumeToken(tokenData).toDocument(ResumeToken::SerializationFormat::kHexString);
}
return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid()))
- .toDocument();
+ .toDocument(ResumeToken::SerializationFormat::kHexString);
}
};
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 32efd87cdda..3c27f9a90ab 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -64,6 +64,7 @@ const auto kExplain = ExplainOptions::Verbosity::kQueryPlanner;
class ReplDocumentSourceLookUpTest : public DocumentSourceLookUpTest {
public:
void setUp() override {
+ Test::setUp(); // Will establish a feature compatibility version.
auto service = getExpCtx()->opCtx->getServiceContext();
repl::ReplSettings settings;
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
index df287295d2c..90a74777e70 100644
--- a/src/mongo/db/pipeline/document_sources.idl
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -42,7 +42,11 @@ types:
bson_serialization_type: object
description: An object representing a resume token for a change stream
cpp_type: ResumeToken
- serializer: ResumeToken::toBSON
+ # The IDL requires a serializer for any custom type. The serializer for this type actually
+ # needs to know which format to use though, so requires an argument which is not currently
+ # supported by the IDL. We don't expect anyone to use the IDL to serialize the type, so we
+ # just provide a dummy serializer here.
+ serializer: ResumeToken::toBSON_do_not_use
deserializer: ResumeToken::parse
# The document key element in a resume token can be any BSON element, so we need a custom type
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index 2618a70186d..fd1e8cd3ce7 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -26,20 +26,59 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/db/pipeline/resume_token.h"
#include <boost/optional/optional_io.hpp>
+#include <limits>
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/document_sources_gen.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/storage/key_string.h"
+#include "mongo/util/hex.h"
namespace mongo {
constexpr StringData ResumeToken::kDataFieldName;
constexpr StringData ResumeToken::kTypeBitsFieldName;
+namespace {
+
+/**
+ * Returns a pair of values representing the key-string encoded data and the type bits respectively.
+ * Both are of type BinData, but if the type bits of the key string are all zeros then the second
+ * Value will be the missing value.
+ */
+std::pair<Value, Value> encodeInBinDataFormat(const ResumeTokenData& data) {
+ // In the legacy format we serialize clusterTime, then documentKey, then UUID.
+ BSONObjBuilder builder;
+ builder.append("", data.clusterTime);
+ data.documentKey.addToBsonObj(&builder, "");
+ if (data.uuid) {
+ if (data.documentKey.missing()) {
+ // Never allow a missing document key with a UUID present, as that will mess up
+ // the field order.
+ builder.appendNull("");
+ }
+ data.uuid->appendToBuilder(&builder, "");
+ }
+ auto keyObj = builder.obj();
+
+ // After writing all the pieces to an object, keystring-encode that object into binary.
+ KeyString encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj()));
+ const auto& typeBits = encodedToken.getTypeBits();
+
+ auto rawBinary =
+ BSONBinData(encodedToken.getBuffer(), encodedToken.getSize(), BinDataType::BinDataGeneral);
+ auto typeBitsValue = typeBits.isAllZeros()
+ ? Value()
+ : Value(BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral));
+ return {Value(rawBinary), typeBitsValue};
+}
+} // namespace
+
bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
return clusterTime == other.clusterTime &&
(Value::compare(this->documentKey, other.documentKey, nullptr) == 0) && uuid == other.uuid;
@@ -56,38 +95,47 @@ ResumeToken::ResumeToken(const Document& resumeDoc) {
uassert(40647,
str::stream() << "Bad resume token: _data of missing or of wrong type"
<< resumeDoc.toString(),
- _keyStringData.getType() == BinData &&
- _keyStringData.getBinData().type == BinDataGeneral);
+ (_keyStringData.getType() == BSONType::BinData &&
+ _keyStringData.getBinData().type == BinDataGeneral) ||
+ _keyStringData.getType() == BSONType::String);
uassert(40648,
str::stream() << "Bad resume token: _typeBits of wrong type" << resumeDoc.toString(),
- _typeBits.missing() ||
- (_typeBits.getType() == BinData && _typeBits.getBinData().type == BinDataGeneral));
+ _typeBits.missing() || (_typeBits.getType() == BSONType::BinData &&
+ _typeBits.getBinData().type == BinDataGeneral));
}
-// We encode the resume token as a KeyString with the sequence: clusterTime, documentKey, uuid.
+// We encode the resume token as a KeyString with the sequence: clusterTime, uuid, documentKey.
// Only the clusterTime is required.
ResumeToken::ResumeToken(const ResumeTokenData& data) {
BSONObjBuilder builder;
builder.append("", data.clusterTime);
- data.documentKey.addToBsonObj(&builder, "");
+ uassert(50788,
+ "Unexpected resume token with a documentKey but no UUID",
+ data.uuid || data.documentKey.missing());
+
if (data.uuid) {
- if (data.documentKey.missing()) {
- // Never allow a missing document key with a UUID present, as that will mess up
- // the field order.
- builder.appendNull("");
- }
data.uuid->appendToBuilder(&builder, "");
}
+ data.documentKey.addToBsonObj(&builder, "");
auto keyObj = builder.obj();
KeyString encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj()));
- _keyStringData = Value(
- BSONBinData(encodedToken.getBuffer(), encodedToken.getSize(), BinDataType::BinDataGeneral));
+ _keyStringData = Value(toHex(encodedToken.getBuffer(), encodedToken.getSize()));
const auto& typeBits = encodedToken.getTypeBits();
if (!typeBits.isAllZeros())
_typeBits = Value(
BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral));
}
+bool ResumeToken::operator==(const ResumeToken& other) const {
+ // '_keyStringData' is enough to determine equality. The type bits are used to unambiguously
+ // re-construct the original data, but we do not expect any two resume tokens to have the same
+ // data and different type bits, since that would imply they have (1) the same timestamp and (2)
+ // the same documentKey (possibly different types). This should not be possible because
+ // documents with the same documentKey should be on the same shard and therefore should have
+ // different timestamps.
+ return ValueComparator::kInstance.evaluate(_keyStringData == other._keyStringData);
+}
+
ResumeTokenData ResumeToken::getData() const {
KeyString::TypeBits typeBits(KeyString::Version::V1);
if (!_typeBits.missing()) {
@@ -95,7 +143,29 @@ ResumeTokenData ResumeToken::getData() const {
BufReader typeBitsReader(typeBitsBinData.data, typeBitsBinData.length);
typeBits.resetFromBuffer(&typeBitsReader);
}
- BSONBinData keyStringBinData = _keyStringData.getBinData();
+
+ // Accept either serialization format.
+ BufBuilder hexDecodeBuf; // Keep this in scope until we've decoded the bytes.
+ BSONBinData keyStringBinData{nullptr, 0, BinDataType::BinDataGeneral};
+ boost::optional<std::string> decodedString;
+ switch (_keyStringData.getType()) {
+ case BSONType::BinData: {
+ keyStringBinData = _keyStringData.getBinData();
+ break;
+ }
+ case BSONType::String: {
+ uassert(ErrorCodes::FailedToParse,
+ "resume token string was not a valid hex string",
+ isValidHex(_keyStringData.getStringData()));
+ fromHexString(_keyStringData.getStringData(), &hexDecodeBuf);
+ keyStringBinData = BSONBinData(
+ hexDecodeBuf.buf(), hexDecodeBuf.getSize(), BinDataType::BinDataGeneral);
+ break;
+ }
+ default:
+ // We validate the type at parse time.
+ MONGO_UNREACHABLE;
+ }
auto internalBson = KeyString::toBson(static_cast<const char*>(keyStringBinData.data),
keyStringBinData.length,
Ordering::make(BSONObj()),
@@ -105,47 +175,61 @@ ResumeTokenData ResumeToken::getData() const {
ResumeTokenData result;
uassert(40649, "invalid empty resume token", i.more());
result.clusterTime = i.next().timestamp();
- if (i.more())
- result.documentKey = Value(i.next());
- if (i.more())
- result.uuid = uassertStatusOK(UUID::parse(i.next()));
+ if (!i.more()) {
+ // There was nothing other than the timestamp.
+ return result;
+ }
+ switch (_keyStringData.getType()) {
+ case BSONType::BinData: {
+ // In the old format, the documentKey came first, then the UUID.
+ result.documentKey = Value(i.next());
+ if (i.more()) {
+ result.uuid = uassertStatusOK(UUID::parse(i.next()));
+ }
+ break;
+ }
+ case BSONType::String: {
+ // In the new format, the UUID comes first, then the documentKey.
+ result.uuid = uassertStatusOK(UUID::parse(i.next()));
+ if (i.more()) {
+ result.documentKey = Value(i.next());
+ }
+ break;
+ }
+ default: { MONGO_UNREACHABLE }
+ }
uassert(40646, "invalid oversized resume token", !i.more());
return result;
}
-int ResumeToken::compare(const ResumeToken& other) const {
- BSONBinData thisData = _keyStringData.getBinData();
- BSONBinData otherData = other._keyStringData.getBinData();
- return StringData(static_cast<const char*>(thisData.data), thisData.length)
- .compare(StringData(static_cast<const char*>(otherData.data), otherData.length));
-}
-
-bool ResumeToken::operator==(const ResumeToken& other) const {
- return compare(other) == 0;
-}
-
-bool ResumeToken::operator!=(const ResumeToken& other) const {
- return compare(other) != 0;
-}
-
-bool ResumeToken::operator<(const ResumeToken& other) const {
- return compare(other) < 0;
-}
-
-bool ResumeToken::operator<=(const ResumeToken& other) const {
- return compare(other) <= 0;
-}
-
-bool ResumeToken::operator>(const ResumeToken& other) const {
- return compare(other) > 0;
-}
-
-bool ResumeToken::operator>=(const ResumeToken& other) const {
- return compare(other) >= 0;
-}
+Document ResumeToken::toDocument(SerializationFormat format) const {
+ // In most cases we expect to be serializing in the same format we were given.
+ const auto dataType = _keyStringData.getType();
+ if ((dataType == BSONType::BinData && format == SerializationFormat::kBinData) ||
+ (dataType == BSONType::String && format == SerializationFormat::kHexString)) {
+ return Document{{kDataFieldName, _keyStringData}, {kTypeBitsFieldName, _typeBits}};
+ }
-Document ResumeToken::toDocument() const {
- return Document{{kDataFieldName, _keyStringData}, {kTypeBitsFieldName, _typeBits}};
+ // If we have to switch formats, then decompose the resume token into its pieces and
+ // re-construct a resume token in the new format.
+ auto data = getData();
+
+ switch (format) {
+ case SerializationFormat::kBinData: {
+ // Going from the three pieces of data into BinData requires special logic, since
+ // re-constructing a ResumeToken from 'data' will generate the new format.
+ Value rawBinary, typeBits;
+ std::tie(rawBinary, typeBits) = encodeInBinDataFormat(data);
+ return Document{{kDataFieldName, rawBinary}, {kTypeBitsFieldName, typeBits}};
+ }
+ case SerializationFormat::kHexString: {
+ // Constructing a new ResumeToken from the three pieces of data will generate a
+ // hex-encoded KeyString as the token.
+ const ResumeToken newResumeToken(data);
+ return newResumeToken.toDocument(format);
+ }
+ default: { MONGO_UNREACHABLE; }
+ }
}
ResumeToken ResumeToken::parse(const Document& resumeDoc) {
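Editor's note: a hedged sketch of how the two formats now interoperate through the public ResumeToken API changed above. The function name is illustrative, not part of the patch; it mirrors the round-trip tests in resume_token_test.cpp further down.

// Sketch only: round-trips a token through both serialization formats.
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/util/assert_util.h"

namespace mongo {
void roundTripResumeToken(const ResumeTokenData& data) {
    using Format = ResumeToken::SerializationFormat;
    const ResumeToken token(data);

    // Serialize in the new hex-string format and parse it back.
    Document hexDoc = token.toDocument(Format::kHexString);
    invariant(ResumeToken::parse(hexDoc).getData() == data);

    // The same token can still be emitted in the legacy BinData format (used while the
    // feature compatibility version is 3.6), and parsing accepts either representation.
    Document binDoc = token.toDocument(Format::kBinData);
    invariant(ResumeToken::parse(binDoc).getData() == data);
}
}  // namespace mongo
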
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index ba41270fbed..7d1bb3c432f 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -59,43 +59,64 @@ struct ResumeTokenData {
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);
/**
- * A token passed in by the user to indicate where in the oplog we should start for
- * $changeStream. This token has the following format:
- * {
- * _data: <binary data>,
- * _typeBits: <binary data>
- * }
- * The _data field data is encoded such that byte by byte comparisons provide the correct
- * ordering of tokens. The _typeBits field may be missing and should not affect token
- * comparison.
+ * A token passed in by the user to indicate where in the oplog we should start for $changeStream.
+ * This token has one of the following formats:
+ * 1. Using BinData:
+ * {
+ * _data: BinData - The keystring encoded resume token, in clusterTime, documentKey, UUID order.
+ * _typeBits: BinData - The keystring type bits used for deserialization.
+ * }
+ * 2. Using a hex-encoded string in a similar format:
+ * {
+ * _data: String, A hex encoding of the binary generated by keystring encoding the clusterTime,
+ * UUID, then documentKey in that order.
+ * _typeBits: BinData - The keystring type bits used for deserialization.
+ * }
+ * The _data field data is encoded such that string comparisons provide the correct ordering of
+ * tokens. Unlike the BinData, this can be sorted correctly using a MongoDB sort. BinData
+ * unfortunately orders by the length of the data first, then by the contents.
+ *
+ * In both cases, the _typeBits field may be missing and should not affect token comparison.
*/
-
class ResumeToken {
public:
+ enum class SerializationFormat {
+ kBinData,
+ kHexString,
+ };
+
+ constexpr static StringData kDataFieldName = "_data"_sd;
+ constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd;
+
+ /**
+ * Parse a resume token from a BSON object; used as an interface to the IDL parser.
+ */
+ static ResumeToken parse(const BSONObj& resumeBson) {
+ return ResumeToken::parse(Document(resumeBson));
+ }
+
+ static ResumeToken parse(const Document& document);
+
/**
* The default no-argument constructor is required by the IDL for types used as non-optional
* fields.
*/
ResumeToken() = default;
- explicit ResumeToken(const ResumeTokenData& resumeValue);
-
- bool operator==(const ResumeToken&) const;
- bool operator!=(const ResumeToken&) const;
- bool operator<(const ResumeToken&) const;
- bool operator<=(const ResumeToken&) const;
- bool operator>(const ResumeToken&) const;
- bool operator>=(const ResumeToken&) const;
-
- /** Three way comparison, returns 0 if *this is equal to other, < 0 if *this is less than
- * other, and > 0 if *this is greater than other.
+ /**
+ * Parses 'resumeValue' into a ResumeToken using the hex-encoded string format.
*/
- int compare(const ResumeToken& other) const;
+ explicit ResumeToken(const ResumeTokenData& resumeValue);
- Document toDocument() const;
+ Document toDocument(SerializationFormat) const;
- BSONObj toBSON() const {
- return toDocument().toBson();
+ /**
+ * Because we use the IDL we require a serializer. However, the serialization format depends on
+ * the feature compatibility version, so a serializer without an argument doesn't make sense.
+ * This should never be used.
+ */
+ BSONObj toBSON_do_not_use() const {
+ MONGO_UNREACHABLE;
}
ResumeTokenData getData() const;
@@ -104,26 +125,26 @@ public:
return getData().clusterTime;
}
- /**
- * Parse a resume token from a BSON object; used as an interface to the IDL parser.
- */
- static ResumeToken parse(const BSONObj& resumeBson) {
- return ResumeToken::parse(Document(resumeBson));
+ bool operator==(const ResumeToken&) const;
+ bool operator!=(const ResumeToken& other) const {
+ return !(*this == other);
}
- static ResumeToken parse(const Document& document);
-
friend std::ostream& operator<<(std::ostream& out, const ResumeToken& token) {
return out << token.getData();
}
- constexpr static StringData kDataFieldName = "_data"_sd;
- constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd;
-
private:
explicit ResumeToken(const Document& resumeData);
+ // This is either the BinData or the hex-encoded string encoding all the pieces of the
+ // resume token.
Value _keyStringData;
+
+ // Since we are using a KeyString encoding, we might lose some information about what the
+ // original types of the serialized values were. For example, the integer 2 and the double 2.0
+ // will generate the same KeyString. We keep the type bits around so we can deserialize without
+ // losing information.
Value _typeBits;
};
} // namespace mongo
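Editor's note: to make the ordering point in the class comment above concrete, here is a small sketch (not part of the patch) contrasting BinData ordering with hex-string ordering; it relies only on standard BSON comparison semantics, under which binary values compare by length before data.

// Sketch only: contrasts BinData ordering with hex-string ordering.
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/hex.h"

namespace mongo {
void compareTokenEncodings() {
    const char shortBytes[] = {0x7f};             // hex "7F"
    const char longBytes[] = {0x01, 0x02, 0x03};  // hex "010203"

    BSONObjBuilder shortBuilder, longBuilder;
    shortBuilder.appendBinData("_data", sizeof(shortBytes), BinDataGeneral, shortBytes);
    longBuilder.appendBinData("_data", sizeof(longBytes), BinDataGeneral, longBytes);

    // BinData: the one-byte payload sorts before the three-byte payload, even though its
    // first byte is larger, because BSON compares binary values by length first.
    invariant(shortBuilder.obj().woCompare(longBuilder.obj()) < 0);

    // Hex strings: plain lexicographic comparison orders by content ("010203" < "7F"),
    // which is the ordering a resume token needs.
    invariant(toHex(longBytes, sizeof(longBytes)) < toHex(shortBytes, sizeof(shortBytes)));
}
}  // namespace mongo
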
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
index 7aad15ccc4a..2fe8c0ac33c 100644
--- a/src/mongo/db/pipeline/resume_token_test.cpp
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -28,6 +28,7 @@
#include "mongo/db/pipeline/resume_token.h"
+#include <algorithm>
#include <boost/optional/optional_io.hpp>
#include "mongo/db/pipeline/document.h"
@@ -38,6 +39,8 @@ namespace mongo {
namespace {
+using Format = ResumeToken::SerializationFormat;
+
TEST(ResumeToken, EncodesFullTokenFromData) {
Timestamp ts(1000, 2);
UUID testUuid = UUID::gen();
@@ -59,46 +62,82 @@ TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) {
ASSERT_EQ(resumeTokenDataIn, tokenData);
}
-TEST(ResumeToken, RoundTripThroughBsonFullToken) {
+TEST(ResumeToken, ShouldRoundTripThroughHexStringEncoding) {
Timestamp ts(1000, 2);
UUID testUuid = UUID::gen();
Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
- auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON());
+
+ // Test serialization/parsing through Document.
+ auto rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString));
ResumeTokenData tokenData = rtToken.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
-}
-
-TEST(ResumeToken, RoundTripThroughBsonTimestampOnlyToken) {
- Timestamp ts(1001, 3);
- ResumeTokenData resumeTokenDataIn;
- resumeTokenDataIn.clusterTime = ts;
- auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON());
- ResumeTokenData tokenData = rtToken.getData();
+ // Test serialization/parsing through BSON.
+ rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson());
+ tokenData = rtToken.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
}
-TEST(ResumeToken, RoundTripThroughDocumentFullToken) {
+TEST(ResumeToken, ShouldRoundTripThroughBinDataEncoding) {
Timestamp ts(1000, 2);
UUID testUuid = UUID::gen();
Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
- auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument());
+
+ // Test serialization/parsing through Document.
+ auto rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData));
ResumeTokenData tokenData = rtToken.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
+
+ // Test serialization/parsing through BSON.
+ rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData).toBson());
+ tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
}
-TEST(ResumeToken, RoundTripThroughDocumentTimestampOnlyToken) {
+TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexStringEncoding) {
Timestamp ts(1001, 3);
ResumeTokenData resumeTokenDataIn;
resumeTokenDataIn.clusterTime = ts;
- auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument());
+
+ // Test serialization/parsing through Document.
+ auto rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString));
ResumeTokenData tokenData = rtToken.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
+
+ // Test serialization/parsing through BSON.
+ rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson());
+ tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
+TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughBinDataEncoding) {
+ Timestamp ts(1001, 3);
+
+ ResumeTokenData resumeTokenDataIn;
+ resumeTokenDataIn.clusterTime = ts;
+
+ // Test serialization/parsing through Document.
+ auto rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData));
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+
+ // Test serialization/parsing through BSON.
+ rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData).toBson());
+ tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
}
TEST(ResumeToken, TestMissingTypebitsOptimization) {
@@ -110,8 +149,8 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) {
ResumeToken hasTypeBitsToken(hasTypeBitsData);
ResumeToken noTypeBitsToken(noTypeBitsData);
ASSERT_EQ(noTypeBitsToken, hasTypeBitsToken);
- auto hasTypeBitsDoc = hasTypeBitsToken.toDocument();
- auto noTypeBitsDoc = noTypeBitsToken.toDocument();
+ auto hasTypeBitsDoc = hasTypeBitsToken.toDocument(Format::kHexString);
+ auto noTypeBitsDoc = noTypeBitsToken.toDocument(Format::kHexString);
ASSERT_FALSE(hasTypeBitsDoc["_typeBits"].missing());
ASSERT_TRUE(noTypeBitsDoc["_typeBits"].missing()) << noTypeBitsDoc["_typeBits"];
auto rtHasTypeBitsData = ResumeToken::parse(hasTypeBitsDoc).getData();
@@ -122,150 +161,6 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) {
ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.documentKey["_id"].getType());
}
-// Tests comparison functions for tokens constructed from oplog data.
-TEST(ResumeToken, CompareFromData) {
- Timestamp ts1(1000, 1);
- Timestamp ts2(1000, 2);
- Timestamp ts3(1001, 1);
- UUID testUuid = UUID::gen();
- UUID testUuid2 = UUID::gen();
- Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
- Document documentKey1b{{"_id"_sd, "stuff"_sd},
- {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}};
- Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}};
- Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}};
-
- ResumeToken token1a(ResumeTokenData(ts1, Value(documentKey1a), testUuid));
- ResumeToken token1b(ResumeTokenData(ts1, Value(documentKey1b), testUuid));
-
- // Equivalent types don't matter.
- ASSERT_EQ(token1a, token1b);
- ASSERT_LTE(token1a, token1b);
- ASSERT_GTE(token1a, token1b);
-
- // UUIDs matter, but all that really matters is they compare unequal.
- ResumeToken tokenOtherCollection(ResumeTokenData(ts1, Value(documentKey1a), testUuid2));
- ASSERT_NE(token1a, tokenOtherCollection);
-
- ResumeToken token2(ResumeTokenData(ts1, Value(documentKey2), testUuid));
-
- // Document keys matter.
- ASSERT_LT(token1a, token2);
- ASSERT_LTE(token1a, token2);
- ASSERT_GT(token2, token1a);
- ASSERT_GTE(token2, token1a);
-
- ResumeToken token3(ResumeTokenData(ts1, Value(documentKey3), testUuid));
-
- // Order within document keys matters.
- ASSERT_LT(token1a, token3);
- ASSERT_LTE(token1a, token3);
- ASSERT_GT(token3, token1a);
- ASSERT_GTE(token3, token1a);
-
- ASSERT_LT(token2, token3);
- ASSERT_LTE(token2, token3);
- ASSERT_GT(token3, token2);
- ASSERT_GTE(token3, token2);
-
- ResumeToken token4(ResumeTokenData(ts2, Value(documentKey1a), testUuid));
-
- // Timestamps matter.
- ASSERT_LT(token1a, token4);
- ASSERT_LTE(token1a, token4);
- ASSERT_GT(token4, token1a);
- ASSERT_GTE(token4, token1a);
-
- // Timestamps matter more than document key.
- ASSERT_LT(token3, token4);
- ASSERT_LTE(token3, token4);
- ASSERT_GT(token4, token3);
- ASSERT_GTE(token4, token3);
-
- ResumeToken token5(ResumeTokenData(ts3, Value(documentKey1a), testUuid));
-
- // Time matters more than increment in timestamp
- ASSERT_LT(token4, token5);
- ASSERT_LTE(token4, token5);
- ASSERT_GT(token5, token4);
- ASSERT_GTE(token5, token4);
-}
-
-// Tests comparison functions for tokens constructed from the keystring-encoded form.
-TEST(ResumeToken, CompareFromEncodedData) {
- Timestamp ts1(1000, 1);
- Timestamp ts2(1000, 2);
- Timestamp ts3(1001, 1);
- UUID testUuid = UUID::gen();
- UUID testUuid2 = UUID::gen();
- Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
- Document documentKey1b{{"_id"_sd, "stuff"_sd},
- {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}};
- Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}};
- Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}};
-
- auto token1a = ResumeToken::parse(
- ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid)).toDocument());
- auto token1b = ResumeToken::parse(
- ResumeToken(ResumeTokenData(ts1, Value(documentKey1b), testUuid)).toDocument());
-
- // Equivalent types don't matter.
- ASSERT_EQ(token1a, token1b);
- ASSERT_LTE(token1a, token1b);
- ASSERT_GTE(token1a, token1b);
-
- // UUIDs matter, but all that really matters is they compare unequal.
- auto tokenOtherCollection = ResumeToken::parse(
- ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid2)).toDocument());
- ASSERT_NE(token1a, tokenOtherCollection);
-
- auto token2 = ResumeToken::parse(
- ResumeToken(ResumeTokenData(ts1, Value(documentKey2), testUuid)).toDocument());
-
- // Document keys matter.
- ASSERT_LT(token1a, token2);
- ASSERT_LTE(token1a, token2);
- ASSERT_GT(token2, token1a);
- ASSERT_GTE(token2, token1a);
-
- auto token3 = ResumeToken::parse(
- ResumeToken(ResumeTokenData(ts1, Value(documentKey3), testUuid)).toDocument());
-
- // Order within document keys matters.
- ASSERT_LT(token1a, token3);
- ASSERT_LTE(token1a, token3);
- ASSERT_GT(token3, token1a);
- ASSERT_GTE(token3, token1a);
-
- ASSERT_LT(token2, token3);
- ASSERT_LTE(token2, token3);
- ASSERT_GT(token3, token2);
- ASSERT_GTE(token3, token2);
-
- auto token4 = ResumeToken::parse(
- ResumeToken(ResumeTokenData(ts2, Value(documentKey1a), testUuid)).toDocument());
-
- // Timestamps matter.
- ASSERT_LT(token1a, token4);
- ASSERT_LTE(token1a, token4);
- ASSERT_GT(token4, token1a);
- ASSERT_GTE(token4, token1a);
-
- // Timestamps matter more than document key.
- ASSERT_LT(token3, token4);
- ASSERT_LTE(token3, token4);
- ASSERT_GT(token4, token3);
- ASSERT_GTE(token4, token3);
-
- auto token5 = ResumeToken::parse(
- ResumeToken(ResumeTokenData(ts3, Value(documentKey1a), testUuid)).toDocument());
-
- // Time matters more than increment in timestamp
- ASSERT_LT(token4, token5);
- ASSERT_LTE(token4, token5);
- ASSERT_GT(token5, token4);
- ASSERT_GTE(token5, token4);
-}
TEST(ResumeToken, CorruptTokens) {
// Missing document.
@@ -273,17 +168,22 @@ TEST(ResumeToken, CorruptTokens) {
// Missing data field.
ASSERT_THROWS(ResumeToken::parse(Document{{"somefield"_sd, "stuff"_sd}}), AssertionException);
// Wrong type data field
- ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, "string"_sd}}), AssertionException);
+ ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, BSONNULL}}), AssertionException);
ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, 0}}), AssertionException);
// Valid data field, but wrong type typeBits.
Timestamp ts(1010, 4);
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
- auto goodTokenDoc = ResumeToken(tokenData).toDocument();
- auto goodData = goodTokenDoc["_data"].getBinData();
+ auto goodTokenDocBinData = ResumeToken(tokenData).toDocument(Format::kBinData);
+ auto goodData = goodTokenDocBinData["_data"].getBinData();
ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}),
AssertionException);
+ auto goodTokenDocString = ResumeToken(tokenData).toDocument(Format::kHexString);
+ auto goodString = goodTokenDocString["_data"].getString();
+ ASSERT_THROWS(
+ ResumeToken::parse(Document{{"_data"_sd, goodString}, {"_typeBits", "string"_sd}}),
+ AssertionException);
// Valid data except wrong bindata type.
ASSERT_THROWS(ResumeToken::parse(
@@ -301,16 +201,22 @@ TEST(ResumeToken, CorruptTokens) {
auto emptyToken =
ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 0, BinDataGeneral)}});
ASSERT_THROWS(emptyToken.getData(), AssertionException);
+ emptyToken = ResumeToken::parse(Document{{"_data"_sd, "string"_sd}});
+ ASSERT_THROWS(emptyToken.getData(), AssertionException);
// Data of correct type with a bunch of zeros.
auto zeroesToken =
ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 5, BinDataGeneral)}});
ASSERT_THROWS(emptyToken.getData(), AssertionException);
+ zeroesToken = ResumeToken::parse(Document{{"_data"_sd, "00000"_sd}});
+ ASSERT_THROWS(zeroesToken.getData(), AssertionException);
- // Data of correct type with a bunch of nonsense
+ // Data of correct type with a bunch of nonsense.
auto nonsenseToken =
ResumeToken::parse(Document{{"_data"_sd, BSONBinData(nonsense, 5, BinDataGeneral)}});
ASSERT_THROWS(emptyToken.getData(), AssertionException);
+ nonsenseToken = ResumeToken::parse(Document{{"_data"_sd, "nonsense"_sd}});
+ ASSERT_THROWS(nonsenseToken.getData(), AssertionException);
// Valid data, bad typeBits; note that an all-zeros typebits is valid so it is not tested here.
auto badTypeBitsToken = ResumeToken::parse(
@@ -318,5 +224,68 @@ TEST(ResumeToken, CorruptTokens) {
ASSERT_THROWS(badTypeBitsToken.getData(), AssertionException);
}
+TEST(ResumeToken, StringEncodingSortsCorrectly) {
+ // Make sure that the string encoding of the resume tokens will compare in the correct order,
+ // namely timestamp, uuid, then documentKey.
+ Timestamp ts2_2(2, 2);
+ Timestamp ts10_4(10, 4);
+ Timestamp ts10_5(10, 5);
+ Timestamp ts11_3(11, 3);
+
+ // Generate two different UUIDs, and figure out which one is smaller. Store the smaller one in
+ // 'lower_uuid'.
+ UUID lower_uuid = UUID::gen();
+ UUID higher_uuid = UUID::gen();
+ if (lower_uuid > higher_uuid) {
+ std::swap(lower_uuid, higher_uuid);
+ }
+
+ auto assertLt = [](const ResumeTokenData& lower, const ResumeTokenData& higher) {
+ auto lowerString = ResumeToken(lower).toDocument(Format::kHexString)["_data"].getString();
+ auto higherString = ResumeToken(higher).toDocument(Format::kHexString)["_data"].getString();
+ ASSERT_LT(lowerString, higherString);
+ };
+
+ // Test using only Timestamps.
+ assertLt({ts2_2, Value(), boost::none}, {ts10_4, Value(), boost::none});
+ assertLt({ts2_2, Value(), boost::none}, {ts10_5, Value(), boost::none});
+ assertLt({ts2_2, Value(), boost::none}, {ts11_3, Value(), boost::none});
+ assertLt({ts10_4, Value(), boost::none}, {ts10_5, Value(), boost::none});
+ assertLt({ts10_4, Value(), boost::none}, {ts11_3, Value(), boost::none});
+ assertLt({ts10_5, Value(), boost::none}, {ts11_3, Value(), boost::none});
+
+ // Test that the Timestamp is more important than the UUID and documentKey.
+ assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 0}}), higher_uuid},
+ {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_5, Value(Document{{"_id", 0}}), higher_uuid});
+
+ // Test that when the Timestamp is the same, the UUID breaks the tie.
+ assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts2_2, Value(Document{{"_id", 0}}), higher_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_4, Value(Document{{"_id", 0}}), higher_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_4, Value(Document{{"_id", 0}}), higher_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_4, Value(Document{{"_id", 2}}), higher_uuid});
+
+ // Test that when the Timestamp and the UUID are the same, the documentKey breaks the tie.
+ assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts2_2, Value(Document{{"_id", 1}}), lower_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_4, Value(Document{{"_id", 1}}), lower_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_4, Value(Document{{"_id", "string"_sd}}), lower_uuid});
+ assertLt({ts10_4, Value(Document{{"_id", BSONNULL}}), lower_uuid},
+ {ts10_4, Value(Document{{"_id", 0}}), lower_uuid});
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/util/hex.h b/src/mongo/util/hex.h
index 802d6b13737..382eb0bd86c 100644
--- a/src/mongo/util/hex.h
+++ b/src/mongo/util/hex.h
@@ -29,6 +29,8 @@
#pragma once
+#include <algorithm>
+#include <cctype>
#include <string>
#include "mongo/base/string_data.h"
@@ -53,6 +55,27 @@ inline char fromHex(StringData c) {
return (char)((fromHex(c[0]) << 4) | fromHex(c[1]));
}
+/**
+ * Decodes 'hexString' into raw bytes, appended to the out parameter 'buf'. Callers must first
+ * ensure that 'hexString' is a valid hex encoding.
+ */
+inline void fromHexString(StringData hexString, BufBuilder* buf) {
+ invariant(hexString.size() % 2 == 0);
+ // Combine every pair of two characters into one byte.
+ for (std::size_t i = 0; i < hexString.size(); i += 2) {
+ buf->appendChar(fromHex(StringData(&hexString.rawData()[i], 2)));
+ }
+}
+
+/**
+ * Returns true if 'hexString' is a valid hexadecimal encoding.
+ */
+inline bool isValidHex(StringData hexString) {
+ // There must be an even number of characters, since each pair encodes a single byte.
+ return hexString.size() % 2 == 0 &&
+ std::all_of(hexString.begin(), hexString.end(), [](char c) { return std::isxdigit(c); });
+}
+
inline std::string toHex(const void* inRaw, int len) {
static const char hexchars[] = "0123456789ABCDEF";
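
Editor's note: a brief usage sketch (not part of the patch) for the helpers added here, exercising them the same way the new resume-token code above does: validate first, then decode into a BufBuilder. The function name is illustrative only.

// Sketch only: round-trips a few bytes through toHex / isValidHex / fromHexString.
#include <cstring>

#include "mongo/bson/util/builder.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/hex.h"

namespace mongo {
void hexRoundTrip() {
    const char bytes[] = {0x00, 0x1f, 0x2e};
    const std::string encoded = toHex(bytes, sizeof(bytes));  // "001F2E"

    invariant(isValidHex(encoded));
    invariant(!isValidHex("nonsense"));  // not hexadecimal digits
    invariant(!isValidHex("00000"));     // odd length cannot encode whole bytes

    // Callers must validate before decoding, as the comment on fromHexString requires.
    BufBuilder decoded;
    fromHexString(encoded, &decoded);
    invariant(decoded.len() == static_cast<int>(sizeof(bytes)));
    invariant(std::memcmp(decoded.buf(), bytes, sizeof(bytes)) == 0);
}
}  // namespace mongo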