author     Matthew Russotto <matthew.russotto@10gen.com>  2017-10-03 16:22:37 -0400
committer  Matthew Russotto <matthew.russotto@10gen.com>  2017-10-05 10:20:35 -0400
commit     316a341735b2ffd12ee203581ac0f736a6aaef88 (patch)
tree       956dc9e0c0e6b2ee530182c18544af20f3cb887f
parent     47b62c3fdd712209dbe48fdf3928901304624320 (diff)
SERVER-29716 Keystring-encode ResumeTokens to allow bytewise comparisons
-rw-r--r--  jstests/change_streams/change_stream.js                                    30
-rw-r--r--  jstests/change_streams/change_stream_invalidation.js                        5
-rw-r--r--  jstests/change_streams/lookup_post_image.js                                 8
-rw-r--r--  jstests/change_streams/only_wake_getmore_for_relevant_changes.js            5
-rw-r--r--  jstests/libs/change_stream_util.js                                          8
-rw-r--r--  src/mongo/db/pipeline/SConscript                                           12
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp                    27
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp               10
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp               22
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.h                 17
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token_test.cpp          44
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp          5
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp     9
-rw-r--r--  src/mongo/db/pipeline/document_sources.idl                                 35
-rw-r--r--  src/mongo/db/pipeline/resume_token.cpp                                    133
-rw-r--r--  src/mongo/db/pipeline/resume_token.h                                       83
-rw-r--r--  src/mongo/db/pipeline/resume_token_test.cpp                               322
-rw-r--r--  src/mongo/db/pipeline/value.h                                               8
18 files changed, 593 insertions(+), 190 deletions(-)
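
The change in miniature: the old token shape {_id: {documentKey, uuid, clusterTime}} becomes an opaque document {_data: BinData, _typeBits: BinData (optional)}, where _data is a KeyString encoding of (clusterTime, documentKey, uuid) whose bytes sort in the same order as the fields themselves. A standalone sketch of why an order-preserving encoding permits bytewise comparison; this is a toy big-endian layout for the cluster time only, not the actual KeyString format:

    // Toy illustration only: fixed-width big-endian unsigned fields make
    // lexicographic byte order agree with numeric field order.
    #include <array>
    #include <cassert>
    #include <cstdint>
    #include <cstring>

    std::array<unsigned char, 8> encodeClusterTime(uint32_t secs, uint32_t inc) {
        std::array<unsigned char, 8> out{};
        for (int i = 0; i < 4; ++i) {
            out[i] = static_cast<unsigned char>(secs >> (24 - 8 * i));     // seconds first
            out[4 + i] = static_cast<unsigned char>(inc >> (24 - 8 * i));  // then increment
        }
        return out;
    }

    int main() {
        auto a = encodeClusterTime(1000, 2);  // like Timestamp(1000, 2)
        auto b = encodeClusterTime(1001, 1);  // like Timestamp(1001, 1)
        // Bytewise comparison agrees with (seconds, increment) ordering.
        assert(std::memcmp(a.data(), b.data(), sizeof(a)) < 0);
        return 0;
    }
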
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 9ac347f406b..c1415a2a1f5 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -16,10 +16,6 @@
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
const t1Uuid = getUUIDFromListCollections(db, db.t1.getName());
let expected = {
- _id: {
- documentKey: {_id: 0},
- uuid: t1Uuid,
- },
documentKey: {_id: 0},
fullDocument: {_id: 0, a: 1},
ns: {db: "test", coll: "t1"},
@@ -35,10 +31,6 @@
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.insert({_id: 1, a: 2}));
expected = {
- _id: {
- documentKey: {_id: 1},
- uuid: t1Uuid,
- },
documentKey: {_id: 1},
fullDocument: {_id: 1, a: 2},
ns: {db: "test", coll: "t1"},
@@ -50,7 +42,6 @@
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 0}, {a: 3}));
expected = {
- _id: {documentKey: {_id: 0}, uuid: t1Uuid},
documentKey: {_id: 0},
fullDocument: {_id: 0, a: 3},
ns: {db: "test", coll: "t1"},
@@ -62,7 +53,6 @@
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 0}, {b: 3}));
expected = {
- _id: {documentKey: {_id: 0}, uuid: t1Uuid},
documentKey: {_id: 0},
fullDocument: {_id: 0, b: 3},
ns: {db: "test", coll: "t1"},
@@ -74,10 +64,6 @@
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true}));
expected = {
- _id: {
- documentKey: {_id: 2},
- uuid: t1Uuid,
- },
documentKey: {_id: 2},
fullDocument: {_id: 2, a: 4},
ns: {db: "test", coll: "t1"},
@@ -90,7 +76,6 @@
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}}));
expected = {
- _id: {documentKey: {_id: 3}, uuid: t1Uuid},
documentKey: {_id: 3},
ns: {db: "test", coll: "t1"},
operationType: "update",
@@ -102,7 +87,6 @@
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.remove({_id: 1}));
expected = {
- _id: {documentKey: {_id: 1}, uuid: t1Uuid},
documentKey: {_id: 1},
ns: {db: "test", coll: "t1"},
operationType: "delete",
@@ -116,10 +100,6 @@
const t2Uuid = getUUIDFromListCollections(db, db.t2.getName());
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: []});
expected = {
- _id: {
- documentKey: {_id: 100},
- uuid: t2Uuid,
- },
documentKey: {_id: 100},
fullDocument: {_id: 100, c: 1},
ns: {db: "test", coll: "t2"},
@@ -136,7 +116,7 @@
db.t3.drop();
t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
assert.writeOK(db.t2.renameCollection("t3"));
- expected = {_id: {uuid: t2Uuid}, operationType: "invalidate"};
+ expected = {operationType: "invalidate"};
cst.assertNextChangesEqual(
{cursor: t2cursor, expectedChanges: [expected], expectInvalidate: true});
@@ -169,7 +149,7 @@
    // Note we do not project away '_id.ts' as it is part of the resume token.
let resumeCursor = cst.startWatchingChanges(
- {pipeline: [{$changeStream: {}}], collection: db.resume1, includeTs: true});
+ {pipeline: [{$changeStream: {}}], collection: db.resume1, includeToken: true});
// Insert a document and save the resulting change stream.
assert.writeOK(db.resume1.insert({_id: 1}));
@@ -180,7 +160,7 @@
resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
collection: db.resume1,
- includeTs: true,
+ includeToken: true,
aggregateOptions: {cursor: {batchSize: 0}},
});
@@ -196,7 +176,7 @@
resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
collection: db.resume1,
- includeTs: true,
+ includeToken: true,
aggregateOptions: {cursor: {batchSize: 0}},
});
assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
@@ -206,7 +186,7 @@
resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}],
collection: db.resume1,
- includeTs: true,
+ includeToken: true,
aggregateOptions: {cursor: {batchSize: 0}},
});
assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js
index d6aa05c57cb..8c4c1af2437 100644
--- a/jstests/change_streams/change_stream_invalidation.js
+++ b/jstests/change_streams/change_stream_invalidation.js
@@ -42,7 +42,7 @@
assert.eq(change.operationType, "delete", tojson(change));
cst.assertNextChangesEqual({
cursor: aggcursor,
- expectedChanges: [{_id: {uuid: collGetMoreUuid}, operationType: "invalidate"}],
+ expectedChanges: [{operationType: "invalidate"}],
expectInvalidate: true
});
@@ -51,7 +51,8 @@
db.createCollection(collAgg.getName());
const collAggUuid = getUUIDFromListCollections(db, collAgg.getName());
// Get a valid resume token that the next aggregate command can use.
- aggcursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collAgg});
+ aggcursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: collAgg, includeToken: true});
assert.writeOK(collAgg.insert({_id: 1}, {writeConcern: {w: "majority"}}));
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index ee5d90bf158..0f75ad073e9 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -17,7 +17,7 @@
// Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for an
// insert.
let cursor = cst.startWatchingChanges(
- {pipeline: [{$changeStream: {}}], collection: coll, includeTs: true});
+ {pipeline: [{$changeStream: {}}], collection: coll, includeToken: true});
assert.writeOK(coll.insert({_id: "fullDocument not specified"}));
let latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
@@ -45,7 +45,7 @@
cursor = cst.startWatchingChanges({
collection: coll,
pipeline: [{$changeStream: {fullDocument: "default"}}],
- includeTs: true
+ includeToken: true
});
assert.writeOK(coll.insert({_id: "fullDocument is default"}));
latestChange = cst.getOneChange(cursor);
@@ -74,7 +74,7 @@
cursor = cst.startWatchingChanges({
collection: coll,
pipeline: [{$changeStream: {fullDocument: "updateLookup"}}],
- includeTs: true
+ includeToken: true
});
assert.writeOK(coll.insert({_id: "fullDocument is lookup"}));
latestChange = cst.getOneChange(cursor);
@@ -103,7 +103,7 @@
collection: coll,
pipeline:
[{$changeStream: {fullDocument: "updateLookup"}}, {$match: {operationType: "update"}}],
- includeTs: true
+ includeToken: true
});
assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}}));
assert.writeOK(coll.remove({_id: "fullDocument is lookup"}));
diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
index 87c082199b7..f7d909f039f 100644
--- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
@@ -112,8 +112,8 @@ eventFn();`,
const wholeCollectionStreamComment = "change stream on entire collection";
let res = assert.commandWorked(db.runCommand({
aggregate: changesCollection.getName(),
- // Project out the timestamp, since that's subject to change unpredictably.
- pipeline: [{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}],
+ // Project out the resume token, since that's subject to change unpredictably.
+ pipeline: [{$changeStream: {}}, {$project: {"_id": 0}}],
cursor: {},
comment: wholeCollectionStreamComment
}));
@@ -132,7 +132,6 @@ eventFn();`,
assert.eq(getMoreResponse.cursor.nextBatch.length, 1);
const changesCollectionUuid = getUUIDFromListCollections(db, changesCollection.getName());
assert.docEq(getMoreResponse.cursor.nextBatch[0], {
- _id: {documentKey: {_id: "wake up"}, uuid: changesCollectionUuid},
documentKey: {_id: "wake up"},
fullDocument: {_id: "wake up"},
ns: {db: db.getName(), coll: changesCollection.getName()},
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 61b7a3cc13e..3e806579ec6 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -12,19 +12,19 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
// Prevent accidental usages of the default db.
const db = null;
- self.oplogProjection = {$project: {"_id.clusterTime": 0}};
+ self.oplogProjection = {$project: {"_id": 0}};
/**
* Starts a change stream cursor with the given pipeline on the given collection. It uses
- * the 'aggregateOptions' if provided and elides the clusterTime if 'includeTs' is not set.
+ * the 'aggregateOptions' if provided and elides the resume token if 'includeToken' is not set.
* This saves the cursor so that it can be cleaned up later.
*
* Returns the cursor returned by the 'aggregate' command.
*/
- self.startWatchingChanges = function({pipeline, collection, includeTs, aggregateOptions}) {
+ self.startWatchingChanges = function({pipeline, collection, includeToken, aggregateOptions}) {
aggregateOptions = aggregateOptions || {cursor: {batchSize: 1}};
- if (!includeTs) {
+ if (!includeToken) {
// Strip the oplog fields we aren't testing.
pipeline.push(self.oplogProjection);
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index ecf9c52a5f2..af543bfc8b3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -533,7 +533,19 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/storage/key_string',
'$BUILD_DIR/mongo/idl/idl_parser',
'document_value',
],
)
+
+env.CppUnitTest(
+ target='resume_token_test',
+ source='resume_token_test.cpp',
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ 'document_sources_idl',
+ 'document_source_lookup',
+ ],
+)
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index fced8514c8c..e9b49de2c47 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -272,19 +272,20 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
elem.embeddedObject());
if (auto resumeAfter = spec.getResumeAfter()) {
ResumeToken token = resumeAfter.get();
- auto resumeNamespace = UUIDCatalog::get(expCtx->opCtx).lookupNSSByUUID(token.getUuid());
+ ResumeTokenData tokenData = token.getData();
+ uassert(40645,
+ "The resume token is invalid (no UUID), possibly from an invalidate.",
+ tokenData.uuid);
+ auto resumeNamespace =
+ UUIDCatalog::get(expCtx->opCtx).lookupNSSByUUID(tokenData.uuid.get());
uassert(40615,
"The resume token UUID does not exist. Has the collection been dropped?",
!resumeNamespace.isEmpty());
- startFrom = token.getTimestamp();
+ startFrom = tokenData.clusterTime;
if (expCtx->needsMerge) {
- DocumentSourceShardCheckResumabilitySpec spec;
- spec.setResumeToken(std::move(token));
- resumeStage = DocumentSourceShardCheckResumability::create(expCtx, std::move(spec));
+ resumeStage = DocumentSourceShardCheckResumability::create(expCtx, std::move(token));
} else {
- DocumentSourceEnsureResumeTokenPresentSpec spec;
- spec.setResumeToken(std::move(token));
- resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(spec));
+ resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token));
}
}
const bool changeStreamIsResuming = resumeStage != nullptr;
@@ -414,10 +415,12 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
// Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will
// not appear in the output.
- Document resumeToken{{kClusterTimeField, Document{{kTimestampField, ts}}},
- {kUuidField, uuid},
- {kDocumentKeyField, documentKey}};
- doc.addField(kIdField, Value(resumeToken));
+ ResumeTokenData resumeTokenData;
+ resumeTokenData.clusterTime = ts.getTimestamp();
+ resumeTokenData.documentKey = documentKey;
+ if (!uuid.missing())
+ resumeTokenData.uuid = uuid.getUuid();
+ doc.addField(kIdField, Value(ResumeToken(resumeTokenData).toDocument()));
doc.addField(kOperationTypeField, Value(operationType));
// "invalidate" and "retryNeeded" entries have fewer fields.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index ab5d9a11e86..989a331a1a4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -155,10 +155,12 @@ public:
Document makeResumeToken(Timestamp ts,
ImplicitValue uuid = Value(),
ImplicitValue docKey = Value()) {
- if (docKey.missing()) {
- return {{"clusterTime", D{{"ts", ts}}}, {"uuid", uuid}};
- }
- return {{"clusterTime", D{{"ts", ts}}}, {"uuid", uuid}, {"documentKey", docKey}};
+ ResumeTokenData tokenData;
+ tokenData.clusterTime = ts;
+ tokenData.documentKey = docKey;
+ if (!uuid.missing())
+ tokenData.uuid = uuid.getUuid();
+ return ResumeToken(tokenData).toDocument();
}
/**
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 435f464af8f..94d42e2920d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -45,13 +45,13 @@ Value DocumentSourceEnsureResumeTokenPresent::serialize(
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx,
- DocumentSourceEnsureResumeTokenPresentSpec spec) {
- return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(spec));
+ ResumeToken token) {
+ return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
}
DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
- const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceEnsureResumeTokenPresentSpec spec)
- : DocumentSource(expCtx), _token(spec.getResumeToken()), _seenDoc(false) {}
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token)
+ : DocumentSource(expCtx), _token(std::move(token)), _seenDoc(false) {}
DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
pExpCtx->checkForInterrupt();
@@ -67,7 +67,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext()
_seenDoc = true;
auto doc = nextInput.getDocument();
- ResumeToken receivedToken(doc["_id"]);
+ auto receivedToken = ResumeToken::parse(doc["_id"].getDocument());
uassert(40585,
str::stream()
<< "resume of change stream was not possible, as the resume token was not found. "
@@ -89,14 +89,14 @@ Value DocumentSourceShardCheckResumability::serialize(
}
intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
- const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceShardCheckResumabilitySpec spec) {
- return new DocumentSourceShardCheckResumability(expCtx, std::move(spec));
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token) {
+ return new DocumentSourceShardCheckResumability(expCtx, std::move(token));
}
DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
- const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceShardCheckResumabilitySpec spec)
+ const intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token)
: DocumentSourceNeedsMongoProcessInterface(expCtx),
- _token(spec.getResumeToken()),
+ _token(std::move(token)),
_verifiedResumability(false) {}
DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
@@ -110,7 +110,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
if (nextInput.isAdvanced()) {
auto doc = nextInput.getDocument();
- ResumeToken receivedToken(doc["_id"]);
+ auto receivedToken = ResumeToken::parse(doc["_id"].getDocument());
if (receivedToken == _token) {
// Pass along the document, as the DocumentSourceEnsureResumeTokenPresent stage on the
// merger will
@@ -129,7 +129,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
uassert(40576,
"resume of change notification was not possible, as the resume point may no longer "
"be in the oplog. ",
- firstOplogEntry["ts"].getTimestamp() < _token.getTimestamp());
+ firstOplogEntry["ts"].getTimestamp() < _token.getData().clusterTime);
return nextInput;
}
// Very unusual case: the oplog is empty. We can always resume. It should never be possible
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 2ff83bb2b35..5fedb8bb444 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -33,9 +33,6 @@
#include "mongo/db/pipeline/resume_token.h"
namespace mongo {
-// Currently the two resume sources take the same specification.
-typedef DocumentSourceEnsureResumeTokenPresentSpec DocumentSourceShardCheckResumabilitySpec;
-
/**
* This checks for resumability on a single shard in the sharded case. The rules are
*
@@ -71,15 +68,14 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- DocumentSourceShardCheckResumabilitySpec spec);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token);
private:
/**
* Use the create static method to create a DocumentSourceShardCheckResumability.
*/
DocumentSourceShardCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- DocumentSourceShardCheckResumabilitySpec spec);
+ ResumeToken token);
ResumeToken _token;
bool _verifiedResumability;
@@ -111,9 +107,7 @@ public:
* be at any shard.
*/
boost::intrusive_ptr<DocumentSource> getShardSource() final {
- DocumentSourceShardCheckResumabilitySpec shardSpec;
- shardSpec.setResumeToken(_token);
- return DocumentSourceShardCheckResumability::create(pExpCtx, shardSpec);
+ return DocumentSourceShardCheckResumability::create(pExpCtx, _token);
};
boost::intrusive_ptr<DocumentSource> getMergeSource() final {
@@ -123,8 +117,7 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- DocumentSourceEnsureResumeTokenPresentSpec spec);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeToken token);
const ResumeToken& getTokenForTest() {
return _token;
@@ -135,7 +128,7 @@ private:
* Use the create static method to create a DocumentSourceEnsureResumeTokenPresent.
*/
DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- DocumentSourceEnsureResumeTokenPresentSpec spec);
+ ResumeToken token);
ResumeToken _token;
bool _seenDoc;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 2ea54327516..8d3c922917a 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -64,10 +64,9 @@ protected:
* namespace in the mock queue.
*/
void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) {
- _mock->queue.push_back(Document{{"_id",
- Document{{"clusterTime", Document{{"ts", ts}}},
- {"uuid", uuid},
- {"documentKey", Document{{"_id", id}}}}}});
+ _mock->queue.push_back(Document{
+ {"_id",
+ ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid)).toDocument()}});
}
void addPause() {
@@ -79,12 +78,8 @@ protected:
*/
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
Timestamp ts, StringData id, UUID uuid = testUuid()) {
- auto token = ResumeToken::parse(BSON("clusterTime" << BSON("ts" << ts) << "uuid" << uuid
- << "documentKey"
- << BSON("_id" << id)));
- DocumentSourceEnsureResumeTokenPresentSpec spec;
- spec.setResumeToken(token);
- auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), spec);
+ ResumeToken token(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid));
+ auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), token);
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
}
@@ -105,13 +100,9 @@ class ShardCheckResumabilityTest : public CheckResumeTokenTest {
protected:
intrusive_ptr<DocumentSourceShardCheckResumability> createShardCheckResumability(
Timestamp ts, StringData id, UUID uuid = testUuid()) {
- auto token = ResumeToken::parse(BSON("clusterTime" << BSON("ts" << ts) << "uuid" << uuid
- << "documentKey"
- << BSON("_id" << id)));
- DocumentSourceShardCheckResumabilitySpec spec;
- spec.setResumeToken(token);
+ ResumeToken token(ResumeTokenData(ts, Value(Document{{"_id", id}}), uuid));
auto shardCheckResumability =
- DocumentSourceShardCheckResumability::create(getExpCtx(), spec);
+ DocumentSourceShardCheckResumability::create(getExpCtx(), token);
shardCheckResumability->setSource(_mock.get());
return shardCheckResumability;
}
@@ -153,8 +144,7 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
auto result1 = checkResumeToken->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
- ASSERT_VALUE_EQ(Value(Document{{"ts", doc1Timestamp}}),
- doc1["_id"].getDocument()["clusterTime"]);
+ ASSERT_EQ(doc1Timestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -172,13 +162,11 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken)
auto result1 = checkResumeToken->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
- ASSERT_VALUE_EQ(Value(Document{{"ts", doc1Timestamp}}),
- doc1["_id"].getDocument()["clusterTime"]);
+ ASSERT_EQ(doc1Timestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
auto result2 = checkResumeToken->getNext();
ASSERT_TRUE(result2.isAdvanced());
auto& doc2 = result2.getDocument();
- ASSERT_VALUE_EQ(Value(Document{{"ts", doc2Timestamp}}),
- doc2["_id"].getDocument()["clusterTime"]);
+ ASSERT_EQ(doc2Timestamp, ResumeToken::parse(doc2["_id"].getDocument()).getData().clusterTime);
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
@@ -278,7 +266,7 @@ TEST_F(ShardCheckResumabilityTest,
auto result = shardCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
- ASSERT_VALUE_EQ(Value(resumeTimestamp), doc["_id"]["clusterTime"]["ts"]);
+ ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
TEST_F(ShardCheckResumabilityTest,
@@ -295,7 +283,7 @@ TEST_F(ShardCheckResumabilityTest,
auto result = shardCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
- ASSERT_VALUE_EQ(Value(resumeTimestamp), doc["_id"]["clusterTime"]["ts"]);
+ ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIsEmpty) {
@@ -310,7 +298,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedIfResumeTokenIsPresentAndOplogIs
auto result = shardCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
- ASSERT_VALUE_EQ(Value(resumeTimestamp), doc["_id"]["clusterTime"]["ts"]);
+ ASSERT_EQ(resumeTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
TEST_F(ShardCheckResumabilityTest,
@@ -363,7 +351,7 @@ TEST_F(ShardCheckResumabilityTest,
auto result = shardCheckResumability->getNext();
ASSERT_TRUE(result.isAdvanced());
auto& doc = result.getDocument();
- ASSERT_VALUE_EQ(Value(docTimestamp), doc["_id"]["clusterTime"]["ts"]);
+ ASSERT_EQ(docTimestamp, ResumeToken::parse(doc["_id"].getDocument()).getData().clusterTime);
}
TEST_F(ShardCheckResumabilityTest,
@@ -394,7 +382,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldIgnoreOplogAfterFirstDoc) {
auto result1 = shardCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
- ASSERT_VALUE_EQ(Value(docTimestamp), doc1["_id"]["clusterTime"]["ts"]);
+ ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
mockOplog = {Document{{"ts", oplogFutureTimestamp}}};
shardCheckResumability->injectMongoProcessInterface(
@@ -418,7 +406,7 @@ TEST_F(ShardCheckResumabilityTest, ShouldSucceedWhenOplogEntriesExistBeforeAndAf
auto result1 = shardCheckResumability->getNext();
ASSERT_TRUE(result1.isAdvanced());
auto& doc1 = result1.getDocument();
- ASSERT_VALUE_EQ(Value(docTimestamp), doc1["_id"]["clusterTime"]["ts"]);
+ ASSERT_EQ(docTimestamp, ResumeToken::parse(doc1["_id"].getDocument()).getData().clusterTime);
auto result2 = shardCheckResumability->getNext();
ASSERT_TRUE(result2.isEOF());
}
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index c14d4fcbc4b..9535ad95bb2 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -98,11 +98,12 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat
auto matchSpec = BSON("$match" << documentKey);
// Extract the UUID from resume token and do change stream lookups by UUID.
- ResumeToken resumeToken(updateOp[DocumentSourceChangeStream::kIdField]);
+ auto resumeToken =
+ ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument());
// TODO SERVER-29134 we need to extract the namespace from the document and set them on the new
// ExpressionContext if we're getting notifications from an entire database.
- auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getUuid());
+ auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getData().uuid);
auto pipelineStatus = _mongoProcessInterface->makePipeline({matchSpec}, foreignExpCtx);
if (pipelineStatus.getStatus() == ErrorCodes::NamespaceNotFound) {
// We couldn't find the collection with UUID, it may have been dropped.
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 0c31852a993..d9db1db38ce 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -66,11 +66,12 @@ public:
Document makeResumeToken(ImplicitValue id = Value()) {
const Timestamp ts(100, 1);
if (id.missing()) {
- return {{"clusterTime", Document{{"ts", ts}}}, {"uuid", testUuid()}};
+ ResumeTokenData tokenData;
+ tokenData.clusterTime = ts;
+ return ResumeToken(tokenData).toDocument();
}
- return {{"clusterTime", Document{{"ts", ts}}},
- {"uuid", testUuid()},
- {"documentKey", Document{{"_id", id}}}};
+ return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid()))
+ .toDocument();
}
};
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
index f7f19df40bb..ef427a711f4 100644
--- a/src/mongo/db/pipeline/document_sources.idl
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -35,8 +35,9 @@ imports:
- "mongo/idl/basic_types.idl"
types:
- # A resume token could be parsed as a struct, but since we may make it opaque in the future, we
- # parse it as a type with a custom class now.
+    # A resume token is an opaque document we return to the user that contains all the
+    # information needed to resume a stream where they left off. It also defines the
+    # ordering used to merge streams from multiple shards.
resumeToken:
bson_serialization_type: object
description: An object representing a resume token for a change stream
@@ -72,16 +73,6 @@ structs:
description: A string '"updateLookup"' or '"default"', indicating whether or not we
should return a full document or just changes for an update.
-
- DocumentSourceEnsureResumeTokenPresentSpec:
- description: A document used to specify the internal stage which checks the presence of the
- resume token.
- fields:
- resumeToken:
- cpp_name: resumeToken
- type: resumeToken
- description: The resume token which is required to be present in the pipeline.
-
ResumeTokenClusterTime:
description: The IDL type of cluster time
fields:
@@ -107,23 +98,3 @@ structs:
users:
type: array<ListSessionsUser>
optional: true
-
- ResumeTokenInternal:
- description: The internal format of a resume token. For use by the ResumeToken class
- only.
- fields:
- clusterTime:
- cpp_name: clusterTime
- type: ResumeTokenClusterTime
- description: The timestamp of the oplog entry represented by this resume token.
-
- uuid:
- cpp_name: uuid
- type: uuid
- description: The UUID of the oplog entry represented by this resume token.
-
- documentKey:
- cpp_name: documentKey
- type: resumeTokenDocumentKey
- description: The document key of the document in the oplog entry represented by this
- resume token.
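
With the IDL spec structs removed, the token's contract is carried by the comment above: opaque to users, ordered by its _data bytes, with _typeBits ignored for ordering (it only preserves type information for decoding). A standalone sketch of that comparison contract, using a hypothetical OpaqueToken type; it mirrors the memcmp-then-length semantics of the StringData::compare call in ResumeToken::compare below:

    #include <algorithm>
    #include <cstring>
    #include <vector>

    struct OpaqueToken {
        std::vector<unsigned char> data;      // the "_data" field bytes
        std::vector<unsigned char> typeBits;  // "_typeBits": ignored for ordering
    };

    int compareTokens(const OpaqueToken& a, const OpaqueToken& b) {
        const size_t n = std::min(a.data.size(), b.data.size());
        int c = (n == 0) ? 0 : std::memcmp(a.data.data(), b.data.data(), n);
        if (c != 0)
            return c;
        if (a.data.size() == b.data.size())
            return 0;
        return a.data.size() < b.data.size() ? -1 : 1;  // shorter prefix sorts first
    }
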
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index d9a1f3376b9..2618a70186d 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -28,51 +28,128 @@
#include "mongo/db/pipeline/resume_token.h"
+#include <boost/optional/optional_io.hpp>
+
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_sources_gen.h"
#include "mongo/db/pipeline/value_comparator.h"
+#include "mongo/db/storage/key_string.h"
namespace mongo {
+constexpr StringData ResumeToken::kDataFieldName;
+constexpr StringData ResumeToken::kTypeBitsFieldName;
-ResumeToken::ResumeToken(const BSONObj& resumeBson) {
- auto token =
- ResumeTokenInternal::parse(IDLParserErrorContext("$changeStream.resumeAfter"), resumeBson);
- _timestamp = token.getClusterTime().getTimestamp();
- _uuid = token.getUuid();
- _documentId = token.getDocumentKey();
+bool ResumeTokenData::operator==(const ResumeTokenData& other) const {
+ return clusterTime == other.clusterTime &&
+ (Value::compare(this->documentKey, other.documentKey, nullptr) == 0) && uuid == other.uuid;
}
-ResumeToken::ResumeToken(const Value& resumeValue) {
- Document resumeTokenDoc = resumeValue.getDocument();
- Value clusterTime = resumeTokenDoc[ResumeTokenInternal::kClusterTimeFieldName];
- Value timestamp = clusterTime[ResumeTokenClusterTime::kTimestampFieldName];
- _timestamp = timestamp.getTimestamp();
- Value uuid = resumeTokenDoc[ResumeTokenInternal::kUuidFieldName];
- _uuid = uuid.getUuid();
- _documentId = resumeTokenDoc[ResumeTokenInternal::kDocumentKeyFieldName];
+std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) {
+ return out << "{clusterTime: " << tokenData.clusterTime.toString()
+ << " documentKey: " << tokenData.documentKey << " uuid: " << tokenData.uuid << "}";
}
-bool ResumeToken::operator==(const ResumeToken& other) {
- return _timestamp == other._timestamp && _uuid == other._uuid &&
- ValueComparator::kInstance.evaluate(_documentId == other._documentId);
+ResumeToken::ResumeToken(const Document& resumeDoc) {
+ _keyStringData = resumeDoc[kDataFieldName];
+ _typeBits = resumeDoc[kTypeBitsFieldName];
+ uassert(40647,
+ str::stream() << "Bad resume token: _data of missing or of wrong type"
+ << resumeDoc.toString(),
+ _keyStringData.getType() == BinData &&
+ _keyStringData.getBinData().type == BinDataGeneral);
+ uassert(40648,
+ str::stream() << "Bad resume token: _typeBits of wrong type" << resumeDoc.toString(),
+ _typeBits.missing() ||
+ (_typeBits.getType() == BinData && _typeBits.getBinData().type == BinDataGeneral));
}
-Document ResumeToken::toDocument() const {
- ResumeTokenClusterTime clusterTime;
- clusterTime.setTimestamp(_timestamp);
- return Document({{ResumeTokenInternal::kClusterTimeFieldName, clusterTime.toBSON()},
- {{ResumeTokenInternal::kUuidFieldName}, _uuid},
- {{ResumeTokenInternal::kDocumentKeyFieldName}, _documentId}});
+// We encode the resume token as a KeyString with the sequence: clusterTime, documentKey, uuid.
+// Only the clusterTime is required.
+ResumeToken::ResumeToken(const ResumeTokenData& data) {
+ BSONObjBuilder builder;
+ builder.append("", data.clusterTime);
+ data.documentKey.addToBsonObj(&builder, "");
+ if (data.uuid) {
+ if (data.documentKey.missing()) {
+ // Never allow a missing document key with a UUID present, as that will mess up
+ // the field order.
+ builder.appendNull("");
+ }
+ data.uuid->appendToBuilder(&builder, "");
+ }
+ auto keyObj = builder.obj();
+ KeyString encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj()));
+ _keyStringData = Value(
+ BSONBinData(encodedToken.getBuffer(), encodedToken.getSize(), BinDataType::BinDataGeneral));
+ const auto& typeBits = encodedToken.getTypeBits();
+ if (!typeBits.isAllZeros())
+ _typeBits = Value(
+ BSONBinData(typeBits.getBuffer(), typeBits.getSize(), BinDataType::BinDataGeneral));
+}
+
+ResumeTokenData ResumeToken::getData() const {
+ KeyString::TypeBits typeBits(KeyString::Version::V1);
+ if (!_typeBits.missing()) {
+ BSONBinData typeBitsBinData = _typeBits.getBinData();
+ BufReader typeBitsReader(typeBitsBinData.data, typeBitsBinData.length);
+ typeBits.resetFromBuffer(&typeBitsReader);
+ }
+ BSONBinData keyStringBinData = _keyStringData.getBinData();
+ auto internalBson = KeyString::toBson(static_cast<const char*>(keyStringBinData.data),
+ keyStringBinData.length,
+ Ordering::make(BSONObj()),
+ typeBits);
+
+ BSONObjIterator i(internalBson);
+ ResumeTokenData result;
+ uassert(40649, "invalid empty resume token", i.more());
+ result.clusterTime = i.next().timestamp();
+ if (i.more())
+ result.documentKey = Value(i.next());
+ if (i.more())
+ result.uuid = uassertStatusOK(UUID::parse(i.next()));
+ uassert(40646, "invalid oversized resume token", !i.more());
+ return result;
+}
+
+int ResumeToken::compare(const ResumeToken& other) const {
+ BSONBinData thisData = _keyStringData.getBinData();
+ BSONBinData otherData = other._keyStringData.getBinData();
+ return StringData(static_cast<const char*>(thisData.data), thisData.length)
+ .compare(StringData(static_cast<const char*>(otherData.data), otherData.length));
+}
+
+bool ResumeToken::operator==(const ResumeToken& other) const {
+ return compare(other) == 0;
+}
+
+bool ResumeToken::operator!=(const ResumeToken& other) const {
+ return compare(other) != 0;
+}
+
+bool ResumeToken::operator<(const ResumeToken& other) const {
+ return compare(other) < 0;
+}
+
+bool ResumeToken::operator<=(const ResumeToken& other) const {
+ return compare(other) <= 0;
}
-BSONObj ResumeToken::toBSON() const {
- return toDocument().toBson();
+bool ResumeToken::operator>(const ResumeToken& other) const {
+ return compare(other) > 0;
+}
+
+bool ResumeToken::operator>=(const ResumeToken& other) const {
+ return compare(other) >= 0;
+}
+
+Document ResumeToken::toDocument() const {
+ return Document{{kDataFieldName, _keyStringData}, {kTypeBitsFieldName, _typeBits}};
}
-ResumeToken ResumeToken::parse(const BSONObj& resumeBson) {
- return ResumeToken(resumeBson);
+ResumeToken ResumeToken::parse(const Document& resumeDoc) {
+ return ResumeToken(resumeDoc);
}
} // namespace mongo
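
A short usage sketch of the API implemented above; it compiles only inside the MongoDB tree, and the round trip mirrors the unit tests added by this commit:

    // Round trip: oplog-derived fields -> ResumeToken -> user-visible Document -> fields.
    ResumeTokenData in(Timestamp(1000, 2), Value(Document{{"_id", 1}}), UUID::gen());
    ResumeToken token(in);
    Document userVisible = token.toDocument();             // {_data: BinData, _typeBits?: BinData}
    ResumeToken parsed = ResumeToken::parse(userVisible);  // validates the BinData fields (40647/40648)
    invariant(parsed.getData() == in);                     // KeyString decode restores all three fields
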
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 6837443d844..3e75bc0942d 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -28,17 +28,48 @@
#pragma once
+#include <boost/optional.hpp>
+
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/util/uuid.h"
namespace mongo {
+
+struct ResumeTokenData {
+ ResumeTokenData(){};
+ ResumeTokenData(Timestamp clusterTimeIn,
+ Value documentKeyIn,
+ const boost::optional<UUID>& uuidIn)
+ : clusterTime(clusterTimeIn), documentKey(std::move(documentKeyIn)), uuid(uuidIn){};
+
+ bool operator==(const ResumeTokenData& other) const;
+ bool operator!=(const ResumeTokenData& other) const {
+ return !(*this == other);
+ };
+
+ Timestamp clusterTime;
+ Value documentKey;
+ boost::optional<UUID> uuid;
+};
+
+std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);
+
/**
* A token passed in by the user to indicate where in the oplog we should start for
- * $changeStream.
+ * $changeStream. This token has the following format:
+ * {
+ * _data: <binary data>,
+ * _typeBits: <binary data>
+ * }
+ * The _data field is encoded such that byte-by-byte comparisons provide the correct
+ * ordering of tokens. The _typeBits field may be missing and does not affect token
+ * comparison.
*/
+
class ResumeToken {
public:
/**
@@ -46,35 +77,49 @@ public:
* fields.
*/
ResumeToken() = default;
- explicit ResumeToken(const Value& resumeValue);
- bool operator==(const ResumeToken&);
- Timestamp getTimestamp() const {
- return _timestamp;
- }
+ explicit ResumeToken(const ResumeTokenData& resumeValue);
- UUID getUuid() const {
- return _uuid;
- }
+ bool operator==(const ResumeToken&) const;
+ bool operator!=(const ResumeToken&) const;
+ bool operator<(const ResumeToken&) const;
+ bool operator<=(const ResumeToken&) const;
+ bool operator>(const ResumeToken&) const;
+ bool operator>=(const ResumeToken&) const;
+
+    /**
+     * Three-way comparison: returns 0 if *this is equal to other, < 0 if *this is less than
+     * other, and > 0 if *this is greater than other.
+     */
+ int compare(const ResumeToken& other) const;
Document toDocument() const;
- BSONObj toBSON() const;
+ BSONObj toBSON() const {
+ return toDocument().toBson();
+ }
+
+ ResumeTokenData getData() const;
/**
* Parse a resume token from a BSON object; used as an interface to the IDL parser.
*/
- static ResumeToken parse(const BSONObj& obj);
+ static ResumeToken parse(const BSONObj& resumeBson) {
+ return ResumeToken::parse(Document(resumeBson));
+ }
+
+ static ResumeToken parse(const Document& document);
+
+ friend std::ostream& operator<<(std::ostream& out, const ResumeToken& token) {
+ return out << token.getData();
+ }
+
+ constexpr static StringData kDataFieldName = "_data"_sd;
+ constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd;
private:
- /**
- * Construct from a BSON object.
- * External callers should use the static ResumeToken::parse(const BSONObj&) method instead.
- */
- explicit ResumeToken(const BSONObj& resumeBson);
+ explicit ResumeToken(const Document& resumeData);
- Timestamp _timestamp;
- UUID _uuid;
- Value _documentId;
+ Value _keyStringData;
+ Value _typeBits;
};
} // namespace mongo
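
The relational operators declared above all delegate to the bytewise compare(), so tokens order by clusterTime first, then documentKey, then uuid. A brief mongo-tree sketch, echoing the comparisons exercised in resume_token_test.cpp:

    auto uuid = UUID::gen();
    ResumeToken t1(ResumeTokenData(Timestamp(1000, 1), Value(Document{{"_id", 1}}), uuid));
    ResumeToken t2(ResumeTokenData(Timestamp(1000, 2), Value(Document{{"_id", 1}}), uuid));
    invariant(t1 < t2);             // the earlier clusterTime sorts first
    invariant(t1.compare(t2) < 0);  // same answer via the three-way compare
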
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
new file mode 100644
index 00000000000..c8196183aa6
--- /dev/null
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -0,0 +1,322 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/resume_token.h"
+
+#include <boost/optional/optional_io.hpp>
+
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+namespace {
+
+TEST(ResumeToken, EncodesFullTokenFromData) {
+ Timestamp ts(1000, 2);
+ UUID testUuid = UUID::gen();
+ Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
+
+ ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
+ ResumeToken token(resumeTokenDataIn);
+ ResumeTokenData tokenData = token.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
+TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) {
+ Timestamp ts(1001, 3);
+
+ ResumeTokenData resumeTokenDataIn;
+ resumeTokenDataIn.clusterTime = ts;
+ ResumeToken token(resumeTokenDataIn);
+ ResumeTokenData tokenData = token.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
+TEST(ResumeToken, RoundTripThroughBsonFullToken) {
+ Timestamp ts(1000, 2);
+ UUID testUuid = UUID::gen();
+ Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
+
+ ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
+ auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON());
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
+TEST(ResumeToken, RoundTripThroughBsonTimestampOnlyToken) {
+ Timestamp ts(1001, 3);
+
+ ResumeTokenData resumeTokenDataIn;
+ resumeTokenDataIn.clusterTime = ts;
+ auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toBSON());
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
+TEST(ResumeToken, RoundTripThroughDocumentFullToken) {
+ Timestamp ts(1000, 2);
+ UUID testUuid = UUID::gen();
+ Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
+
+ ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
+ auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument());
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
+TEST(ResumeToken, RoundTripThroughDocumentTimestampOnlyToken) {
+ Timestamp ts(1001, 3);
+
+ ResumeTokenData resumeTokenDataIn;
+ resumeTokenDataIn.clusterTime = ts;
+ auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument());
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+}
+
+TEST(ResumeToken, TestMissingTypebitsOptimization) {
+ Timestamp ts(1000, 1);
+ UUID testUuid = UUID::gen();
+
+ ResumeTokenData hasTypeBitsData(ts, Value(Document{{"_id", 1.0}}), testUuid);
+ ResumeTokenData noTypeBitsData(ResumeTokenData(ts, Value(Document{{"_id", 1}}), testUuid));
+ ResumeToken hasTypeBitsToken(hasTypeBitsData);
+ ResumeToken noTypeBitsToken(noTypeBitsData);
+ ASSERT_EQ(noTypeBitsToken, hasTypeBitsToken);
+ auto hasTypeBitsDoc = hasTypeBitsToken.toDocument();
+ auto noTypeBitsDoc = noTypeBitsToken.toDocument();
+ ASSERT_FALSE(hasTypeBitsDoc["_typeBits"].missing());
+ ASSERT_TRUE(noTypeBitsDoc["_typeBits"].missing()) << noTypeBitsDoc["_typeBits"];
+ auto rtHasTypeBitsData = ResumeToken::parse(hasTypeBitsDoc).getData();
+ auto rtNoTypeBitsData = ResumeToken::parse(noTypeBitsDoc).getData();
+ ASSERT_EQ(hasTypeBitsData, rtHasTypeBitsData);
+ ASSERT_EQ(noTypeBitsData, rtNoTypeBitsData);
+ ASSERT_EQ(BSONType::NumberDouble, rtHasTypeBitsData.documentKey["_id"].getType());
+ ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.documentKey["_id"].getType());
+}
+
+// Tests comparison functions for tokens constructed from oplog data.
+TEST(ResumeToken, CompareFromData) {
+ Timestamp ts1(1000, 1);
+ Timestamp ts2(1000, 2);
+ Timestamp ts3(1001, 1);
+ UUID testUuid = UUID::gen();
+ UUID testUuid2 = UUID::gen();
+ Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
+ Document documentKey1b{{"_id"_sd, "stuff"_sd},
+ {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}};
+ Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}};
+ Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}};
+
+ ResumeToken token1a(ResumeTokenData(ts1, Value(documentKey1a), testUuid));
+ ResumeToken token1b(ResumeTokenData(ts1, Value(documentKey1b), testUuid));
+
+    // Numerically equivalent keys of different types compare equal.
+ ASSERT_EQ(token1a, token1b);
+ ASSERT_LTE(token1a, token1b);
+ ASSERT_GTE(token1a, token1b);
+
+ // UUIDs matter, but all that really matters is they compare unequal.
+ ResumeToken tokenOtherCollection(ResumeTokenData(ts1, Value(documentKey1a), testUuid2));
+ ASSERT_NE(token1a, tokenOtherCollection);
+
+ ResumeToken token2(ResumeTokenData(ts1, Value(documentKey2), testUuid));
+
+ // Document keys matter.
+ ASSERT_LT(token1a, token2);
+ ASSERT_LTE(token1a, token2);
+ ASSERT_GT(token2, token1a);
+ ASSERT_GTE(token2, token1a);
+
+ ResumeToken token3(ResumeTokenData(ts1, Value(documentKey3), testUuid));
+
+ // Order within document keys matters.
+ ASSERT_LT(token1a, token3);
+ ASSERT_LTE(token1a, token3);
+ ASSERT_GT(token3, token1a);
+ ASSERT_GTE(token3, token1a);
+
+ ASSERT_LT(token2, token3);
+ ASSERT_LTE(token2, token3);
+ ASSERT_GT(token3, token2);
+ ASSERT_GTE(token3, token2);
+
+ ResumeToken token4(ResumeTokenData(ts2, Value(documentKey1a), testUuid));
+
+ // Timestamps matter.
+ ASSERT_LT(token1a, token4);
+ ASSERT_LTE(token1a, token4);
+ ASSERT_GT(token4, token1a);
+ ASSERT_GTE(token4, token1a);
+
+ // Timestamps matter more than document key.
+ ASSERT_LT(token3, token4);
+ ASSERT_LTE(token3, token4);
+ ASSERT_GT(token4, token3);
+ ASSERT_GTE(token4, token3);
+
+ ResumeToken token5(ResumeTokenData(ts3, Value(documentKey1a), testUuid));
+
+    // The seconds component of the timestamp matters more than the increment.
+ ASSERT_LT(token4, token5);
+ ASSERT_LTE(token4, token5);
+ ASSERT_GT(token5, token4);
+ ASSERT_GTE(token5, token4);
+}
+
+// Tests comparison functions for tokens constructed from the keystring-encoded form.
+TEST(ResumeToken, CompareFromEncodedData) {
+ Timestamp ts1(1000, 1);
+ Timestamp ts2(1000, 2);
+ Timestamp ts3(1001, 1);
+ UUID testUuid = UUID::gen();
+ UUID testUuid2 = UUID::gen();
+ Document documentKey1a{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
+ Document documentKey1b{{"_id"_sd, "stuff"_sd},
+ {"otherkey"_sd, Document{{"otherstuff"_sd, 2.0}}}};
+ Document documentKey2{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 3}}}};
+ Document documentKey3{{"_id"_sd, "ztuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 0}}}};
+
+ auto token1a = ResumeToken::parse(
+ ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid)).toDocument());
+ auto token1b = ResumeToken::parse(
+ ResumeToken(ResumeTokenData(ts1, Value(documentKey1b), testUuid)).toDocument());
+
+    // Numerically equivalent keys of different types compare equal.
+ ASSERT_EQ(token1a, token1b);
+ ASSERT_LTE(token1a, token1b);
+ ASSERT_GTE(token1a, token1b);
+
+ // UUIDs matter, but all that really matters is they compare unequal.
+ auto tokenOtherCollection = ResumeToken::parse(
+ ResumeToken(ResumeTokenData(ts1, Value(documentKey1a), testUuid2)).toDocument());
+ ASSERT_NE(token1a, tokenOtherCollection);
+
+ auto token2 = ResumeToken::parse(
+ ResumeToken(ResumeTokenData(ts1, Value(documentKey2), testUuid)).toDocument());
+
+ // Document keys matter.
+ ASSERT_LT(token1a, token2);
+ ASSERT_LTE(token1a, token2);
+ ASSERT_GT(token2, token1a);
+ ASSERT_GTE(token2, token1a);
+
+ auto token3 = ResumeToken::parse(
+ ResumeToken(ResumeTokenData(ts1, Value(documentKey3), testUuid)).toDocument());
+
+ // Order within document keys matters.
+ ASSERT_LT(token1a, token3);
+ ASSERT_LTE(token1a, token3);
+ ASSERT_GT(token3, token1a);
+ ASSERT_GTE(token3, token1a);
+
+ ASSERT_LT(token2, token3);
+ ASSERT_LTE(token2, token3);
+ ASSERT_GT(token3, token2);
+ ASSERT_GTE(token3, token2);
+
+ auto token4 = ResumeToken::parse(
+ ResumeToken(ResumeTokenData(ts2, Value(documentKey1a), testUuid)).toDocument());
+
+ // Timestamps matter.
+ ASSERT_LT(token1a, token4);
+ ASSERT_LTE(token1a, token4);
+ ASSERT_GT(token4, token1a);
+ ASSERT_GTE(token4, token1a);
+
+ // Timestamps matter more than document key.
+ ASSERT_LT(token3, token4);
+ ASSERT_LTE(token3, token4);
+ ASSERT_GT(token4, token3);
+ ASSERT_GTE(token4, token3);
+
+ auto token5 = ResumeToken::parse(
+ ResumeToken(ResumeTokenData(ts3, Value(documentKey1a), testUuid)).toDocument());
+
+    // The seconds component of the timestamp matters more than the increment.
+ ASSERT_LT(token4, token5);
+ ASSERT_LTE(token4, token5);
+ ASSERT_GT(token5, token4);
+ ASSERT_GTE(token5, token4);
+}
+
+TEST(ResumeToken, CorruptTokens) {
+    // Empty document.
+ ASSERT_THROWS(ResumeToken::parse(Document()), AssertionException);
+ // Missing data field.
+ ASSERT_THROWS(ResumeToken::parse(Document{{"somefield"_sd, "stuff"_sd}}), AssertionException);
+    // _data field of the wrong type.
+ ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, "string"_sd}}), AssertionException);
+ ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, 0}}), AssertionException);
+
+    // Valid _data field, but _typeBits of the wrong type.
+ Timestamp ts(1010, 4);
+ ResumeTokenData tokenData;
+ tokenData.clusterTime = ts;
+ auto goodTokenDoc = ResumeToken(tokenData).toDocument();
+ auto goodData = goodTokenDoc["_data"].getBinData();
+ ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}),
+ AssertionException);
+
+ // Valid data except wrong bindata type.
+ ASSERT_THROWS(ResumeToken::parse(
+ Document{{"_data"_sd, BSONBinData(goodData.data, goodData.length, newUUID)}}),
+ AssertionException);
+ // Valid data, wrong typeBits bindata type.
+ ASSERT_THROWS(
+ ResumeToken::parse(Document{{"_data"_sd, goodData},
+ {"_typeBits", BSONBinData(goodData.data, 0, newUUID)}}),
+ AssertionException);
+
+ const char zeroes[] = {0, 0, 0, 0, 0};
+ const char nonsense[] = {-91, 85, 77, 86, -1};
+ // Data of correct type, but empty. This won't fail until we try to decode the data.
+ auto emptyToken =
+ ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 0, BinDataGeneral)}});
+ ASSERT_THROWS(emptyToken.getData(), AssertionException);
+
+ // Data of correct type with a bunch of zeros.
+ auto zeroesToken =
+ ResumeToken::parse(Document{{"_data"_sd, BSONBinData(zeroes, 5, BinDataGeneral)}});
+    ASSERT_THROWS(zeroesToken.getData(), AssertionException);
+
+    // Data of correct type with a bunch of nonsense.
+    auto nonsenseToken =
+        ResumeToken::parse(Document{{"_data"_sd, BSONBinData(nonsense, 5, BinDataGeneral)}});
+    ASSERT_THROWS(nonsenseToken.getData(), AssertionException);
+
+ // Valid data, bad typeBits; note that an all-zeros typebits is valid so it is not tested here.
+ auto badTypeBitsToken = ResumeToken::parse(
+ Document{{"_data"_sd, goodData}, {"_typeBits", BSONBinData(nonsense, 5, BinDataGeneral)}});
+ ASSERT_THROWS(badTypeBitsToken.getData(), AssertionException);
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h
index c77ccc2239a..9a36661e7b5 100644
--- a/src/mongo/db/pipeline/value.h
+++ b/src/mongo/db/pipeline/value.h
@@ -192,6 +192,8 @@ public:
int getInt() const;
long long getLong() const;
UUID getUuid() const;
+ // The returned BSONBinData remains owned by this Value.
+ BSONBinData getBinData() const;
const std::vector<Value>& getArray() const {
return _storage.getArray();
}
@@ -460,4 +462,10 @@ inline UUID Value::getUuid() const {
auto stringData = _storage.getString();
return UUID::fromCDR({stringData.rawData(), stringData.size()});
}
+
+inline BSONBinData Value::getBinData() const {
+ verify(getType() == BinData);
+ auto stringData = _storage.getString();
+ return BSONBinData(stringData.rawData(), stringData.size(), _storage.binDataType());
+}
};
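
One caveat worth illustrating for the new accessor: getBinData() returns a view of the Value's internal storage, not a copy. A short mongo-tree sketch (Value's BSONBinData constructor is assumed from this header):

    const char bytes[] = {1, 2, 3, 4};
    Value v{BSONBinData(bytes, sizeof(bytes), BinDataGeneral)};
    BSONBinData view = v.getBinData();  // view.data aliases v's internal storage
    // Safe only while `v` is alive; copy the bytes out to keep them longer.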