author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-10-03 16:22:37 -0400
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-10-05 10:20:35 -0400
commit | 316a341735b2ffd12ee203581ac0f736a6aaef88 (patch)
tree | 956dc9e0c0e6b2ee530182c18544af20f3cb887f /src/mongo/db
parent | 47b62c3fdd712209dbe48fdf3928901304624320 (diff)
download | mongo-316a341735b2ffd12ee203581ac0f736a6aaef88.tar.gz
SERVER-29716 Keystring-encode ResumeTokens to allow bytewise comparisons
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 12
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 27
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 10
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.cpp | 22
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.h | 17
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 44
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp | 5
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp | 9
-rw-r--r-- | src/mongo/db/pipeline/document_sources.idl | 35
-rw-r--r-- | src/mongo/db/pipeline/resume_token.cpp | 133
-rw-r--r-- | src/mongo/db/pipeline/resume_token.h | 83
-rw-r--r-- | src/mongo/db/pipeline/resume_token_test.cpp | 322
-rw-r--r-- | src/mongo/db/pipeline/value.h | 8
13 files changed, 575 insertions, 152 deletions
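The core of this patch: instead of a structured {clusterTime, uuid, documentKey} document, a resume token now carries a KeyString-encoded `_data` blob (plus an optional `_typeBits` blob), so resume tokens arriving from different shards can be merge-sorted with a plain byte comparison. The standalone sketch below illustrates only the general order-preserving-encoding idea; it is not MongoDB's KeyString format, and all names in it are illustrative.

```cpp
// Illustrative standalone C++ (not MongoDB's KeyString): an order-preserving
// binary encoding lets one bytewise comparison reproduce the logical
// (clusterTime, documentKey) ordering of resume tokens.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Append a 32-bit value big-endian so byte order matches numeric order.
void appendBE32(std::vector<unsigned char>& buf, uint32_t v) {
    for (int shift = 24; shift >= 0; shift -= 8)
        buf.push_back(static_cast<unsigned char>((v >> shift) & 0xFFu));
}

// Encode (seconds, increment, documentKey) so that comparing the encoded
// bytes gives timestamp-major ordering, then document-key ordering.
std::vector<unsigned char> encodeToken(uint32_t secs, uint32_t inc, const std::string& key) {
    std::vector<unsigned char> buf;
    appendBE32(buf, secs);
    appendBE32(buf, inc);
    buf.insert(buf.end(), key.begin(), key.end());
    buf.push_back(0x00);  // terminator: a shorter key sorts before its extensions
    return buf;
}

int compareTokens(const std::vector<unsigned char>& a, const std::vector<unsigned char>& b) {
    const size_t n = std::min(a.size(), b.size());
    if (int cmp = std::memcmp(a.data(), b.data(), n))
        return cmp;
    return a.size() == b.size() ? 0 : (a.size() < b.size() ? -1 : 1);
}

int main() {
    auto t1 = encodeToken(1000, 1, "stuff");
    auto t2 = encodeToken(1000, 2, "stuff");  // later increment, same seconds
    auto t3 = encodeToken(1001, 1, "aaa");    // later seconds outweigh the key
    assert(compareTokens(t1, t2) < 0);
    assert(compareTokens(t2, t3) < 0);
    return 0;
}
```

The real encoding in this patch delegates to KeyString::Version::V1 with the field sequence clusterTime, documentKey, uuid, as the new resume_token.cpp below shows.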
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index ecf9c52a5f2..af543bfc8b3 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -533,7 +533,19 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/storage/key_string', '$BUILD_DIR/mongo/idl/idl_parser', 'document_value', ], ) + +env.CppUnitTest( + target='resume_token_test', + source='resume_token_test.cpp', + LIBDEPS=[ + '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + 'document_sources_idl', + 'document_source_lookup', + ], +) diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index fced8514c8c..e9b49de2c47 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -272,19 +272,20 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( elem.embeddedObject()); if (auto resumeAfter = spec.getResumeAfter()) { ResumeToken token = resumeAfter.get(); - auto resumeNamespace = UUIDCatalog::get(expCtx->opCtx).lookupNSSByUUID(token.getUuid()); + ResumeTokenData tokenData = token.getData(); + uassert(40645, + "The resume token is invalid (no UUID), possibly from an invalidate.", + tokenData.uuid); + auto resumeNamespace = + UUIDCatalog::get(expCtx->opCtx).lookupNSSByUUID(tokenData.uuid.get()); uassert(40615, "The resume token UUID does not exist. Has the collection been dropped?", !resumeNamespace.isEmpty()); - startFrom = token.getTimestamp(); + startFrom = tokenData.clusterTime; if (expCtx->needsMerge) { - DocumentSourceShardCheckResumabilitySpec spec; - spec.setResumeToken(std::move(token)); - resumeStage = DocumentSourceShardCheckResumability::create(expCtx, std::move(spec)); + resumeStage = DocumentSourceShardCheckResumability::create(expCtx, std::move(token)); } else { - DocumentSourceEnsureResumeTokenPresentSpec spec; - spec.setResumeToken(std::move(token)); - resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(spec)); + resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token)); } } const bool changeStreamIsResuming = resumeStage != nullptr; @@ -414,10 +415,12 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will // not appear in the output. - Document resumeToken{{kClusterTimeField, Document{{kTimestampField, ts}}}, - {kUuidField, uuid}, - {kDocumentKeyField, documentKey}}; - doc.addField(kIdField, Value(resumeToken)); + ResumeTokenData resumeTokenData; + resumeTokenData.clusterTime = ts.getTimestamp(); + resumeTokenData.documentKey = documentKey; + if (!uuid.missing()) + resumeTokenData.uuid = uuid.getUuid(); + doc.addField(kIdField, Value(ResumeToken(resumeTokenData).toDocument())); doc.addField(kOperationTypeField, Value(operationType)); // "invalidate" and "retryNeeded" entries have fewer fields. 
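With the change-stream transformation above, every event's `_id` is now the opaque token document rather than the raw fields. A hedged sketch of that construction path, assuming the ResumeTokenData/ResumeToken declarations added later in this patch (the helper name `makeChangeEventId` is illustrative, not part of the patch):

```cpp
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/resume_token.h"

namespace mongo {
// Illustrative helper: mirrors what
// DocumentSourceChangeStream::Transformation::applyTransformation now does
// when it populates the "_id" field of a change event.
Document makeChangeEventId(Timestamp clusterTime,
                           const boost::optional<UUID>& uuid,
                           Value documentKey) {
    ResumeTokenData data;
    data.clusterTime = clusterTime;
    data.documentKey = std::move(documentKey);
    data.uuid = uuid;
    // Yields {_data: BinData(...), _typeBits: BinData(...)} rather than the old
    // {clusterTime: {ts: ...}, uuid: ..., documentKey: ...} shape.
    return ResumeToken(data).toDocument();
}
}  // namespace mongo
```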
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index ab5d9a11e86..989a331a1a4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -155,10 +155,12 @@ public: Document makeResumeToken(Timestamp ts, ImplicitValue uuid = Value(), ImplicitValue docKey = Value()) { - if (docKey.missing()) { - return {{"clusterTime", D{{"ts", ts}}}, {"uuid", uuid}}; - } - return {{"clusterTime", D{{"ts", ts}}}, {"uuid", uuid}, {"documentKey", docKey}}; + ResumeTokenData tokenData; + tokenData.clusterTime = ts; + tokenData.documentKey = docKey; + if (!uuid.missing()) + tokenData.uuid = uuid.getUuid(); + return ResumeToken(tokenData).toDocument(); } /** diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 435f464af8f..94d42e2920d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -45,13 +45,13 @@ Value DocumentSourceEnsureResumeTokenPresent::serialize( intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx, - DocumentSourceEnsureResumeTokenPresentSpec spec) { - return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(spec)); + ResumeToken token) { + return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token)); } DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent( - const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceEnsureResumeTokenPresentSpec spec) - : DocumentSource(expCtx), _token(spec.getResumeToken()), _seenDoc(false) {} + const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token) + : DocumentSource(expCtx), _token(std::move(token)), _seenDoc(false) {} DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() { pExpCtx->checkForInterrupt(); @@ -67,7 +67,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() _seenDoc = true; auto doc = nextInput.getDocument(); - ResumeToken receivedToken(doc["_id"]); + auto receivedToken = ResumeToken::parse(doc["_id"].getDocument()); uassert(40585, str::stream() << "resume of change stream was not possible, as the resume token was not found. 
" @@ -89,14 +89,14 @@ Value DocumentSourceShardCheckResumability::serialize( } intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create( - const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceShardCheckResumabilitySpec spec) { - return new DocumentSourceShardCheckResumability(expCtx, std::move(spec)); + const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token) { + return new DocumentSourceShardCheckResumability(expCtx, std::move(token)); } DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability( - const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceShardCheckResumabilitySpec spec) + const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token) : DocumentSourceNeedsMongoProcessInterface(expCtx), - _token(spec.getResumeToken()), + _token(std::move(token)), _verifiedResumability(false) {} DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { @@ -110,7 +110,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { if (nextInput.isAdvanced()) { auto doc = nextInput.getDocument(); - ResumeToken receivedToken(doc["_id"]); + auto receivedToken = ResumeToken::parse(doc["_id"].getDocument()); if (receivedToken == _token) { // Pass along the document, as the DocumentSourceEnsureResumeTokenPresent stage on the // merger will @@ -129,7 +129,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { uassert(40576, "resume of change notification was not possible, as the resume point may no longer " "be in the oplog. ", - firstOplogEntry["ts"].getTimestamp() < _token.getTimestamp()); + firstOplogEntry["ts"].getTimestamp() < _token.getData().clusterTime); return nextInput; } // Very unusual case: the oplog is empty. We can always resume. It should never be possible diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 2ff83bb2b35..5fedb8bb444 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -33,9 +33,6 @@ #include "mongo/db/pipeline/resume_token.h" namespace mongo { -// Currently the two resume sources take the same specification. -typedef DocumentSourceEnsureResumeTokenPresentSpec DocumentSourceShardCheckResumabilitySpec; - /** * This checks for resumability on a single shard in the sharded case. The rules are * @@ -71,15 +68,14 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - DocumentSourceShardCheckResumabilitySpec spec); + const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token); private: /** * Use the create static method to create a DocumentSourceShardCheckResumability. */ DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx, - DocumentSourceShardCheckResumabilitySpec spec); + ResumeToken token); ResumeToken _token; bool _verifiedResumability; @@ -111,9 +107,7 @@ public: * be at any shard. 
*/ boost::intrusive_ptr<DocumentSource> getShardSource() final { - DocumentSourceShardCheckResumabilitySpec shardSpec; - shardSpec.setResumeToken(_token); - return DocumentSourceShardCheckResumability::create(pExpCtx, shardSpec); + return DocumentSourceShardCheckResumability::create(pExpCtx, _token); }; boost::intrusive_ptr<DocumentSource> getMergeSource() final { @@ -123,8 +117,7 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - DocumentSourceEnsureResumeTokenPresentSpec spec); + const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token); const ResumeToken& getTokenForTest() { return _token; @@ -135,7 +128,7 @@ private: * Use the create static method to create a DocumentSourceEnsureResumeTokenPresent. */ DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx, - DocumentSourceEnsureResumeTokenPresentSpec spec); + ResumeToken token); ResumeToken _token; bool _seenDoc; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 2ea54327516..8d3c922917a 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -64,10 +64,9 @@ protected: * namespace in the mock queue. */ void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) { - _mock->queue.push_back(Document{{"_id", - Document{{"clusterTime", Document{{"ts", ts}}}, - {"uuid", uuid}, - {"documentKey", Document{{"_id", id}}}}}}); + _mock->queue.push_back(Document{ + {"_id", + ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid)).toDocument()}}); } void addPause() { @@ -79,12 +78,8 @@ protected: */ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( Timestamp ts, StringData id, UUID uuid = testUuid()) { - auto token = ResumeToken::parse(BSON("clusterTime" << BSON("ts" << ts) << "uuid" << uuid - << "documentKey" - << BSON("_id" << id))); - DocumentSourceEnsureResumeTokenPresentSpec spec; - spec.setResumeToken(token); - auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), spec); + ResumeToken token(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid)); + auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), token); checkResumeToken->setSource(_mock.get()); return checkResumeToken; } @@ -105,13 +100,9 @@ class ShardCheckResumabilityTest : public CheckResumeTokenTest { protected: intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability( Timestamp ts, StringData id, UUID uuid = testUuid()) { - auto token = ResumeToken::parse(BSON("clusterTime" << BSON("ts" << ts) << "uuid" << uuid - << "documentKey" - << BSON("_id" << id))); - DocumentSourceShardCheckResumabilitySpec spec; - spec.setResumeToken(token); + ResumeToken token(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid)); auto shardCheckResumability = - DocumentSourceShardCheckResumability::create(getExpCtx(), spec); + DocumentSourceShardCheckResumability::create(getExpCtx(), token); shardCheckResumability->setSource(_mock.get()); return shardCheckResumability; } @@ -153,8 +144,7 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { auto result1 = checkResumeToken->getNext(); 
ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); - ASSERT_VALUE_EQ(Value(Document{{"ts", doc1Timestamp}}), - doc1["_id"].getDocument()["clusterTime"]); + ASSERT_EQ(doc1Timestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -172,13 +162,11 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) auto result1 = checkResumeToken->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); - ASSERT_VALUE_EQ(Value(Document{{"ts", doc1Timestamp}}), - doc1["_id"].getDocument()["clusterTime"]); + ASSERT_EQ(doc1Timestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); auto result2 = checkResumeToken->getNext(); ASSERT_TRUE(result2.isAdvanced()); auto& doc2 = result2.getDocument(); - ASSERT_VALUE_EQ(Value(Document{{"ts", doc2Timestamp}}), - doc2["_id"].getDocument()["clusterTime"]); + ASSERT_EQ(doc2Timestamp, ResumeToken::parse(doc2["_id"].getDocument()).getData().clusterTime); ASSERT_TRUE(checkResumeToken->getNext().isEOF()); } @@ -278,7 +266,7 @@ TEST_F(ShardCheckResumabilityTest, auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); - ASSERT_VALUE_EQ(Value(resumeTimestamp), doc["_id"]["clusterTime"]["ts"]); + ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } TEST_F(ShardCheckResumabilityTest, @@ -295,7 +283,7 @@ TEST_F(ShardCheckResumabilityTest, auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); - ASSERT_VALUE_EQ(Value(resumeTimestamp), doc["_id"]["clusterTime"]["ts"]); + ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) { @@ -310,7 +298,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIs auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); - ASSERT_VALUE_EQ(Value(resumeTimestamp), doc["_id"]["clusterTime"]["ts"]); + ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } TEST_F(ShardCheckResumabilityTest, @@ -363,7 +351,7 @@ TEST_F(ShardCheckResumabilityTest, auto result = shardCheckResumability->getNext(); ASSERT_TRUE(result.isAdvanced()); auto& doc = result.getDocument(); - ASSERT_VALUE_EQ(Value(docTimestamp), doc["_id"]["clusterTime"]["ts"]); + ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime); } TEST_F(ShardCheckResumabilityTest, @@ -394,7 +382,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) { auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); - ASSERT_VALUE_EQ(Value(docTimestamp), doc1["_id"]["clusterTime"]["ts"]); + ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); mockOplog = {Document{{"ts", oplogFutureTimestamp}}}; shardCheckResumability->injectMongoProcessInterface( @@ -418,7 +406,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAf auto result1 = shardCheckResumability->getNext(); ASSERT_TRUE(result1.isAdvanced()); auto& doc1 = result1.getDocument(); - ASSERT_VALUE_EQ(Value(docTimestamp), doc1["_id"]["clusterTime"]["ts"]); + ASSERT_EQ(docTimestamp, 
ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime); auto result2 = shardCheckResumability->getNext(); ASSERT_TRUE(result2.isEOF()); } diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index c14d4fcbc4b..9535ad95bb2 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -98,11 +98,12 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat auto matchSpec = BSON("$match" << documentKey); // Extract the UUID from resume token and do change stream lookups by UUID. - ResumeToken resumeToken(updateOp[DocumentSourceChangeStream::kIdField]); + auto resumeToken = + ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument()); // TODO SERVER-29134 we need to extract the namespace from the document and set them on the new // ExpressionContext if we're getting notifications from an entire database. - auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getUuid()); + auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getData().uuid); auto pipelineStatus = _mongoProcessInterface->makePipeline({matchSpec}, foreignExpCtx); if (pipelineStatus.getStatus() == ErrorCodes::NamespaceNotFound) { // We couldn't find the collection with UUID, it may have been dropped. diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 0c31852a993..d9db1db38ce 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -66,11 +66,12 @@ public: Document makeResumeToken(ImplicitValue id = Value()) { const Timestamp ts(100, 1); if (id.missing()) { - return {{"clusterTime", Document{{"ts", ts}}}, {"uuid", testUuid()}}; + ResumeTokenData tokenData; + tokenData.clusterTime = ts; + return ResumeToken(tokenData).toDocument(); } - return {{"clusterTime", Document{{"ts", ts}}}, - {"uuid", testUuid()}, - {"documentKey", Document{{"_id", id}}}}; + return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid())) + .toDocument(); } }; diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl index f7f19df40bb..ef427a711f4 100644 --- a/src/mongo/db/pipeline/document_sources.idl +++ b/src/mongo/db/pipeline/document_sources.idl @@ -35,8 +35,9 @@ imports: - "mongo/idl/basic_types.idl" types: - # A resume token could be parsed as a struct, but since we may make it opaque in the future, we - # parse it as a type with a custom class now. + # A resume token is an opaque document we return to the user that contains all the information + # needed to resume a stream where they left off. It also provides the ordering of streams + # from multiple shards. resumeToken: bson_serialization_type: object description: An object representing a resume token for a change stream @@ -72,16 +73,6 @@ structs: description: A string '"updateLookup"' or '"default"', indicating whether or not we should return a full document or just changes for an update. - - DocumentSourceEnsureResumeTokenPresentSpec: - description: A document used to specify the internal stage which checks the presence of the - resume token. 
- fields: - resumeToken: - cpp_name: resumeToken - type: resumeToken - description: The resume token which is required to be present in the pipeline. - ResumeTokenClusterTime: description: The IDL type of cluster time fields: @@ -107,23 +98,3 @@ structs: users: type: array<ListSessionsUser> optional: true - - ResumeTokenInternal: - description: The internal format of a resume token. For use by the ResumeToken class - only. - fields: - clusterTime: - cpp_name: clusterTime - type: ResumeTokenClusterTime - description: The timestamp of the oplog entry represented by this resume token. - - uuid: - cpp_name: uuid - type: uuid - description: The UUID of the oplog entry represented by this resume token. - - documentKey: - cpp_name: documentKey - type: resumeTokenDocumentKey - description: The document key of the document in the oplog entry represented by this - resume token. diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index d9a1f3376b9..2618a70186d 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -28,51 +28,128 @@ #include "mongo/db/pipeline/resume_token.h" +#include <boost/optional/optional_io.hpp> + #include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobjbuilder.h" -#include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_sources_gen.h" #include "mongo/db/pipeline/value_comparator.h" +#include "mongo/db/storage/key_string.h" namespace mongo { +constexpr StringData ResumeToken::kDataFieldName; +constexpr StringData ResumeToken::kTypeBitsFieldName; -ResumeToken::ResumeToken(const BSONObj& resumeBson) { - auto token = - ResumeTokenInternal::parse(IDLParserErrorContext("$changeStream.resumeAfter"), resumeBson); - _timestamp = token.getClusterTime().getTimestamp(); - _uuid = token.getUuid(); - _documentId = token.getDocumentKey(); +bool ResumeTokenData::operator==(const ResumeTokenData& other) const { + return clusterTime == other.clusterTime && + (Value::compare(this->documentKey, other.documentKey, nullptr) == 0) && uuid == other.uuid; } -ResumeToken::ResumeToken(const Value& resumeValue) { - Document resumeTokenDoc = resumeValue.getDocument(); - Value clusterTime = resumeTokenDoc[ResumeTokenInternal::kClusterTimeFieldName]; - Value timestamp = clusterTime[ResumeTokenClusterTime::kTimestampFieldName]; - _timestamp = timestamp.getTimestamp(); - Value uuid = resumeTokenDoc[ResumeTokenInternal::kUuidFieldName]; - _uuid = uuid.getUuid(); - _documentId = resumeTokenDoc[ResumeTokenInternal::kDocumentKeyFieldName]; +std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { + return out << "{clusterTime: " << tokenData.clusterTime.toString() + << " documentKey: " << tokenData.documentKey << " uuid: " << tokenData.uuid << "}"; } -bool ResumeToken::operator==(const ResumeToken& other) { - return _timestamp == other._timestamp && _uuid == other._uuid && - ValueComparator::kInstance.evaluate(_documentId == other._documentId); +ResumeToken::ResumeToken(const Document& resumeDoc) { + _keyStringData = resumeDoc[kDataFieldName]; + _typeBits = resumeDoc[kTypeBitsFieldName]; + uassert(40647, + str::stream() << "Bad resume token: _data of missing or of wrong type" + << resumeDoc.toString(), + _keyStringData.getType() == BinData && + _keyStringData.getBinData().type == BinDataGeneral); + uassert(40648, + str::stream() << "Bad resume token: _typeBits of wrong type" << resumeDoc.toString(), + _typeBits.missing() || + (_typeBits.getType() == BinData && 
_typeBits.getBinData().type == BinDataGeneral)); } -Document ResumeToken::toDocument() const { - ResumeTokenClusterTime clusterTime; - clusterTime.setTimestamp(_timestamp); - return Document({{ResumeTokenInternal::kClusterTimeFieldName, clusterTime.toBSON()}, - {{ResumeTokenInternal::kUuidFieldName}, _uuid}, - {{ResumeTokenInternal::kDocumentKeyFieldName}, _documentId}}); +// We encode the resume token as a KeyString with the sequence: clusterTime, documentKey, uuid. +// Only the clusterTime is required. +ResumeToken::ResumeToken(const ResumeTokenData& data) { + BSONObjBuilder builder; + builder.append("", data.clusterTime); + data.documentKey.addToBsonObj(&builder, ""); + if (data.uuid) { + if (data.documentKey.missing()) { + // Never allow a missing document key with a UUID present, as that will mess up + // the field order. + builder.appendNull(""); + } + data.uuid->appendToBuilder(&builder, ""); + } + auto keyObj = builder.obj(); + KeyString encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj())); + _keyStringData = Value( + BSONBinData(encodedToken.getBuffer(), encodedToken.getSize(), BinDataType::BinDataGeneral)); + const auto& typeBits = encodedToken.getTypeBits(); + if (!typeBits.isAllZeros()) + _typeBits = Value( + BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral)); +} + +ResumeTokenData ResumeToken::getData() const { + KeyString::TypeBits typeBits(KeyString::Version::V1); + if (!_typeBits.missing()) { + BSONBinData typeBitsBinData = _typeBits.getBinData(); + BufReader typeBitsReader(typeBitsBinData.data, typeBitsBinData.length); + typeBits.resetFromBuffer(&typeBitsReader); + } + BSONBinData keyStringBinData = _keyStringData.getBinData(); + auto internalBson = KeyString::toBson(static_cast<const char*>(keyStringBinData.data), + keyStringBinData.length, + Ordering::make(BSONObj()), + typeBits); + + BSONObjIterator i(internalBson); + ResumeTokenData result; + uassert(40649, "invalid empty resume token", i.more()); + result.clusterTime = i.next().timestamp(); + if (i.more()) + result.documentKey = Value(i.next()); + if (i.more()) + result.uuid = uassertStatusOK(UUID::parse(i.next())); + uassert(40646, "invalid oversized resume token", !i.more()); + return result; +} + +int ResumeToken::compare(const ResumeToken& other) const { + BSONBinData thisData = _keyStringData.getBinData(); + BSONBinData otherData = other._keyStringData.getBinData(); + return StringData(static_cast<const char*>(thisData.data), thisData.length) + .compare(StringData(static_cast<const char*>(otherData.data), otherData.length)); +} + +bool ResumeToken::operator==(const ResumeToken& other) const { + return compare(other) == 0; +} + +bool ResumeToken::operator!=(const ResumeToken& other) const { + return compare(other) != 0; +} + +bool ResumeToken::operator<(const ResumeToken& other) const { + return compare(other) < 0; +} + +bool ResumeToken::operator<=(const ResumeToken& other) const { + return compare(other) <= 0; } -BSONObj ResumeToken::toBSON() const { - return toDocument().toBson(); +bool ResumeToken::operator>(const ResumeToken& other) const { + return compare(other) > 0; +} + +bool ResumeToken::operator>=(const ResumeToken& other) const { + return compare(other) >= 0; +} + +Document ResumeToken::toDocument() const { + return Document{{kDataFieldName, _keyStringData}, {kTypeBitsFieldName, _typeBits}}; } -ResumeToken ResumeToken::parse(const BSONObj& resumeBson) { - return ResumeToken(resumeBson); +ResumeToken ResumeToken::parse(const Document& resumeDoc) { + 
return ResumeToken(resumeDoc); } } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 6837443d844..3e75bc0942d 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -28,17 +28,48 @@ #pragma once +#include <boost/optional.hpp> + #include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" +#include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/value.h" #include "mongo/util/uuid.h" namespace mongo { + +struct ResumeTokenData { + ResumeTokenData(){}; + ResumeTokenData(Timestamp clusterTimeIn, + Value documentKeyIn, + const boost::optional<UUID>& uuidIn) + : clusterTime(clusterTimeIn), documentKey(std::move(documentKeyIn)), uuid(uuidIn){}; + + bool operator==(const ResumeTokenData& other) const; + bool operator!=(const ResumeTokenData& other) const { + return !(*this == other); + }; + + Timestamp clusterTime; + Value documentKey; + boost::optional<UUID> uuid; +}; + +std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData); + /** * A token passed in by the user to indicate where in the oplog we should start for - * $changeStream. + * $changeStream. This token has the following format: + * { + * _data: <binary data>, + * _typeBits: <binary data> + * } + * The _data field data is encoded such that byte by byte comparisons provide the correct + * ordering of tokens. The _typeBits field may be missing and should not affect token + * comparison. */ + class ResumeToken { public: /** @@ -46,35 +77,49 @@ public: * fields. */ ResumeToken() = default; - explicit ResumeToken(const Value& resumeValue); - bool operator==(const ResumeToken&); - Timestamp getTimestamp() const { - return _timestamp; - } + explicit ResumeToken(const ResumeTokenData& resumeValue); - UUID getUuid() const { - return _uuid; - } + bool operator==(const ResumeToken&) const; + bool operator!=(const ResumeToken&) const; + bool operator<(const ResumeToken&) const; + bool operator<=(const ResumeToken&) const; + bool operator>(const ResumeToken&) const; + bool operator>=(const ResumeToken&) const; + + /** Three way comparison, returns 0 if *this is equal to other, < 0 if *this is less than + * other, and > 0 if *this is greater than other. + */ + int compare(const ResumeToken& other) const; Document toDocument() const; - BSONObj toBSON() const; + BSONObj toBSON() const { + return toDocument().toBson(); + } + + ResumeTokenData getData() const; /** * Parse a resume token from a BSON object; used as an interface to the IDL parser. */ - static ResumeToken parse(const BSONObj& obj); + static ResumeToken parse(const BSONObj& resumeBson) { + return ResumeToken::parse(Document(resumeBson)); + } + + static ResumeToken parse(const Document& document); + + friend std::ostream& operator<<(std::ostream& out, const ResumeToken& token) { + return out << token.getData(); + } + + constexpr static StringData kDataFieldName = "_data"_sd; + constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd; private: - /** - * Construct from a BSON object. - * External callers should use the static ResumeToken::parse(const BSONObj&) method instead. 
- */ - explicit ResumeToken(const BSONObj& resumeBson); + explicit ResumeToken(const Document& resumeData); - Timestamp _timestamp; - UUID _uuid; - Value _documentId; + Value _keyStringData; + Value _typeBits; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp new file mode 100644 index 00000000000..c8196183aa6 --- /dev/null +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -0,0 +1,322 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/pipeline/resume_token.h" + +#include <boost/optional/optional_io.hpp> + +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +namespace { + +TEST(ResumeToken, EncodesFullTokenFromData) { + Timestamp ts(1000, 2); + UUID testUuid = UUID::gen(); + Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; + + ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); + ResumeToken token(resumeTokenDataIn); + ResumeTokenData tokenData = token.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + +TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) { + Timestamp ts(1001, 3); + + ResumeTokenData resumeTokenDataIn; + resumeTokenDataIn.clusterTime = ts; + ResumeToken token(resumeTokenDataIn); + ResumeTokenData tokenData = token.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + +TEST(ResumeToken, RoundTripThroughBsonFullToken) { + Timestamp ts(1000, 2); + UUID testUuid = UUID::gen(); + Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; + + ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); + auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + +TEST(ResumeToken, RoundTripThroughBsonTimestampOnlyToken) { + Timestamp ts(1001, 3); + + ResumeTokenData resumeTokenDataIn; + resumeTokenDataIn.clusterTime = ts; + auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + +TEST(ResumeToken, RoundTripThroughDocumentFullToken) { + Timestamp ts(1000, 2); + UUID testUuid = UUID::gen(); + Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; + + ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); + auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + +TEST(ResumeToken, RoundTripThroughDocumentTimestampOnlyToken) { + Timestamp ts(1001, 3); + + ResumeTokenData resumeTokenDataIn; + resumeTokenDataIn.clusterTime = ts; + auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + +TEST(ResumeToken, TestMissingTypebitsOptimization) { + Timestamp ts(1000, 1); + UUID testUuid = UUID::gen(); + + ResumeTokenData hasTypeBitsData(ts, Value(Document{{"_id", 1.0}}), testUuid); + ResumeTokenData noTypeBitsData(ResumeTokenData(ts, Value(Document{{"_id", 1}}), testUuid)); + ResumeToken hasTypeBitsToken(hasTypeBitsData); + ResumeToken noTypeBitsToken(noTypeBitsData); + ASSERT_EQ(noTypeBitsToken, hasTypeBitsToken); + auto hasTypeBitsDoc = hasTypeBitsToken.toDocument(); + auto noTypeBitsDoc = noTypeBitsToken.toDocument(); + ASSERT_FALSE(hasTypeBitsDoc["_typeBits"].missing()); + ASSERT_TRUE(noTypeBitsDoc["_typeBits"].missing()) << noTypeBitsDoc["_typeBits"]; + auto rtHasTypeBitsData = ResumeToken::parse(hasTypeBitsDoc).getData(); + auto rtNoTypeBitsData = ResumeToken::parse(noTypeBitsDoc).getData(); + ASSERT_EQ(hasTypeBitsData, rtHasTypeBitsData); + ASSERT_EQ(noTypeBitsData, rtNoTypeBitsData); + ASSERT_EQ(BSONType::NumberDouble, 
rtHasTypeBitsData.documentKey["_id"].getType()); + ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.documentKey["_id"].getType()); +} + +// Tests comparison functions for tokens constructed from oplog data. +TEST(ResumeToken, CompareFromData) { + Timestamp ts1(1000, 1); + Timestamp ts2(1000, 2); + Timestamp ts3(1001, 1); + UUID testUuid = UUID::gen(); + UUID testUuid2 = UUID::gen(); + Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; + Document documentKey1b{{"_id"_sd, "stuff"_sd}, + {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}}; + Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}}; + Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}}; + + ResumeToken token1a(ResumeTokenData(ts1, Value(documentKey1a), testUuid)); + ResumeToken token1b(ResumeTokenData(ts1, Value(documentKey1b), testUuid)); + + // Equivalent types don't matter. + ASSERT_EQ(token1a, token1b); + ASSERT_LTE(token1a, token1b); + ASSERT_GTE(token1a, token1b); + + // UUIDs matter, but all that really matters is they compare unequal. + ResumeToken tokenOtherCollection(ResumeTokenData(ts1, Value(documentKey1a), testUuid2)); + ASSERT_NE(token1a, tokenOtherCollection); + + ResumeToken token2(ResumeTokenData(ts1, Value(documentKey2), testUuid)); + + // Document keys matter. + ASSERT_LT(token1a, token2); + ASSERT_LTE(token1a, token2); + ASSERT_GT(token2, token1a); + ASSERT_GTE(token2, token1a); + + ResumeToken token3(ResumeTokenData(ts1, Value(documentKey3), testUuid)); + + // Order within document keys matters. + ASSERT_LT(token1a, token3); + ASSERT_LTE(token1a, token3); + ASSERT_GT(token3, token1a); + ASSERT_GTE(token3, token1a); + + ASSERT_LT(token2, token3); + ASSERT_LTE(token2, token3); + ASSERT_GT(token3, token2); + ASSERT_GTE(token3, token2); + + ResumeToken token4(ResumeTokenData(ts2, Value(documentKey1a), testUuid)); + + // Timestamps matter. + ASSERT_LT(token1a, token4); + ASSERT_LTE(token1a, token4); + ASSERT_GT(token4, token1a); + ASSERT_GTE(token4, token1a); + + // Timestamps matter more than document key. + ASSERT_LT(token3, token4); + ASSERT_LTE(token3, token4); + ASSERT_GT(token4, token3); + ASSERT_GTE(token4, token3); + + ResumeToken token5(ResumeTokenData(ts3, Value(documentKey1a), testUuid)); + + // Time matters more than increment in timestamp + ASSERT_LT(token4, token5); + ASSERT_LTE(token4, token5); + ASSERT_GT(token5, token4); + ASSERT_GTE(token5, token4); +} + +// Tests comparison functions for tokens constructed from the keystring-encoded form. +TEST(ResumeToken, CompareFromEncodedData) { + Timestamp ts1(1000, 1); + Timestamp ts2(1000, 2); + Timestamp ts3(1001, 1); + UUID testUuid = UUID::gen(); + UUID testUuid2 = UUID::gen(); + Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; + Document documentKey1b{{"_id"_sd, "stuff"_sd}, + {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}}; + Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}}; + Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}}; + + auto token1a = ResumeToken::parse( + ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid)).toDocument()); + auto token1b = ResumeToken::parse( + ResumeToken(ResumeTokenData(ts1, Value(documentKey1b), testUuid)).toDocument()); + + // Equivalent types don't matter. 
+ ASSERT_EQ(token1a, token1b); + ASSERT_LTE(token1a, token1b); + ASSERT_GTE(token1a, token1b); + + // UUIDs matter, but all that really matters is they compare unequal. + auto tokenOtherCollection = ResumeToken::parse( + ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid2)).toDocument()); + ASSERT_NE(token1a, tokenOtherCollection); + + auto token2 = ResumeToken::parse( + ResumeToken(ResumeTokenData(ts1, Value(documentKey2), testUuid)).toDocument()); + + // Document keys matter. + ASSERT_LT(token1a, token2); + ASSERT_LTE(token1a, token2); + ASSERT_GT(token2, token1a); + ASSERT_GTE(token2, token1a); + + auto token3 = ResumeToken::parse( + ResumeToken(ResumeTokenData(ts1, Value(documentKey3), testUuid)).toDocument()); + + // Order within document keys matters. + ASSERT_LT(token1a, token3); + ASSERT_LTE(token1a, token3); + ASSERT_GT(token3, token1a); + ASSERT_GTE(token3, token1a); + + ASSERT_LT(token2, token3); + ASSERT_LTE(token2, token3); + ASSERT_GT(token3, token2); + ASSERT_GTE(token3, token2); + + auto token4 = ResumeToken::parse( + ResumeToken(ResumeTokenData(ts2, Value(documentKey1a), testUuid)).toDocument()); + + // Timestamps matter. + ASSERT_LT(token1a, token4); + ASSERT_LTE(token1a, token4); + ASSERT_GT(token4, token1a); + ASSERT_GTE(token4, token1a); + + // Timestamps matter more than document key. + ASSERT_LT(token3, token4); + ASSERT_LTE(token3, token4); + ASSERT_GT(token4, token3); + ASSERT_GTE(token4, token3); + + auto token5 = ResumeToken::parse( + ResumeToken(ResumeTokenData(ts3, Value(documentKey1a), testUuid)).toDocument()); + + // Time matters more than increment in timestamp + ASSERT_LT(token4, token5); + ASSERT_LTE(token4, token5); + ASSERT_GT(token5, token4); + ASSERT_GTE(token5, token4); +} + +TEST(ResumeToken, CorruptTokens) { + // Missing document. + ASSERT_THROWS(ResumeToken::parse(Document()), AssertionException); + // Missing data field. + ASSERT_THROWS(ResumeToken::parse(Document{{"somefield"_sd, "stuff"_sd}}), AssertionException); + // Wrong type data field + ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, "string"_sd}}), AssertionException); + ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, 0}}), AssertionException); + + // Valid data field, but wrong type typeBits. + Timestamp ts(1010, 4); + ResumeTokenData tokenData; + tokenData.clusterTime = ts; + auto goodTokenDoc = ResumeToken(tokenData).toDocument(); + auto goodData = goodTokenDoc["_data"].getBinData(); + ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}), + AssertionException); + + // Valid data except wrong bindata type. + ASSERT_THROWS(ResumeToken::parse( + Document{{"_data"_sd, BSONBinData(goodData.data, goodData.length, newUUID)}}), + AssertionException); + // Valid data, wrong typeBits bindata type. + ASSERT_THROWS( + ResumeToken::parse(Document{{"_data"_sd, goodData}, + {"_typeBits", BSONBinData(goodData.data, 0, newUUID)}}), + AssertionException); + + const char zeroes[] = {0, 0, 0, 0, 0}; + const char nonsense[] = {-91, 85, 77, 86, -1}; + // Data of correct type, but empty. This won't fail until we try to decode the data. + auto emptyToken = + ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 0, BinDataGeneral)}}); + ASSERT_THROWS(emptyToken.getData(), AssertionException); + + // Data of correct type with a bunch of zeros. 
+ auto zeroesToken = + ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 5, BinDataGeneral)}}); + ASSERT_THROWS(emptyToken.getData(), AssertionException); + + // Data of correct type with a bunch of nonsense + auto nonsenseToken = + ResumeToken::parse(Document{{"_data"_sd, BSONBinData(nonsense, 5, BinDataGeneral)}}); + ASSERT_THROWS(emptyToken.getData(), AssertionException); + + // Valid data, bad typeBits; note that an all-zeros typebits is valid so it is not tested here. + auto badTypeBitsToken = ResumeToken::parse( + Document{{"_data"_sd, goodData}, {"_typeBits", BSONBinData(nonsense, 5, BinDataGeneral)}}); + ASSERT_THROWS(badTypeBitsToken.getData(), AssertionException); +} + +} // namspace +} // namspace mongo diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h index c77ccc2239a..9a36661e7b5 100644 --- a/src/mongo/db/pipeline/value.h +++ b/src/mongo/db/pipeline/value.h @@ -192,6 +192,8 @@ public: int getInt() const; long long getLong() const; UUID getUuid() const; + // The returned BSONBinData remains owned by this Value. + BSONBinData getBinData() const; const std::vector<Value>& getArray() const { return _storage.getArray(); } @@ -460,4 +462,10 @@ inline UUID Value::getUuid() const { auto stringData = _storage.getString(); return UUID::fromCDR({stringData.rawData(), stringData.size()}); } + +inline BSONBinData Value::getBinData() const { + verify(getType() == BinData); + auto stringData = _storage.getString(); + return BSONBinData(stringData.rawData(), stringData.size(), _storage.binDataType()); +} }; |
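Finally, value.h gains the BinData accessor the token code relies on. A small hedged sketch of the ownership rule implied by the new comment (the function below is illustrative, not from the patch):

```cpp
#include "mongo/db/pipeline/value.h"

namespace mongo {
// Illustrative only: the BSONBinData returned by Value::getBinData() points
// into the Value's own storage, so the Value must outlive any use of the
// returned pointer; copy the bytes if they need to escape this scope.
size_t resumeTokenDataSize(const Value& keyStringData) {
    BSONBinData bin = keyStringData.getBinData();  // a view, not a copy
    return static_cast<size_t>(bin.length);
}
}  // namespace mongo
```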