diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-09 14:12:05 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-13 16:18:35 -0400 |
commit | a820491e9402a52d7575157a9897306d49129370 (patch) | |
tree | 61ed25fdec3912a8fc1407bcb52380b110b697fa | |
parent | 4b894b4a55467c38bb7910317af00793b493de37 (diff) | |
download | mongo-a820491e9402a52d7575157a9897306d49129370.tar.gz |
SERVER-34313 Use hex-encoded string for resume token
15 files changed, 602 insertions, 332 deletions
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js index 64b7d0c698d..470b534aafa 100644 --- a/jstests/multiVersion/change_streams_feature_compatibility_version.js +++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js @@ -3,12 +3,13 @@ (function() { "use strict"; - load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. - load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. const rst = new ReplSetTest({ nodes: 2, - nodeOpts: {binVersion: "latest"}, + nodeOptions: {binVersion: "last-stable"}, }); if (!startSetIfSupportsReadMajority(rst)) { @@ -19,66 +20,216 @@ rst.initiate(); - const testDB = rst.getPrimary().getDB(jsTestName()); - const adminDB = rst.getPrimary().getDB("admin"); - - // Test both whole-db change streams and cluster-wide change streams. - const testCases = [{db: testDB, spec: {}}, {db: adminDB, spec: {allChangesForCluster: true}}]; - - for (let testCase of testCases) { - const coll = testDB[jsTestName()]; - coll.drop(); - - // Explicitly set feature compatibility version 4.0. - let res = adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"}); - assert.commandWorked(res); - const startAtTime = res.$clusterTime.clusterTime; - - // Open and test a change stream using 4.0 features. - const cst = new ChangeStreamTest(testCase.db); - - const multiCollectionCursor = - cst.startWatchingChanges({pipeline: [{$changeStream: testCase.spec}], collection: 1}); - const startAtSpec = - Object.assign({}, testCase.spec, {startAtClusterTime: {ts: startAtTime}}); - const startAtClusterTimeCursor = - cst.startWatchingChanges({pipeline: [{$changeStream: startAtSpec}], collection: 1}); - - assert.writeOK(coll.insert({_id: 0})); - let change = cst.getOneChange(multiCollectionCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 0); - - change = cst.getOneChange(startAtClusterTimeCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 0); - - // Set the feature compatibility version to 3.6. - assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"})); - - // An already created change stream should continue to work. - assert.writeOK(coll.insert({_id: 1})); - change = cst.getOneChange(multiCollectionCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 1); - - change = cst.getOneChange(startAtClusterTimeCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 1); - - // Creating a new change stream with a 4.0 feature should fail. - assert.commandFailedWithCode( - testCase.db.runCommand( - {aggregate: 1, pipeline: [{$changeStream: testCase.spec}], cursor: {}}), - ErrorCodes.QueryFeatureNotAllowed); - - assert.commandFailedWithCode(testDB.runCommand({ - aggregate: coll.getName(), - pipeline: [{$changeStream: {startAtClusterTime: {ts: startAtTime}}}], - cursor: {} - }), - ErrorCodes.QueryFeatureNotAllowed); + function assertResumeTokenUsesStringFormat(resumeToken) { + assert.neq(resumeToken._data, undefined); + assert.eq(typeof resumeToken._data, "string", tojson(resumeToken)); } + function assertResumeTokenUsesBinDataFormat(resumeToken) { + assert.neq(resumeToken._data, undefined); + assert(resumeToken._data instanceof BinData, tojson(resumeToken)); + } + + let testDB = rst.getPrimary().getDB(jsTestName()); + let adminDB = rst.getPrimary().getDB("admin"); + let coll = testDB[jsTestName()]; + + let cst = new ChangeStreamTest(testDB); + let adminCST = new ChangeStreamTest(adminDB); + + // We can't open a change stream on a non-existent database on last-stable, so we insert a dummy + // document to create the database. + // TODO BACKPORT-34138 Remove this check once the change has been backported. + assert.writeOK(testDB.dummy.insert({_id: "dummy"})); + + // Open a change stream against a 3.6 binary. We will use the resume token from this stream to + // resume the stream once the set has been upgraded. + let streamOnOldVersion = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: coll.getName()}); + assert.writeOK(coll.insert({_id: 0})); + + let change = cst.getOneChange(streamOnOldVersion); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 0); + + // Extract the resume token and test that it is using the old resume token format using + // BinData. + const resumeTokenFromLastStable = change._id; + assertResumeTokenUsesBinDataFormat(resumeTokenFromLastStable); + + // Test that new query features are banned on 3.6. + const failedResponse = assert.commandFailedWithCode( + testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}}), + ErrorCodes.InvalidNamespace); + assert.commandFailedWithCode( + adminDB.runCommand( + {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}), + 40415); + + assert.commandFailedWithCode(testDB.runCommand({ + aggregate: coll.getName(), + pipeline: + [{$changeStream: {startAtClusterTime: {ts: failedResponse.$clusterTime.clusterTime}}}], + cursor: {} + }), + 40415); + + // Upgrade the set to the new binary version, but keep the feature compatibility version at + // 3.6. + rst.upgradeSet({binVersion: "latest"}); + testDB = rst.getPrimary().getDB(jsTestName()); + adminDB = rst.getPrimary().getDB("admin"); + coll = testDB[jsTestName()]; + cst = new ChangeStreamTest(testDB); + adminCST = new ChangeStreamTest(adminDB); + + // Test that we can resume the stream on the new binaries. + streamOnOldVersion = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromLastStable}}], + collection: coll.getName() + }); + + assert.writeOK(coll.insert({_id: 1})); + + change = cst.getOneChange(streamOnOldVersion); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 1); + + // Test that the stream is still using the old resume token format using BinData. + assertResumeTokenUsesBinDataFormat(change._id); + + // Explicitly set feature compatibility version 4.0. Remember the cluster time from that + // response to use later. + const startTime = + assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"})) + .$clusterTime.clusterTime; + + // Test that we can now use 4.0 features to open a stream. + const wholeDbCursor = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + const wholeClusterCursor = adminCST.startWatchingChanges( + {pipeline: [{$changeStream: {allChangesForCluster: true}}], collection: 1}); + const cursorStartedWithTime = cst.startWatchingChanges({ + pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}], + collection: coll.getName() + }); + + assert.writeOK(coll.insert({_id: 2})); + + // Test that the stream opened in FCV 3.6 continues to work and still generates tokens in the + // old format. + change = cst.getOneChange(streamOnOldVersion); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 2); + assertResumeTokenUsesBinDataFormat(change._id); + + // Test all the newly created streams can see an insert. + change = cst.getOneChange(wholeDbCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 2); + + change = adminCST.getOneChange(wholeClusterCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 2); + + change = cst.getOneChange(cursorStartedWithTime); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 2); + + // Test that the resume token is using the new format and has type string while FCV is 4.0. + const resumeToken = change._id; + assertResumeTokenUsesStringFormat(resumeToken); + + // Test that we can resume with the resume token with either format, either against the entire + // DB, the entire cluster, or against the single collection. + assert.doesNotThrow(() => cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromLastStable}}], + collection: 1 + })); + assert.doesNotThrow(() => adminCST.startWatchingChanges({ + pipeline: + [{$changeStream: {allChangesForCluster: true, resumeAfter: resumeTokenFromLastStable}}], + collection: 1 + })); + assert.doesNotThrow(() => cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromLastStable}}], + collection: coll.getName() + })); + assert.doesNotThrow( + () => cst.startWatchingChanges( + {pipeline: [{$changeStream: {resumeAfter: resumeToken}}], collection: 1})); + assert.doesNotThrow(() => adminCST.startWatchingChanges({ + pipeline: [{$changeStream: {allChangesForCluster: true, resumeAfter: resumeToken}}], + collection: 1 + })); + assert.doesNotThrow( + () => cst.startWatchingChanges( + {pipeline: [{$changeStream: {resumeAfter: resumeToken}}], collection: coll.getName()})); + + // Set the feature compatibility version to 3.6. + assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"})); + + // Test that existing streams continue, but still generate resume tokens in the new format. + assert.writeOK(coll.insert({_id: 3})); + change = cst.getOneChange(wholeDbCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 3); + assertResumeTokenUsesStringFormat(change._id); + + change = adminCST.getOneChange(wholeClusterCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 3); + assertResumeTokenUsesStringFormat(change._id); + + change = cst.getOneChange(cursorStartedWithTime); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 3); + assertResumeTokenUsesStringFormat(change._id); + + // Creating a new change stream with a 4.0 feature should fail. + assert.commandFailedWithCode( + testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}}), + ErrorCodes.QueryFeatureNotAllowed); + assert.commandFailedWithCode( + adminDB.runCommand( + {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}), + ErrorCodes.QueryFeatureNotAllowed); + + assert.commandFailedWithCode(testDB.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {startAtClusterTime: {ts: startTime}}}], + cursor: {} + }), + ErrorCodes.QueryFeatureNotAllowed); + + // Test that resuming a change stream opened on FCV 4.0 should continue to work, though a + // whole-db or whole-cluster cursor cannot be resumed. + assert.commandFailedWithCode( + testDB.runCommand( + {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}), + ErrorCodes.QueryFeatureNotAllowed); + assert.commandFailedWithCode(adminDB.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {allChangesForCluster: true, resumeAfter: resumeToken}}], + cursor: {} + }), + ErrorCodes.QueryFeatureNotAllowed); + let resumedOnFCV36With40BinaryResumeToken; + assert.doesNotThrow(() => { + resumedOnFCV36With40BinaryResumeToken = coll.watch([], {resumeAfter: resumeToken}); + }); + assert.soon(() => resumedOnFCV36With40BinaryResumeToken.hasNext()); + change = resumedOnFCV36With40BinaryResumeToken.next(); + assertResumeTokenUsesBinDataFormat(change._id); + + // Test that resuming a change stream with the original resume token still works. + let resumedWith36Token; + assert.doesNotThrow(() => { + resumedWith36Token = coll.watch([], {resumeAfter: resumeTokenFromLastStable}); + }); + assert.soon(() => resumedWith36Token.hasNext()); + change = resumedWith36Token.next(); + assertResumeTokenUsesBinDataFormat(change._id); + rst.stopSet(); }()); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 6fe2428c088..70b69b14cda 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -371,6 +371,7 @@ namespace { */ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec, + ServerGlobalParams::FeatureCompatibility::Version fcv, intrusive_ptr<DocumentSource>* resumeStageOut, boost::optional<Timestamp>* startFromOut) { if (!expCtx->inMongos) { @@ -419,8 +420,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, !(*resumeStageOut) || (!resumeAfterClusterTime && !startAtClusterTime)); if (resumeAfterClusterTime) { - if (serverGlobalParams.featureCompatibility.getVersion() >= - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) { + if (fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) { warning() << "The '$_resumeAfterClusterTime' option is deprecated, please use " "'startAtClusterTime' instead."; } @@ -434,8 +434,7 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, "feature compatibility version. See " << feature_compatibility_version_documentation::kCompatibilityLink << " for more information.", - serverGlobalParams.featureCompatibility.getVersion() >= - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40); + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40); uassert(50573, str::stream() << "Do not specify both " @@ -466,12 +465,13 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), elem.embeddedObject()); + const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); // Make sure that it is legal to run this $changeStream before proceeding. - DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec); + DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec, fcv); boost::optional<Timestamp> startFrom; intrusive_ptr<DocumentSource> resumeStage = nullptr; - parseResumeOptions(expCtx, spec, &resumeStage, &startFrom); + parseResumeOptions(expCtx, spec, fcv, &resumeStage, &startFrom); auto fullDocOption = spec.getFullDocument(); uassert(40575, @@ -495,7 +495,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( buildMatchFilter(expCtx, *startFrom, startFromInclusive), expCtx)); } - stages.push_back(createTransformationStage(elem.embeddedObject(), expCtx)); + stages.push_back(createTransformationStage(expCtx, elem.embeddedObject(), fcv)); if (resumeStage) { stages.push_back(resumeStage); } @@ -536,7 +536,9 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or } void DocumentSourceChangeStream::assertIsLegalSpecification( - const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { + const intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec, + ServerGlobalParams::FeatureCompatibility::Version fcv) { // Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in // test mode. // TODO SERVER-34283: remove once whole-database $changeStream is feature-complete. @@ -552,8 +554,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( << feature_compatibility_version_documentation::kCompatibilityLink << " for more information.", !expCtx->ns.isCollectionlessAggregateNS() || - serverGlobalParams.featureCompatibility.getVersion() >= - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40); + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40); // If 'allChangesForCluster' is true, the stream must be opened on the 'admin' database with // {aggregate: 1}. @@ -580,11 +581,13 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( } intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage( - BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) { + const intrusive_ptr<ExpressionContext>& expCtx, + BSONObj changeStreamSpec, + ServerGlobalParams::FeatureCompatibility::Version fcv) { // Mark the transformation stage as independent of any collection if the change stream is // watching all collections in the database. const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS(); return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform( - expCtx, changeStreamSpec, isIndependentOfAnyCollection)); + expCtx, changeStreamSpec, fcv, isIndependentOfAnyCollection)); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index bdd384da4b9..ca4b579b165 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -33,6 +33,7 @@ #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_sources_gen.h" #include "mongo/db/pipeline/field_path.h" +#include "mongo/db/pipeline/resume_token.h" namespace mongo { @@ -161,7 +162,9 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); static boost::intrusive_ptr<DocumentSource> createTransformationStage( - BSONObj changeStreamSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + BSONObj changeStreamSpec, + ServerGlobalParams::FeatureCompatibility::Version fcv); /** * Given a BSON object containing an aggregation command with a $changeStream stage, and a @@ -185,7 +188,8 @@ private: // For instance, whether it is permitted to run given the current FCV, whether the namespace is // valid for the options specified in the spec, etc. static void assertIsLegalSpecification(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceChangeStreamSpec& spec); + const DocumentSourceChangeStreamSpec& spec, + ServerGlobalParams::FeatureCompatibility::Version fcv); // It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson() // instead. diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 45f2c3373d4..cdbbf4d8b86 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -184,7 +184,7 @@ public: tokenData.documentKey = docKey; if (!uuid.missing()) tokenData.uuid = uuid.getUuid(); - return ResumeToken(tokenData).toDocument(); + return ResumeToken(tokenData).toDocument(ResumeToken::SerializationFormat::kHexString); } /** diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 16301b7fcd5..50620f898bd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -86,9 +86,14 @@ bool isOpTypeRelevant(const Document& d) { DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec, + ServerGlobalParams::FeatureCompatibility::Version fcv, bool isIndependentOfAnyCollection) : DocumentSource(expCtx), _changeStreamSpec(changeStreamSpec.getOwned()), + _resumeTokenFormat( + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40 + ? ResumeToken::SerializationFormat::kHexString + : ResumeToken::SerializationFormat::kBinData), _isIndependentOfAnyCollection(isIndependentOfAnyCollection) { if (expCtx->ns.isCollectionlessAggregateNS()) { @@ -296,7 +301,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document } doc.addField(DocumentSourceChangeStream::kIdField, - Value(ResumeToken(resumeTokenData).toDocument())); + Value(ResumeToken(resumeTokenData).toDocument(_resumeTokenFormat))); doc.addField(DocumentSourceChangeStream::kOperationTypeField, Value(operationType)); doc.addField(DocumentSourceChangeStream::kClusterTimeField, Value(resumeTokenData.clusterTime)); diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 61e6bfb25eb..5ae0ea3fc6f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -40,6 +40,7 @@ class DocumentSourceChangeStreamTransform : public DocumentSource { public: DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec, + ServerGlobalParams::FeatureCompatibility::Version fcv, bool isIndependentOfAnyCollection); Document applyTransformation(const Document& input); DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; @@ -111,6 +112,8 @@ private: BSONObj _changeStreamSpec; + ResumeToken::SerializationFormat _resumeTokenFormat; + // Map of collection UUID to document key fields. std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index a81d3083c8f..6b5e386051b 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -181,7 +181,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() uassert(40585, str::stream() << "resume of change stream was not possible, as the resume token was not found. " - << ResumeToken::parse(documentFromResumedStream["_id"].getDocument()).toBSON(), + << documentFromResumedStream["_id"].getDocument().toString(), _resumeStatus != ResumeStatus::kCannotResume); // If we reach this point, then we've seen the resume token. diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 530d6b13129..c216c81b52d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -66,7 +66,9 @@ protected: */ void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) { _mock->queue.push_back( - Document{{"_id", ResumeToken(ResumeTokenData(ts, Value(docKey), uuid)).toDocument()}}); + Document{{"_id", + ResumeToken(ResumeTokenData(ts, Value(docKey), uuid)) + .toDocument(ResumeToken::SerializationFormat::kHexString)}}); } /** * Pushes a document with a resume token corresponding to the given timestamp, _id string, and diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index b90ca65f594..15d236fd409 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -68,10 +68,10 @@ public: if (id.missing()) { ResumeTokenData tokenData; tokenData.clusterTime = ts; - return ResumeToken(tokenData).toDocument(); + return ResumeToken(tokenData).toDocument(ResumeToken::SerializationFormat::kHexString); } return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid())) - .toDocument(); + .toDocument(ResumeToken::SerializationFormat::kHexString); } }; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 32efd87cdda..3c27f9a90ab 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -64,6 +64,7 @@ const auto kExplain = ExplainOptions::Verbosity::kQueryPlanner; class ReplDocumentSourceLookUpTest : public DocumentSourceLookUpTest { public: void setUp() override { + Test::setUp(); // Will establish a feature compatibility version. auto service = getExpCtx()->opCtx->getServiceContext(); repl::ReplSettings settings; diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl index df287295d2c..90a74777e70 100644 --- a/src/mongo/db/pipeline/document_sources.idl +++ b/src/mongo/db/pipeline/document_sources.idl @@ -42,7 +42,11 @@ types: bson_serialization_type: object description: An object representing a resume token for a change stream cpp_type: ResumeToken - serializer: ResumeToken::toBSON + # The IDL requires a serializer for any custom type. The serializer for this type actually + # needs to know which format to use though, so requires an argument which is not currently + # supported by the IDL. We don't expect anyone to use the IDL to serialize the type, so we + # just provide a dummy serializer here. + serializer: ResumeToken::toBSON_do_not_use deserializer: ResumeToken::parse # The document key element in a resume token can be any BSON element, so we need a custom type diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index 2618a70186d..fd1e8cd3ce7 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -26,20 +26,59 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/pipeline/resume_token.h" #include <boost/optional/optional_io.hpp> +#include <limits> #include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/pipeline/document_sources_gen.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/storage/key_string.h" +#include "mongo/util/hex.h" namespace mongo { constexpr StringData ResumeToken::kDataFieldName; constexpr StringData ResumeToken::kTypeBitsFieldName; +namespace { + +/** + * Returns a pair of values representing the key-string encoded data and the type bits respectively. + * Both are of type BinData, but if the type bits of the key string are all zeros then the second + * Value will be the missing value. + */ +std::pair<Value, Value> encodeInBinDataFormat(const ResumeTokenData& data) { + // In the legacy format we serialize clusterTime, then documentKey, then UUID. + BSONObjBuilder builder; + builder.append("", data.clusterTime); + data.documentKey.addToBsonObj(&builder, ""); + if (data.uuid) { + if (data.documentKey.missing()) { + // Never allow a missing document key with a UUID present, as that will mess up + // the field order. + builder.appendNull(""); + } + data.uuid->appendToBuilder(&builder, ""); + } + auto keyObj = builder.obj(); + + // After writing all the pieces to an object, keystring-encode that object into binary. + KeyString encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj())); + const auto& typeBits = encodedToken.getTypeBits(); + + auto rawBinary = + BSONBinData(encodedToken.getBuffer(), encodedToken.getSize(), BinDataType::BinDataGeneral); + auto typeBitsValue = typeBits.isAllZeros() + ? Value() + : Value(BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral)); + return {Value(rawBinary), typeBitsValue}; +} +} // namespace + bool ResumeTokenData::operator==(const ResumeTokenData& other) const { return clusterTime == other.clusterTime && (Value::compare(this->documentKey, other.documentKey, nullptr) == 0) && uuid == other.uuid; @@ -56,38 +95,47 @@ ResumeToken::ResumeToken(const Document& resumeDoc) { uassert(40647, str::stream() << "Bad resume token: _data of missing or of wrong type" << resumeDoc.toString(), - _keyStringData.getType() == BinData && - _keyStringData.getBinData().type == BinDataGeneral); + (_keyStringData.getType() == BSONType::BinData && + _keyStringData.getBinData().type == BinDataGeneral) || + _keyStringData.getType() == BSONType::String); uassert(40648, str::stream() << "Bad resume token: _typeBits of wrong type" << resumeDoc.toString(), - _typeBits.missing() || - (_typeBits.getType() == BinData && _typeBits.getBinData().type == BinDataGeneral)); + _typeBits.missing() || (_typeBits.getType() == BSONType::BinData && + _typeBits.getBinData().type == BinDataGeneral)); } -// We encode the resume token as a KeyString with the sequence: clusterTime, documentKey, uuid. +// We encode the resume token as a KeyString with the sequence: clusterTime, uuid, documentKey. // Only the clusterTime is required. ResumeToken::ResumeToken(const ResumeTokenData& data) { BSONObjBuilder builder; builder.append("", data.clusterTime); - data.documentKey.addToBsonObj(&builder, ""); + uassert(50788, + "Unexpected resume token with a documentKey but no UUID", + data.uuid || data.documentKey.missing()); + if (data.uuid) { - if (data.documentKey.missing()) { - // Never allow a missing document key with a UUID present, as that will mess up - // the field order. - builder.appendNull(""); - } data.uuid->appendToBuilder(&builder, ""); } + data.documentKey.addToBsonObj(&builder, ""); auto keyObj = builder.obj(); KeyString encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj())); - _keyStringData = Value( - BSONBinData(encodedToken.getBuffer(), encodedToken.getSize(), BinDataType::BinDataGeneral)); + _keyStringData = Value(toHex(encodedToken.getBuffer(), encodedToken.getSize())); const auto& typeBits = encodedToken.getTypeBits(); if (!typeBits.isAllZeros()) _typeBits = Value( BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral)); } +bool ResumeToken::operator==(const ResumeToken& other) const { + // '_keyStringData' is enough to determine equality. The type bits are used to unambiguously + // re-construct the original data, but we do not expect any two resume tokens to have the same + // data and different type bits, since that would imply they have (1) the same timestamp and (2) + // the same documentKey (possibly different types). This should not be possible because + // documents with the same documentKey should be on the same shard and therefore should have + // different timestamps. + return ValueComparator::kInstance.evaluate(_keyStringData == other._keyStringData); +} + ResumeTokenData ResumeToken::getData() const { KeyString::TypeBits typeBits(KeyString::Version::V1); if (!_typeBits.missing()) { @@ -95,7 +143,29 @@ ResumeTokenData ResumeToken::getData() const { BufReader typeBitsReader(typeBitsBinData.data, typeBitsBinData.length); typeBits.resetFromBuffer(&typeBitsReader); } - BSONBinData keyStringBinData = _keyStringData.getBinData(); + + // Accept either serialization format. + BufBuilder hexDecodeBuf; // Keep this in scope until we've decoded the bytes. + BSONBinData keyStringBinData{nullptr, 0, BinDataType::BinDataGeneral}; + boost::optional<std::string> decodedString; + switch (_keyStringData.getType()) { + case BSONType::BinData: { + keyStringBinData = _keyStringData.getBinData(); + break; + } + case BSONType::String: { + uassert(ErrorCodes::FailedToParse, + "resume token string was not a valid hex string", + isValidHex(_keyStringData.getStringData())); + fromHexString(_keyStringData.getStringData(), &hexDecodeBuf); + keyStringBinData = BSONBinData( + hexDecodeBuf.buf(), hexDecodeBuf.getSize(), BinDataType::BinDataGeneral); + break; + } + default: + // We validate the type at parse time. + MONGO_UNREACHABLE; + } auto internalBson = KeyString::toBson(static_cast<const char*>(keyStringBinData.data), keyStringBinData.length, Ordering::make(BSONObj()), @@ -105,47 +175,61 @@ ResumeTokenData ResumeToken::getData() const { ResumeTokenData result; uassert(40649, "invalid empty resume token", i.more()); result.clusterTime = i.next().timestamp(); - if (i.more()) - result.documentKey = Value(i.next()); - if (i.more()) - result.uuid = uassertStatusOK(UUID::parse(i.next())); + if (!i.more()) { + // There was nothing other than the timestamp. + return result; + } + switch (_keyStringData.getType()) { + case BSONType::BinData: { + // In the old format, the documentKey came first, then the UUID. + result.documentKey = Value(i.next()); + if (i.more()) { + result.uuid = uassertStatusOK(UUID::parse(i.next())); + } + break; + } + case BSONType::String: { + // In the new format, the UUID comes first, then the documentKey. + result.uuid = uassertStatusOK(UUID::parse(i.next())); + if (i.more()) { + result.documentKey = Value(i.next()); + } + break; + } + default: { MONGO_UNREACHABLE } + } uassert(40646, "invalid oversized resume token", !i.more()); return result; } -int ResumeToken::compare(const ResumeToken& other) const { - BSONBinData thisData = _keyStringData.getBinData(); - BSONBinData otherData = other._keyStringData.getBinData(); - return StringData(static_cast<const char*>(thisData.data), thisData.length) - .compare(StringData(static_cast<const char*>(otherData.data), otherData.length)); -} - -bool ResumeToken::operator==(const ResumeToken& other) const { - return compare(other) == 0; -} - -bool ResumeToken::operator!=(const ResumeToken& other) const { - return compare(other) != 0; -} - -bool ResumeToken::operator<(const ResumeToken& other) const { - return compare(other) < 0; -} - -bool ResumeToken::operator<=(const ResumeToken& other) const { - return compare(other) <= 0; -} - -bool ResumeToken::operator>(const ResumeToken& other) const { - return compare(other) > 0; -} - -bool ResumeToken::operator>=(const ResumeToken& other) const { - return compare(other) >= 0; -} +Document ResumeToken::toDocument(SerializationFormat format) const { + // In most cases we expect to be serializing in the same format we were given. + const auto dataType = _keyStringData.getType(); + if ((dataType == BSONType::BinData && format == SerializationFormat::kBinData) || + (dataType == BSONType::String && format == SerializationFormat::kHexString)) { + return Document{{kDataFieldName, _keyStringData}, {kTypeBitsFieldName, _typeBits}}; + } -Document ResumeToken::toDocument() const { - return Document{{kDataFieldName, _keyStringData}, {kTypeBitsFieldName, _typeBits}}; + // If we have to switch formats, then decompose the resume token into its pieces and + // re-construct a resume token in the new format. + auto data = getData(); + + switch (format) { + case SerializationFormat::kBinData: { + // Going from the three pieces of data into BinData requires special logic, since + // re-constructing a ResumeToken from 'data' will generate the new format. + Value rawBinary, typeBits; + std::tie(rawBinary, typeBits) = encodeInBinDataFormat(data); + return Document{{kDataFieldName, rawBinary}, {kTypeBitsFieldName, typeBits}}; + } + case SerializationFormat::kHexString: { + // Constructing a new ResumeToken from the three pieces of data will generate a + // hex-encoded KeyString as the token. + const ResumeToken newResumeToken(data); + return newResumeToken.toDocument(format); + } + default: { MONGO_UNREACHABLE; } + } } ResumeToken ResumeToken::parse(const Document& resumeDoc) { diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index ba41270fbed..7d1bb3c432f 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -59,43 +59,64 @@ struct ResumeTokenData { std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData); /** - * A token passed in by the user to indicate where in the oplog we should start for - * $changeStream. This token has the following format: - * { - * _data: <binary data>, - * _typeBits: <binary data> - * } - * The _data field data is encoded such that byte by byte comparisons provide the correct - * ordering of tokens. The _typeBits field may be missing and should not affect token - * comparison. + * A token passed in by the user to indicate where in the oplog we should start for $changeStream. + * This token has one of the following formats: + * 1. Using BinData: + * { + * _data: BinData - The keystring encoded resume token, in clusterTime, documentKey, UUID order. + * _typeBits: BinData - The keystring type bits used for deserialization. + * } + * 2. Using a hex-encoded string in a similar format: + * { + * _data: String, A hex encoding of the binary generated by keystring encoding the clusterTime, + * UUID, then documentKey in that order. + * _typeBits: BinData - The keystring type bits used for deserialization. + * } + * The _data field data is encoded such that string comparisons provide the correct ordering of + * tokens. Unlike the BinData, this can be sorted correctly using a MongoDB sort. BinData + * unfortunately orders by the length of the data first, then by the contents. + * + * In both cases, the _typeBits field may be missing and should not affect token comparison. */ - class ResumeToken { public: + enum class SerializationFormat { + kBinData, + kHexString, + }; + + constexpr static StringData kDataFieldName = "_data"_sd; + constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd; + + /** + * Parse a resume token from a BSON object; used as an interface to the IDL parser. + */ + static ResumeToken parse(const BSONObj& resumeBson) { + return ResumeToken::parse(Document(resumeBson)); + } + + static ResumeToken parse(const Document& document); + /** * The default no-argument constructor is required by the IDL for types used as non-optional * fields. */ ResumeToken() = default; - explicit ResumeToken(const ResumeTokenData& resumeValue); - - bool operator==(const ResumeToken&) const; - bool operator!=(const ResumeToken&) const; - bool operator<(const ResumeToken&) const; - bool operator<=(const ResumeToken&) const; - bool operator>(const ResumeToken&) const; - bool operator>=(const ResumeToken&) const; - - /** Three way comparison, returns 0 if *this is equal to other, < 0 if *this is less than - * other, and > 0 if *this is greater than other. + /** + * Parses 'resumeValue' into a ResumeToken using the hex-encoded string format. */ - int compare(const ResumeToken& other) const; + explicit ResumeToken(const ResumeTokenData& resumeValue); - Document toDocument() const; + Document toDocument(SerializationFormat) const; - BSONObj toBSON() const { - return toDocument().toBson(); + /** + * Because we use the IDL we require a serializer. However, the serialization format depends on + * the feature compatibility version, so a serializer without an argument doesn't make sense. + * This should never be used. + */ + BSONObj toBSON_do_not_use() const { + MONGO_UNREACHABLE; } ResumeTokenData getData() const; @@ -104,26 +125,26 @@ public: return getData().clusterTime; } - /** - * Parse a resume token from a BSON object; used as an interface to the IDL parser. - */ - static ResumeToken parse(const BSONObj& resumeBson) { - return ResumeToken::parse(Document(resumeBson)); + bool operator==(const ResumeToken&) const; + bool operator!=(const ResumeToken& other) const { + return !(*this == other); } - static ResumeToken parse(const Document& document); - friend std::ostream& operator<<(std::ostream& out, const ResumeToken& token) { return out << token.getData(); } - constexpr static StringData kDataFieldName = "_data"_sd; - constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd; - private: explicit ResumeToken(const Document& resumeData); + // This is either the BinData or the hex-encoded string encoding all the pieces of the + // resume token. Value _keyStringData; + + // Since we are using a KeyString encoding, we might lose some information about what the + // original types of the serialized values were. For example, the integer 2 and the double 2.0 + // will generate the same KeyString. We keep the type bits around so we can deserialize without + // losing information. Value _typeBits; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp index 7aad15ccc4a..2fe8c0ac33c 100644 --- a/src/mongo/db/pipeline/resume_token_test.cpp +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -28,6 +28,7 @@ #include "mongo/db/pipeline/resume_token.h" +#include <algorithm> #include <boost/optional/optional_io.hpp> #include "mongo/db/pipeline/document.h" @@ -38,6 +39,8 @@ namespace mongo { namespace { +using Format = ResumeToken::SerializationFormat; + TEST(ResumeToken, EncodesFullTokenFromData) { Timestamp ts(1000, 2); UUID testUuid = UUID::gen(); @@ -59,46 +62,82 @@ TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) { ASSERT_EQ(resumeTokenDataIn, tokenData); } -TEST(ResumeToken, RoundTripThroughBsonFullToken) { +TEST(ResumeToken, ShouldRoundTripThroughHexStringEncoding) { Timestamp ts(1000, 2); UUID testUuid = UUID::gen(); Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); - auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON()); + + // Test serialization/parsing through Document. + auto rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString)); ResumeTokenData tokenData = rtToken.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); -} - -TEST(ResumeToken, RoundTripThroughBsonTimestampOnlyToken) { - Timestamp ts(1001, 3); - ResumeTokenData resumeTokenDataIn; - resumeTokenDataIn.clusterTime = ts; - auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON()); - ResumeTokenData tokenData = rtToken.getData(); + // Test serialization/parsing through BSON. + rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson()); + tokenData = rtToken.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); } -TEST(ResumeToken, RoundTripThroughDocumentFullToken) { +TEST(ResumeToken, ShouldRoundTripThroughBinDataEncoding) { Timestamp ts(1000, 2); UUID testUuid = UUID::gen(); Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); - auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument()); + + // Test serialization/parsing through Document. + auto rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData).toBson()); ResumeTokenData tokenData = rtToken.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); + + // Test serialization/parsing through BSON. + rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData).toBson()); + tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); } -TEST(ResumeToken, RoundTripThroughDocumentTimestampOnlyToken) { +TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexStringEncoding) { Timestamp ts(1001, 3); ResumeTokenData resumeTokenDataIn; resumeTokenDataIn.clusterTime = ts; - auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument()); + + // Test serialization/parsing through Document. + auto rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson()); ResumeTokenData tokenData = rtToken.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); + + // Test serialization/parsing through BSON. + rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson()); + tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + +TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughBinDataEncoding) { + Timestamp ts(1001, 3); + + ResumeTokenData resumeTokenDataIn; + resumeTokenDataIn.clusterTime = ts; + + // Test serialization/parsing through Document. + auto rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData).toBson()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); + + // Test serialization/parsing through BSON. + rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kBinData).toBson()); + tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); } TEST(ResumeToken, TestMissingTypebitsOptimization) { @@ -110,8 +149,8 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) { ResumeToken hasTypeBitsToken(hasTypeBitsData); ResumeToken noTypeBitsToken(noTypeBitsData); ASSERT_EQ(noTypeBitsToken, hasTypeBitsToken); - auto hasTypeBitsDoc = hasTypeBitsToken.toDocument(); - auto noTypeBitsDoc = noTypeBitsToken.toDocument(); + auto hasTypeBitsDoc = hasTypeBitsToken.toDocument(Format::kHexString); + auto noTypeBitsDoc = noTypeBitsToken.toDocument(Format::kHexString); ASSERT_FALSE(hasTypeBitsDoc["_typeBits"].missing()); ASSERT_TRUE(noTypeBitsDoc["_typeBits"].missing()) << noTypeBitsDoc["_typeBits"]; auto rtHasTypeBitsData = ResumeToken::parse(hasTypeBitsDoc).getData(); @@ -122,150 +161,6 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) { ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.documentKey["_id"].getType()); } -// Tests comparison functions for tokens constructed from oplog data. -TEST(ResumeToken, CompareFromData) { - Timestamp ts1(1000, 1); - Timestamp ts2(1000, 2); - Timestamp ts3(1001, 1); - UUID testUuid = UUID::gen(); - UUID testUuid2 = UUID::gen(); - Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; - Document documentKey1b{{"_id"_sd, "stuff"_sd}, - {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}}; - Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}}; - Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}}; - - ResumeToken token1a(ResumeTokenData(ts1, Value(documentKey1a), testUuid)); - ResumeToken token1b(ResumeTokenData(ts1, Value(documentKey1b), testUuid)); - - // Equivalent types don't matter. - ASSERT_EQ(token1a, token1b); - ASSERT_LTE(token1a, token1b); - ASSERT_GTE(token1a, token1b); - - // UUIDs matter, but all that really matters is they compare unequal. - ResumeToken tokenOtherCollection(ResumeTokenData(ts1, Value(documentKey1a), testUuid2)); - ASSERT_NE(token1a, tokenOtherCollection); - - ResumeToken token2(ResumeTokenData(ts1, Value(documentKey2), testUuid)); - - // Document keys matter. - ASSERT_LT(token1a, token2); - ASSERT_LTE(token1a, token2); - ASSERT_GT(token2, token1a); - ASSERT_GTE(token2, token1a); - - ResumeToken token3(ResumeTokenData(ts1, Value(documentKey3), testUuid)); - - // Order within document keys matters. - ASSERT_LT(token1a, token3); - ASSERT_LTE(token1a, token3); - ASSERT_GT(token3, token1a); - ASSERT_GTE(token3, token1a); - - ASSERT_LT(token2, token3); - ASSERT_LTE(token2, token3); - ASSERT_GT(token3, token2); - ASSERT_GTE(token3, token2); - - ResumeToken token4(ResumeTokenData(ts2, Value(documentKey1a), testUuid)); - - // Timestamps matter. - ASSERT_LT(token1a, token4); - ASSERT_LTE(token1a, token4); - ASSERT_GT(token4, token1a); - ASSERT_GTE(token4, token1a); - - // Timestamps matter more than document key. - ASSERT_LT(token3, token4); - ASSERT_LTE(token3, token4); - ASSERT_GT(token4, token3); - ASSERT_GTE(token4, token3); - - ResumeToken token5(ResumeTokenData(ts3, Value(documentKey1a), testUuid)); - - // Time matters more than increment in timestamp - ASSERT_LT(token4, token5); - ASSERT_LTE(token4, token5); - ASSERT_GT(token5, token4); - ASSERT_GTE(token5, token4); -} - -// Tests comparison functions for tokens constructed from the keystring-encoded form. -TEST(ResumeToken, CompareFromEncodedData) { - Timestamp ts1(1000, 1); - Timestamp ts2(1000, 2); - Timestamp ts3(1001, 1); - UUID testUuid = UUID::gen(); - UUID testUuid2 = UUID::gen(); - Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; - Document documentKey1b{{"_id"_sd, "stuff"_sd}, - {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}}; - Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}}; - Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}}; - - auto token1a = ResumeToken::parse( - ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid)).toDocument()); - auto token1b = ResumeToken::parse( - ResumeToken(ResumeTokenData(ts1, Value(documentKey1b), testUuid)).toDocument()); - - // Equivalent types don't matter. - ASSERT_EQ(token1a, token1b); - ASSERT_LTE(token1a, token1b); - ASSERT_GTE(token1a, token1b); - - // UUIDs matter, but all that really matters is they compare unequal. - auto tokenOtherCollection = ResumeToken::parse( - ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid2)).toDocument()); - ASSERT_NE(token1a, tokenOtherCollection); - - auto token2 = ResumeToken::parse( - ResumeToken(ResumeTokenData(ts1, Value(documentKey2), testUuid)).toDocument()); - - // Document keys matter. - ASSERT_LT(token1a, token2); - ASSERT_LTE(token1a, token2); - ASSERT_GT(token2, token1a); - ASSERT_GTE(token2, token1a); - - auto token3 = ResumeToken::parse( - ResumeToken(ResumeTokenData(ts1, Value(documentKey3), testUuid)).toDocument()); - - // Order within document keys matters. - ASSERT_LT(token1a, token3); - ASSERT_LTE(token1a, token3); - ASSERT_GT(token3, token1a); - ASSERT_GTE(token3, token1a); - - ASSERT_LT(token2, token3); - ASSERT_LTE(token2, token3); - ASSERT_GT(token3, token2); - ASSERT_GTE(token3, token2); - - auto token4 = ResumeToken::parse( - ResumeToken(ResumeTokenData(ts2, Value(documentKey1a), testUuid)).toDocument()); - - // Timestamps matter. - ASSERT_LT(token1a, token4); - ASSERT_LTE(token1a, token4); - ASSERT_GT(token4, token1a); - ASSERT_GTE(token4, token1a); - - // Timestamps matter more than document key. - ASSERT_LT(token3, token4); - ASSERT_LTE(token3, token4); - ASSERT_GT(token4, token3); - ASSERT_GTE(token4, token3); - - auto token5 = ResumeToken::parse( - ResumeToken(ResumeTokenData(ts3, Value(documentKey1a), testUuid)).toDocument()); - - // Time matters more than increment in timestamp - ASSERT_LT(token4, token5); - ASSERT_LTE(token4, token5); - ASSERT_GT(token5, token4); - ASSERT_GTE(token5, token4); -} TEST(ResumeToken, CorruptTokens) { // Missing document. @@ -273,17 +168,22 @@ TEST(ResumeToken, CorruptTokens) { // Missing data field. ASSERT_THROWS(ResumeToken::parse(Document{{"somefield"_sd, "stuff"_sd}}), AssertionException); // Wrong type data field - ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, "string"_sd}}), AssertionException); + ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, BSONNULL}}), AssertionException); ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, 0}}), AssertionException); // Valid data field, but wrong type typeBits. Timestamp ts(1010, 4); ResumeTokenData tokenData; tokenData.clusterTime = ts; - auto goodTokenDoc = ResumeToken(tokenData).toDocument(); - auto goodData = goodTokenDoc["_data"].getBinData(); + auto goodTokenDocBinData = ResumeToken(tokenData).toDocument(Format::kBinData); + auto goodData = goodTokenDocBinData["_data"].getBinData(); ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}), AssertionException); + auto goodTokenDocString = ResumeToken(tokenData).toDocument(Format::kHexString); + auto goodString = goodTokenDocString["_data"].getString(); + ASSERT_THROWS( + ResumeToken::parse(Document{{"_data"_sd, goodString}, {"_typeBits", "string"_sd}}), + AssertionException); // Valid data except wrong bindata type. ASSERT_THROWS(ResumeToken::parse( @@ -301,16 +201,22 @@ TEST(ResumeToken, CorruptTokens) { auto emptyToken = ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 0, BinDataGeneral)}}); ASSERT_THROWS(emptyToken.getData(), AssertionException); + emptyToken = ResumeToken::parse(Document{{"_data"_sd, "string"_sd}}); + ASSERT_THROWS(emptyToken.getData(), AssertionException); // Data of correct type with a bunch of zeros. auto zeroesToken = ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 5, BinDataGeneral)}}); ASSERT_THROWS(emptyToken.getData(), AssertionException); + zeroesToken = ResumeToken::parse(Document{{"_data"_sd, "00000"_sd}}); + ASSERT_THROWS(emptyToken.getData(), AssertionException); - // Data of correct type with a bunch of nonsense + // Data of correct type with a bunch of nonsense. auto nonsenseToken = ResumeToken::parse(Document{{"_data"_sd, BSONBinData(nonsense, 5, BinDataGeneral)}}); ASSERT_THROWS(emptyToken.getData(), AssertionException); + nonsenseToken = ResumeToken::parse(Document{{"_data"_sd, "nonsense"_sd}}); + ASSERT_THROWS(emptyToken.getData(), AssertionException); // Valid data, bad typeBits; note that an all-zeros typebits is valid so it is not tested here. auto badTypeBitsToken = ResumeToken::parse( @@ -318,5 +224,68 @@ TEST(ResumeToken, CorruptTokens) { ASSERT_THROWS(badTypeBitsToken.getData(), AssertionException); } +TEST(ResumeToken, StringEncodingSortsCorrectly) { + // Make sure that the string encoding of the resume tokens will compare in the correct order, + // namely timestamp, uuid, then documentKey. + Timestamp ts2_2(2, 2); + Timestamp ts10_4(10, 4); + Timestamp ts10_5(10, 5); + Timestamp ts11_3(11, 3); + + // Generate two different UUIDs, and figure out which one is smaller. Store the smaller one in + // 'lower_uuid'. + UUID lower_uuid = UUID::gen(); + UUID higher_uuid = UUID::gen(); + if (lower_uuid > higher_uuid) { + std::swap(lower_uuid, higher_uuid); + } + + auto assertLt = [](const ResumeTokenData& lower, const ResumeTokenData& higher) { + auto lowerString = ResumeToken(lower).toDocument(Format::kHexString)["_data"].getString(); + auto higherString = ResumeToken(higher).toDocument(Format::kHexString)["_data"].getString(); + ASSERT_LT(lowerString, higherString); + }; + + // Test using only Timestamps. + assertLt({ts2_2, Value(), boost::none}, {ts10_4, Value(), boost::none}); + assertLt({ts2_2, Value(), boost::none}, {ts10_5, Value(), boost::none}); + assertLt({ts2_2, Value(), boost::none}, {ts11_3, Value(), boost::none}); + assertLt({ts10_4, Value(), boost::none}, {ts10_5, Value(), boost::none}); + assertLt({ts10_4, Value(), boost::none}, {ts11_3, Value(), boost::none}); + assertLt({ts10_5, Value(), boost::none}, {ts11_3, Value(), boost::none}); + + // Test that the Timestamp is more important than the UUID and documentKey. + assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 0}}), higher_uuid}, + {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_5, Value(Document{{"_id", 0}}), higher_uuid}); + + // Test that when the Timestamp is the same, the UUID breaks the tie. + assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid}, + {ts2_2, Value(Document{{"_id", 0}}), higher_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_4, Value(Document{{"_id", 0}}), higher_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_4, Value(Document{{"_id", 0}}), higher_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_4, Value(Document{{"_id", 2}}), higher_uuid}); + + // Test that when the Timestamp and the UUID are the same, the documentKey breaks the tie. + assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid}, + {ts2_2, Value(Document{{"_id", 1}}), lower_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_4, Value(Document{{"_id", 1}}), lower_uuid}); + assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_4, Value(Document{{"_id", "string"_sd}}), lower_uuid}); + assertLt({ts10_4, Value(Document{{"_id", BSONNULL}}), lower_uuid}, + {ts10_4, Value(Document{{"_id", 0}}), lower_uuid}); +} + } // namspace } // namspace mongo diff --git a/src/mongo/util/hex.h b/src/mongo/util/hex.h index 802d6b13737..382eb0bd86c 100644 --- a/src/mongo/util/hex.h +++ b/src/mongo/util/hex.h @@ -29,6 +29,8 @@ #pragma once +#include <algorithm> +#include <cctype> #include <string> #include "mongo/base/string_data.h" @@ -53,6 +55,27 @@ inline char fromHex(StringData c) { return (char)((fromHex(c[0]) << 4) | fromHex(c[1])); } +/** + * Decodes 'hexString' into raw bytes, appended to the out parameter 'buf'. Callers must first + * ensure that 'hexString' is a valid hex encoding. + */ +inline void fromHexString(StringData hexString, BufBuilder* buf) { + invariant(hexString.size() % 2 == 0); + // Combine every pair of two characters into one byte. + for (std::size_t i = 0; i < hexString.size(); i += 2) { + buf->appendChar(fromHex(StringData(&hexString.rawData()[i], 2))); + } +} + +/** + * Returns true if 'hexString' is a valid hexadecimal encoding. + */ +inline bool isValidHex(StringData hexString) { + // There must be an even number of characters, since each pair encodes a single byte. + return hexString.size() % 2 == 0 && + std::all_of(hexString.begin(), hexString.end(), [](char c) { return std::isxdigit(c); }); +} + inline std::string toHex(const void* inRaw, int len) { static const char hexchars[] = "0123456789ABCDEF"; |