author     Ian Boros <ian.boros@10gen.com>   2018-04-16 14:46:23 -0400
committer  Ian Boros <ian.boros@10gen.com>   2018-04-16 19:10:56 -0400
commit     9a27a0fd9668231601d4d6cdb324eece306b91d1 (patch)
tree       3877de3ff468e268812d616f80c6e29b892de5a8
parent     a58a24439b972bb8af00caf0cf6c0a8696a5899c (diff)
download   mongo-9a27a0fd9668231601d4d6cdb324eece306b91d1.tar.gz
SERVER-34314 Ensure change stream can resume between entries in applyOps entry
-rw-r--r--  jstests/change_streams/change_stream_apply_ops.js                       |   7
-rw-r--r--  jstests/change_streams/change_stream_apply_ops_resumability.js          | 198
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp       |  51
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.h         |  19
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp            |  32
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token_test.cpp       |  74
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp |   2
-rw-r--r--  src/mongo/db/pipeline/resume_token.cpp                                  |  29
-rw-r--r--  src/mongo/db/pipeline/resume_token.h                                    |  12
-rw-r--r--  src/mongo/db/pipeline/resume_token_test.cpp                             | 154
10 files changed, 493 insertions, 85 deletions
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js
index 85ea4b5ed8c..45f4e78fafa 100644
--- a/jstests/change_streams/change_stream_apply_ops.js
+++ b/jstests/change_streams/change_stream_apply_ops.js
@@ -80,6 +80,8 @@
let change = cst.getOneChange(changeStream);
assert.eq(change.fullDocument._id, 1);
assert.eq(change.operationType, "insert", tojson(change));
+ const firstChangeClusterTime = change.clusterTime;
+ assert(firstChangeClusterTime instanceof Timestamp, tojson(change));
const firstChangeTxnNumber = change.txnNumber;
const firstChangeLsid = change.lsid;
assert.eq(typeof firstChangeLsid, "object");
@@ -90,6 +92,7 @@
change = cst.getOneChange(changeStream);
assert.eq(change.fullDocument._id, 2);
assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(firstChangeClusterTime, change.clusterTime);
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
assert.eq(change.ns.coll, coll.getName());
@@ -100,6 +103,7 @@
change = cst.getOneChange(changeStream);
assert.eq(change.fullDocument._id, 111);
assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(firstChangeClusterTime, change.clusterTime);
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
assert.eq(change.ns.coll, otherCollName);
@@ -111,6 +115,7 @@
change = cst.getOneChange(changeStream);
assert.eq(change.fullDocument._id, 222);
assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(firstChangeClusterTime, change.clusterTime);
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
assert.eq(change.ns.coll, otherDbCollName);
@@ -121,6 +126,7 @@
change = cst.getOneChange(changeStream);
assert.eq(change.operationType, "update", tojson(change));
assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1}));
+ assert.eq(firstChangeClusterTime, change.clusterTime);
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
assert.eq(change.ns.coll, coll.getName());
@@ -130,6 +136,7 @@
change = cst.getOneChange(changeStream);
assert.eq(change.documentKey._id, kDeletedDocumentId);
assert.eq(change.operationType, "delete", tojson(change));
+ assert.eq(firstChangeClusterTime, change.clusterTime);
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
assert.eq(change.ns.coll, coll.getName());
diff --git a/jstests/change_streams/change_stream_apply_ops_resumability.js b/jstests/change_streams/change_stream_apply_ops_resumability.js
new file mode 100644
index 00000000000..8a9c8d55a62
--- /dev/null
+++ b/jstests/change_streams/change_stream_apply_ops_resumability.js
@@ -0,0 +1,198 @@
+// Tests that a change stream can be resumed from any point within an applyOps entry generated by
+// a transaction.
+// @tags: [uses_transactions]
+
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js");
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ var WatchMode = {
+ kCollection: 1,
+ kDb: 2,
+ kCluster: 3,
+ };
+
+ function getChangeStream({cst, watchMode, coll, resumeToken}) {
+ const changeStreamDoc = {};
+ if (resumeToken) {
+ changeStreamDoc.resumeAfter = resumeToken;
+ }
+
+ if (watchMode == WatchMode.kCluster) {
+ changeStreamDoc.allChangesForCluster = true;
+ }
+ const collArg = (watchMode == WatchMode.kCollection ? coll : 1);
+
+ return cst.startWatchingChanges({
+ pipeline: [{$changeStream: changeStreamDoc}],
+ collection: collArg,
+            // Use a batch size of 0 to prevent any notifications from being returned in the
+            // first batch, as these would be ignored by ChangeStreamTest.getOneChange().
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ }
+
+ function testChangeStreamsWithTransactions(watchMode) {
+ let dbToStartTestOn = db;
+ if (watchMode == WatchMode.kCluster) {
+ dbToStartTestOn = db.getSiblingDB("admin");
+ }
+
+ const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
+ const otherCollName = "change_stream_apply_ops_2";
+ assertDropAndRecreateCollection(db, otherCollName);
+
+ const otherDbName = "change_stream_apply_ops_db";
+ const otherDbCollName = "someColl";
+ assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
+
+ const cst = new ChangeStreamTest(dbToStartTestOn);
+
+ let changeStream = getChangeStream({cst: cst, watchMode: watchMode, coll: coll});
+
+ // Do an insert outside of a transaction.
+ assert.commandWorked(coll.insert({_id: 0, a: 123}));
+ const nonTxnChange = cst.getOneChange(changeStream);
+ assert.eq(nonTxnChange.operationType, "insert");
+ assert.eq(nonTxnChange.documentKey, {_id: 0});
+
+ const sessionOptions = {causalConsistency: false};
+ const session = db.getMongo().startSession(sessionOptions);
+ const sessionDb = session.getDatabase(db.getName());
+ const sessionColl = sessionDb[coll.getName()];
+
+ session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});
+ assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
+ assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
+
+ // One insert on a collection that we're not watching. This should be skipped by the
+ // single-collection change stream.
+ assert.commandWorked(
+ sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
+
+    // This should be skipped by the single-collection and single-DB change streams.
+ assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert(
+ {_id: 222, a: "Doc on other DB"}));
+
+ assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
+
+ session.commitTransaction();
+
+ // Now insert another document, not part of a transaction.
+ assert.commandWorked(coll.insert({_id: 3, a: 123}));
+
+ // Check for the first insert.
+ const firstTxnChange = cst.getOneChange(changeStream);
+ assert.eq(firstTxnChange.fullDocument._id, 1);
+ assert.eq(firstTxnChange.operationType, "insert", tojson(firstTxnChange));
+
+ // Check for the second insert.
+ const secondTxnChange = cst.getOneChange(changeStream);
+ assert.eq(secondTxnChange.fullDocument._id, 2);
+ assert.eq(secondTxnChange.operationType, "insert", tojson(secondTxnChange));
+
+ // Resume after the first non-transaction change. Be sure we see the documents from the
+ // transaction again.
+ changeStream = getChangeStream(
+ {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id});
+ assert.docEq(cst.getOneChange(changeStream), firstTxnChange);
+ assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
+
+ // Resume after the first transaction change. Be sure we see the second change again.
+ changeStream = getChangeStream(
+ {cst: cst, watchMode: watchMode, coll: coll, resumeToken: firstTxnChange._id});
+ assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
+
+ let change = secondTxnChange;
+ if (watchMode >= WatchMode.kDb) {
+ // We should see the insert on the other collection.
+ change = cst.getOneChange(changeStream);
+ assert.eq(change.fullDocument._id, 111);
+ assert.eq(change.operationType, "insert", tojson(change));
+
+ // Resume from the beginning again, be sure we see everything up until now.
+ changeStream = getChangeStream(
+ {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id});
+ assert.docEq(cst.getOneChange(changeStream), firstTxnChange);
+ assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
+ assert.docEq(cst.getOneChange(changeStream), change);
+ }
+
+ if (watchMode >= WatchMode.kCluster) {
+ // We should see the insert on the other db.
+ change = cst.getOneChange(changeStream);
+ assert.eq(change.fullDocument._id, 222);
+ assert.eq(change.operationType, "insert", tojson(change));
+
+ // Resume from the beginning again, be sure we see everything up until now.
+ changeStream = getChangeStream(
+ {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id});
+ assert.docEq(cst.getOneChange(changeStream), firstTxnChange);
+ assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
+ // We should see the document which was inserted on the other _collection_.
+ const changeFromOtherCollection = cst.getOneChange(changeStream);
+ assert.eq(changeFromOtherCollection.fullDocument._id, 111);
+
+ // Resume from the document in the other collection.
+ changeStream = getChangeStream({
+ cst: cst,
+ watchMode: watchMode,
+ coll: coll,
+ resumeToken: changeFromOtherCollection._id
+ });
+
+ // We should again see the most recent document.
+ assert.docEq(cst.getOneChange(changeStream), change);
+ }
+
+    // Try starting another change stream from the latest change we have seen so far, which is
+    // the most recent transaction entry visible to this watch mode.
+ let otherCursor =
+ getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id});
+
+ // Check for the update.
+ change = cst.getOneChange(changeStream);
+ assert.eq(change.operationType, "update", tojson(change));
+ assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1}));
+
+ // Check for the update on the other stream.
+ assert.docEq(change, cst.getOneChange(otherCursor));
+
+ // Now test that we can resume from the _last_ change caused by a transaction. We will
+ // check that both the initial change stream and the new one find the document that's
+ // inserted outside of the transaction.
+ otherCursor =
+ getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id});
+
+ // Now check that the document inserted after the transaction is found.
+ change = cst.getOneChange(changeStream);
+ assert.eq(change.fullDocument._id, 3);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.docEq(change, cst.getOneChange(otherCursor));
+
+ // Drop the collection. This will trigger an "invalidate" event.
+ assert.commandWorked(db.runCommand({drop: coll.getName()}));
+
+ // The drop should have invalidated the change stream.
+ cst.assertNextChangesEqual({
+ cursor: changeStream,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ cst.assertNextChangesEqual({
+ cursor: otherCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ cst.cleanUp();
+ }
+
+    // TODO: SERVER-34302 should allow us to simplify this test, so we're not required to
+    // explicitly run it against a single collection, a whole database, and the whole cluster.
+ testChangeStreamsWithTransactions(WatchMode.kCollection);
+ testChangeStreamsWithTransactions(WatchMode.kDb);
+ testChangeStreamsWithTransactions(WatchMode.kCluster);
+}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 922b01d00c2..99729a4d0de 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -122,6 +122,7 @@ void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Doc
checkValueType(input["o"], "o", BSONType::Object);
Value applyOps = input.getNestedField("o.applyOps");
+
checkValueType(applyOps, "applyOps", BSONType::Array);
invariant(applyOps.getArrayLength() > 0);
@@ -131,7 +132,36 @@ void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Doc
Value txnNumber = input["txnNumber"];
checkValueType(txnNumber, "txnNumber", BSONType::NumberLong);
- _txnContext.emplace(applyOps, lsid.getDocument(), txnNumber.getLong());
+ Value ts = input[repl::OplogEntry::kTimestampFieldName];
+ Timestamp clusterTime = ts.getTimestamp();
+
+ _txnContext.emplace(applyOps, clusterTime, lsid.getDocument(), txnNumber.getLong());
+}
+
+ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts,
+ Value uuid,
+ Value documentKey) {
+ ResumeTokenData resumeTokenData;
+ if (_txnContext) {
+ // We're in the middle of unwinding an 'applyOps'.
+
+        // Use the clusterTime from the enclosing applyOps entry.
+ resumeTokenData.clusterTime = _txnContext->clusterTime;
+
+ // 'pos' points to the _next_ applyOps index, so we must subtract one to get the index of
+ // the entry being examined right now.
+ invariant(_txnContext->pos >= 1);
+ resumeTokenData.applyOpsIndex = _txnContext->pos - 1;
+ } else {
+ resumeTokenData.clusterTime = ts.getTimestamp();
+ resumeTokenData.applyOpsIndex = 0;
+ }
+
+ resumeTokenData.documentKey = documentKey;
+ if (!uuid.missing())
+ resumeTokenData.uuid = uuid.getUuid();
+
+ return resumeTokenData;
}
Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
@@ -277,21 +307,11 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
documentKey = Value();
}
- // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will
- // not appear in the output.
- ResumeTokenData resumeTokenData;
- if (_txnContext) {
- // We're in the middle of unwinding an 'applyOps'.
-
- // TODO: SERVER-34314
- // For now we return an empty resumeToken.
- } else {
- resumeTokenData.clusterTime = ts.getTimestamp();
- resumeTokenData.documentKey = documentKey;
- if (!uuid.missing())
- resumeTokenData.uuid = uuid.getUuid();
- }
+ // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
+ // in the output.
+ ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey);
+ // Add some additional fields only relevant to transactions.
if (_txnContext) {
doc.addField(DocumentSourceChangeStream::kTxnNumberField,
Value(static_cast<long long>(_txnContext->txnNumber)));
@@ -306,7 +326,6 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// If we're in a sharded environment, we'll need to merge the results by their sort key, so add
// that as metadata.
if (pExpCtx->needsMerge) {
- // TODO SERVER-34314: Sort key may have to be _id.data in FCV 4.0.
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
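The getResumeToken() helper above is the heart of the fix: while unwinding an applyOps, every event shares the transaction's clusterTime, and the entry just consumed (at pos - 1) supplies the applyOpsIndex. A minimal sketch of that selection rule, using simplified stand-in types (FakeTokenData and FakeTxnContext are illustrative, not the server's classes):

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Stand-in for the relevant ResumeTokenData fields.
struct FakeTokenData {
    unsigned long long clusterTime = 0;
    std::size_t applyOpsIndex = 0;
};

// Stand-in for the TransactionContext member; 'pos' indexes the *next* applyOps entry.
struct FakeTxnContext {
    unsigned long long clusterTime;
    std::size_t pos;
};

FakeTokenData makeTokenData(unsigned long long entryTs,
                            const std::optional<FakeTxnContext>& txn) {
    FakeTokenData data;
    if (txn) {
        assert(txn->pos >= 1);                // mirrors the invariant in getResumeToken()
        data.clusterTime = txn->clusterTime;  // all entries share the applyOps clusterTime
        data.applyOpsIndex = txn->pos - 1;    // index of the entry just consumed
    } else {
        data.clusterTime = entryTs;
        data.applyOpsIndex = 0;
    }
    return data;
}
```

Recording the index is what keeps events from the same transaction totally ordered, since every entry in one applyOps carries a single oplog timestamp.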
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 5ae0ea3fc6f..200c489cf07 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -88,14 +88,29 @@ private:
// Our current place in the 'opArray'.
size_t pos;
+ // The clusterTime of the applyOps.
+ Timestamp clusterTime;
+
// Fields that were taken from the 'applyOps' oplog entry.
Document lsid;
TxnNumber txnNumber;
- TransactionContext(const Value& applyOpsVal, const Document& lsidDoc, TxnNumber n)
- : opArray(applyOpsVal), arr(opArray.getArray()), pos(0), lsid(lsidDoc), txnNumber(n) {}
+ TransactionContext(const Value& applyOpsVal,
+ Timestamp ts,
+ const Document& lsidDoc,
+ TxnNumber n)
+ : opArray(applyOpsVal),
+ arr(opArray.getArray()),
+ pos(0),
+ clusterTime(ts),
+ lsid(lsidDoc),
+ txnNumber(n) {}
};
+    /**
+     * Helper for building the ResumeTokenData to return for the current event, accounting for
+     * whether we are in the middle of unwinding an 'applyOps' entry.
+     */
+ ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey);
void initializeTransactionContext(const Document& input);
/**
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 6b5e386051b..45a4e735f1e 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -41,17 +41,17 @@ using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
// and ResumeStatus::kCannotResume if it is more recent than the client's resume token (indicating
// that we will never see the token). If the resume token's documentKey contains only the _id field
-// while the pipeline documentKey contains additional fields, then the collection has become sharded
-// since the resume token was generated. In that case, we relax the requirements such that only the
-// timestamp, UUID and documentKey._id need match. This remains correct, since the only
-// circumstances under which the resume token omits the shard key is if it was generated either (1)
-// before the collection was sharded, (2) after the collection was sharded but before the primary
-// shard became aware of that fact, implying that it was before the first chunk moved off the shard,
-// or (3) by a malicious client who has constructed their own resume token. In the first two cases,
-// we can be guaranteed that the _id is unique and the stream can therefore be resumed seamlessly;
-// in the third case, the worst that can happen is that some entries are missed or duplicated. Note
-// that the simple collation is used to compare the resume tokens, and that we purposefully avoid
-// the user's requested collation if present.
+// while the pipeline documentKey contains additional fields, then the collection has become
+// sharded since the resume token was generated. In that case, we relax the requirements such that
+// only the timestamp, version, applyOpsIndex, UUID and documentKey._id need match. This remains
+// correct, since the only circumstances under which the resume token omits the shard key is if it
+// was generated either (1) before the collection was sharded, (2) after the collection was sharded
+// but before the primary shard became aware of that fact, implying that it was before the first
+// chunk moved off the shard, or (3) by a malicious client who has constructed their own resume
+// token. In the first two cases, we can be guaranteed that the _id is unique and the stream can
+// therefore be resumed seamlessly; in the third case, the worst that can happen is that some
+// entries are missed or duplicated. Note that the simple collation is used to compare the resume
+// tokens, and that we purposefully avoid the user's requested collation if present.
ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
const Document& documentFromResumedStream,
const ResumeToken& tokenFromClient) {
@@ -68,6 +68,16 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
if (tokenDataFromResumedStream.clusterTime != tokenDataFromClient.clusterTime) {
return ResumeStatus::kCannotResume;
}
+
+ if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
+ return ResumeStatus::kCheckNextDoc;
+ } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
+ // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
+ // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
+ // watched). This indicates a corrupt resume token.
+ uasserted(50792, "Invalid resumeToken: applyOpsIndex was skipped");
+ }
+
// It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
// or cluster-wide stream and we are comparing operations from different shards at the same
// clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
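The new comparison above slots applyOpsIndex between clusterTime and UUID while scanning for the client's token. A condensed sketch of the three-way outcome, under stated assumptions (stand-in types; the real function continues with UUID and documentKey comparisons):

```cpp
#include <cstddef>
#include <stdexcept>

enum class ResumeStatus { kFoundToken, kCheckNextDoc, kCannotResume };

// Stand-in for the relevant ResumeTokenData fields.
struct TokenData {
    unsigned long long clusterTime;
    std::size_t applyOpsIndex;
};

ResumeStatus compareApplyOpsFields(const TokenData& fromStream, const TokenData& fromClient) {
    if (fromStream.clusterTime != fromClient.clusterTime)
        return ResumeStatus::kCannotResume;
    if (fromStream.applyOpsIndex < fromClient.applyOpsIndex)
        return ResumeStatus::kCheckNextDoc;  // earlier entry in the same applyOps: keep scanning
    if (fromStream.applyOpsIndex > fromClient.applyOpsIndex)
        // Mirrors uassert 50792: the token's index was skipped over, so the token is corrupt.
        throw std::runtime_error("Invalid resumeToken: applyOpsIndex was skipped");
    // The UUID and documentKey comparisons follow in the real implementation.
    return ResumeStatus::kFoundToken;
}
```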
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index c216c81b52d..73f52287122 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -61,21 +61,30 @@ public:
protected:
/**
- * Pushes a document with a resume token corresponding to the given timestamp, docKey, and
- * namespace into the mock queue.
+ * Pushes a document with a resume token corresponding to the given timestamp, version,
+ * applyOpsIndex, docKey, and namespace into the mock queue.
*/
- void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
+ void addDocument(
+ Timestamp ts, int version, std::size_t applyOpsIndex, Document docKey, UUID uuid) {
_mock->queue.push_back(
Document{{"_id",
- ResumeToken(ResumeTokenData(ts, Value(docKey), uuid))
+ ResumeToken(ResumeTokenData(ts, version, applyOpsIndex, Value(docKey), uuid))
.toDocument(ResumeToken::SerializationFormat::kHexString)}});
}
+
+ /**
+     * Convenience overload of the above that uses version 0 and applyOpsIndex 0.
+ */
+ void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) {
+ addDocument(ts, 0, 0, docKey, uuid);
+ }
/**
* Pushes a document with a resume token corresponding to the given timestamp, _id string, and
* namespace into the mock queue.
*/
void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) {
- addDocument(ts, Document{{"_id", id}}, uuid);
+ addDocument(ts, 0, 0, Document{{"_id", id}}, uuid);
}
void addPause() {
@@ -87,19 +96,34 @@ protected:
* namespace.
*/
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
- Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) {
- ResumeToken token(ResumeTokenData(ts, docKey ? Value(*docKey) : Value(), uuid));
+ Timestamp ts,
+ int version,
+ std::size_t applyOpsIndex,
+ boost::optional<Document> docKey,
+ UUID uuid) {
+ ResumeToken token(
+ ResumeTokenData(ts, version, applyOpsIndex, docKey ? Value(*docKey) : Value(), uuid));
auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), token);
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
}
+
+ /**
+ * Convenience method to create the class under test with a given timestamp, docKey, and
+ * namespace.
+ */
+ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
+ Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) {
+ return createCheckResumeToken(ts, 0, 0, docKey, uuid);
+ }
+
/**
* Convenience method to create the class under test with a given timestamp, _id string, and
* namespace.
*/
intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken(
Timestamp ts, StringData id, UUID uuid = testUuid()) {
- return createCheckResumeToken(ts, Document{{"_id", id}}, uuid);
+ return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid);
}
/**
@@ -380,6 +404,40 @@ TEST_F(CheckResumeTokenTest,
ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585);
}
+TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) {
+ Timestamp resumeTimestamp(100, 1);
+
+ // Create an ordered array of 3 UUIDs.
+ std::vector<UUID> uuids = {UUID::gen(), UUID::gen(), UUID::gen()};
+
+ std::sort(uuids.begin(), uuids.end());
+
+ auto checkResumeToken =
+ createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]);
+
+ // Add two documents which have the same clusterTime and version but a lower applyOps index. One
+ // of the documents has a lower uuid than the resume token, the other has a higher uuid; this
+ // demonstrates that the applyOps index is the discriminating factor.
+ addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]);
+ addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]);
+
+ // Add a third document that matches the resume token.
+ addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]);
+
+    // Add a fourth document with the same timestamp and version whose applyOpsIndex sorts after
+    // the resume token.
+ auto expectedDocKey = Document{{"_id"_sd, 3}};
+ addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]);
+
+ // We should skip the first two docs, swallow the resume token, and return the fourth doc.
+ const auto firstDocAfterResume = checkResumeToken->getNext();
+ const auto tokenFromFirstDocAfterResume =
+ ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
+
+ ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, resumeTimestamp);
+ ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey);
+}
+
/**
* We should _error_ on the no-document case, because that means the resume token was not found.
*/
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 15d236fd409..d5ebad0e611 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -70,7 +70,7 @@ public:
tokenData.clusterTime = ts;
return ResumeToken(tokenData).toDocument(ResumeToken::SerializationFormat::kHexString);
}
- return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid()))
+ return ResumeToken(ResumeTokenData(ts, 0, 0, Value(Document{{"_id", id}}), testUuid()))
.toDocument(ResumeToken::SerializationFormat::kHexString);
}
};
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index fd1e8cd3ce7..0fb316bcf4d 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -104,11 +104,14 @@ ResumeToken::ResumeToken(const Document& resumeDoc) {
_typeBits.getBinData().type == BinDataGeneral));
}
-// We encode the resume token as a KeyString with the sequence: clusterTime, uuid, documentKey.
+// We encode the resume token as a KeyString with the sequence:
+// clusterTime, version, applyOpsIndex, uuid, documentKey
// Only the clusterTime is required.
ResumeToken::ResumeToken(const ResumeTokenData& data) {
BSONObjBuilder builder;
builder.append("", data.clusterTime);
+ builder.append("", data.version);
+ builder.appendNumber("", data.applyOpsIndex);
uassert(50788,
"Unexpected resume token with a documentKey but no UUID",
data.uuid || data.documentKey.missing());
@@ -189,6 +192,30 @@ ResumeTokenData ResumeToken::getData() const {
break;
}
case BSONType::String: {
+ // Next comes the resume token version.
+ auto versionElt = i.next();
+ uassert(50790,
+ "Resume Token does not contain applyOpsIndex",
+ versionElt.type() == BSONType::NumberInt);
+ result.version = versionElt.numberInt();
+ uassert(50791, "Invalid Resume Token: only supports version 0", result.version == 0);
+
+ // The new format has applyOpsIndex next.
+ auto applyOpsElt = i.next();
+ uassert(50793,
+ "Resume Token does not contain applyOpsIndex",
+ applyOpsElt.type() == BSONType::NumberInt);
+ const int applyOpsInd = applyOpsElt.numberInt();
+ uassert(50794,
+ "Invalid Resume Token: applyOpsIndex should be non-negative",
+ applyOpsInd >= 0);
+ result.applyOpsIndex = applyOpsInd;
+
+            // The UUID and documentKey are not required.
+ if (!i.more()) {
+ return result;
+ }
+
// In the new format, the UUID comes first, then the documentKey.
result.uuid = uassertStatusOK(UUID::parse(i.next()));
if (i.more()) {
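On the decode side, the new uasserts reject tokens with an unsupported version or a negative applyOpsIndex, while still allowing the UUID and documentKey to be absent. A hedged sketch of those checks against a plain struct (the real code walks a BSONObjIterator over the KeyString-decoded fields; DecodedFields is hypothetical):

```cpp
#include <cstddef>
#include <stdexcept>

// Stand-in for the KeyString-decoded BSON elements following the clusterTime.
struct DecodedFields {
    int version;
    int applyOpsIndex;
    bool hasUuidAndDocumentKey;  // the tail of the token is optional
};

struct TokenData {
    int version = 0;
    std::size_t applyOpsIndex = 0;
};

TokenData validateDecodedToken(const DecodedFields& f) {
    TokenData result;
    if (f.version != 0)  // mirrors uassert 50791
        throw std::runtime_error("Invalid Resume Token: only supports version 0");
    result.version = f.version;
    if (f.applyOpsIndex < 0)  // mirrors uassert 50794
        throw std::runtime_error("Invalid Resume Token: applyOpsIndex should be non-negative");
    result.applyOpsIndex = static_cast<std::size_t>(f.applyOpsIndex);
    // The UUID and documentKey are optional: a token built from a bare clusterTime
    // (plus version and applyOpsIndex) parses successfully without them.
    return result;
}
```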
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 7d1bb3c432f..25b30788fd3 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -42,9 +42,15 @@ namespace mongo {
struct ResumeTokenData {
ResumeTokenData(){};
ResumeTokenData(Timestamp clusterTimeIn,
+ int versionIn,
+ size_t applyOpsIndexIn,
Value documentKeyIn,
const boost::optional<UUID>& uuidIn)
- : clusterTime(clusterTimeIn), documentKey(std::move(documentKeyIn)), uuid(uuidIn){};
+ : clusterTime(clusterTimeIn),
+ version(versionIn),
+ applyOpsIndex(applyOpsIndexIn),
+ documentKey(std::move(documentKeyIn)),
+ uuid(uuidIn){};
bool operator==(const ResumeTokenData& other) const;
bool operator!=(const ResumeTokenData& other) const {
@@ -52,6 +58,8 @@ struct ResumeTokenData {
};
Timestamp clusterTime;
+ int version = 0;
+ size_t applyOpsIndex = 0;
Value documentKey;
boost::optional<UUID> uuid;
};
@@ -69,7 +77,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);
* 2. Using a hex-encoded string in a similar format:
* {
* _data: String, A hex encoding of the binary generated by keystring encoding the clusterTime,
- * UUID, then documentKey in that order.
+ * version, applyOpsIndex, UUID, then documentKey in that order.
* _typeBits: BinData - The keystring type bits used for deserialization.
* }
* The _data field data is encoded such that string comparisons provide the correct ordering of
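Because the fields are KeyString-encoded most-significant first (clusterTime, version, applyOpsIndex, UUID, documentKey), comparing the hex _data strings reproduces the intended token ordering, which the updated tests below exercise. An illustrative sketch of that principle using fixed-width hex in place of KeyString (encodeToken is a hypothetical helper, not the server's encoder):

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>

// Encode (clusterTime, version, applyOpsIndex) as fixed-width big-endian hex so that
// lexicographic string comparison matches field-by-field numeric comparison.
std::string encodeToken(uint64_t clusterTime, uint32_t version, uint32_t applyOpsIndex) {
    char buf[2 * (8 + 4 + 4) + 1];
    std::snprintf(buf, sizeof(buf), "%016llx%08x%08x",
                  static_cast<unsigned long long>(clusterTime), version, applyOpsIndex);
    return buf;
}

int main() {
    // Same clusterTime and version: applyOpsIndex breaks the tie.
    assert(encodeToken(10, 0, 1) < encodeToken(10, 0, 2));
    // clusterTime dominates both version and applyOpsIndex.
    assert(encodeToken(9, 5, 50) < encodeToken(10, 0, 0));
    // Same clusterTime: version is compared before applyOpsIndex.
    assert(encodeToken(10, 0, 50) < encodeToken(10, 1, 0));
    return 0;
}
```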
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
index 2fe8c0ac33c..6f7eaf4bbb4 100644
--- a/src/mongo/db/pipeline/resume_token_test.cpp
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -46,7 +46,7 @@ TEST(ResumeToken, EncodesFullTokenFromData) {
UUID testUuid = UUID::gen();
Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
- ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
+ ResumeTokenData resumeTokenDataIn(ts, 0, 0, Value(documentKey), testUuid);
ResumeToken token(resumeTokenDataIn);
ResumeTokenData tokenData = token.getData();
ASSERT_EQ(resumeTokenDataIn, tokenData);
@@ -67,7 +67,7 @@ TEST(ResumeToken, ShouldRoundTripThroughHexStringEncoding) {
UUID testUuid = UUID::gen();
Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
- ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
+ ResumeTokenData resumeTokenDataIn(ts, 0, 0, Value(documentKey), testUuid);
// Test serialization/parsing through Document.
auto rtToken =
@@ -87,7 +87,7 @@ TEST(ResumeToken, ShouldRoundTripThroughBinDataEncoding) {
UUID testUuid = UUID::gen();
Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}};
- ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid);
+ ResumeTokenData resumeTokenDataIn(ts, 0, 0, Value(documentKey), testUuid);
// Test serialization/parsing through Document.
auto rtToken =
@@ -144,8 +144,9 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) {
Timestamp ts(1000, 1);
UUID testUuid = UUID::gen();
- ResumeTokenData hasTypeBitsData(ts, Value(Document{{"_id", 1.0}}), testUuid);
- ResumeTokenData noTypeBitsData(ResumeTokenData(ts, Value(Document{{"_id", 1}}), testUuid));
+ ResumeTokenData hasTypeBitsData(ts, 0, 0, Value(Document{{"_id", 1.0}}), testUuid);
+ ResumeTokenData noTypeBitsData(
+ ResumeTokenData(ts, 0, 0, Value(Document{{"_id", 1}}), testUuid));
ResumeToken hasTypeBitsToken(hasTypeBitsData);
ResumeToken noTypeBitsToken(noTypeBitsData);
ASSERT_EQ(noTypeBitsToken, hasTypeBitsToken);
@@ -224,9 +225,51 @@ TEST(ResumeToken, CorruptTokens) {
ASSERT_THROWS(badTypeBitsToken.getData(), AssertionException);
}
+TEST(ResumeToken, WrongVersionToken) {
+ Timestamp ts(1001, 3);
+
+ ResumeTokenData resumeTokenDataIn;
+ resumeTokenDataIn.clusterTime = ts;
+ resumeTokenDataIn.version = 0;
+
+ // This one with version 0 should succeed.
+ auto rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson());
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+
+ // With version 1 it should fail.
+ resumeTokenDataIn.version = 1;
+ rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson());
+
+ ASSERT_THROWS(rtToken.getData(), AssertionException);
+}
+
+TEST(ResumeToken, InvalidApplyOpsIndex) {
+ Timestamp ts(1001, 3);
+
+ ResumeTokenData resumeTokenDataIn;
+ resumeTokenDataIn.clusterTime = ts;
+ resumeTokenDataIn.applyOpsIndex = 1234;
+
+ // Should round trip with a non-negative applyOpsIndex.
+ auto rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson());
+ ResumeTokenData tokenData = rtToken.getData();
+ ASSERT_EQ(resumeTokenDataIn, tokenData);
+
+    // Should fail when the applyOpsIndex encodes as a negative number (size_t max wraps to -1).
+ resumeTokenDataIn.applyOpsIndex = std::numeric_limits<size_t>::max();
+ rtToken =
+ ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson());
+
+ ASSERT_THROWS(rtToken.getData(), AssertionException);
+}
+
TEST(ResumeToken, StringEncodingSortsCorrectly) {
// Make sure that the string encoding of the resume tokens will compare in the correct order,
- // namely timestamp, uuid, then documentKey.
+ // namely timestamp, version, applyOpsIndex, uuid, then documentKey.
Timestamp ts2_2(2, 2);
Timestamp ts10_4(10, 4);
Timestamp ts10_5(10, 5);
@@ -247,44 +290,67 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) {
};
// Test using only Timestamps.
- assertLt({ts2_2, Value(), boost::none}, {ts10_4, Value(), boost::none});
- assertLt({ts2_2, Value(), boost::none}, {ts10_5, Value(), boost::none});
- assertLt({ts2_2, Value(), boost::none}, {ts11_3, Value(), boost::none});
- assertLt({ts10_4, Value(), boost::none}, {ts10_5, Value(), boost::none});
- assertLt({ts10_4, Value(), boost::none}, {ts11_3, Value(), boost::none});
- assertLt({ts10_5, Value(), boost::none}, {ts11_3, Value(), boost::none});
-
- // Test that the Timestamp is more important than the UUID and documentKey.
- assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
- {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
- assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid},
- {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
- {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 0}}), higher_uuid},
- {ts10_5, Value(Document{{"_id", 0}}), lower_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
- {ts10_5, Value(Document{{"_id", 0}}), higher_uuid});
-
- // Test that when the Timestamp is the same, the UUID breaks the tie.
- assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid},
- {ts2_2, Value(Document{{"_id", 0}}), higher_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
- {ts10_4, Value(Document{{"_id", 0}}), higher_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
- {ts10_4, Value(Document{{"_id", 0}}), higher_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
- {ts10_4, Value(Document{{"_id", 2}}), higher_uuid});
-
- // Test that when the Timestamp and the UUID are the same, the documentKey breaks the tie.
- assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid},
- {ts2_2, Value(Document{{"_id", 1}}), lower_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid},
- {ts10_4, Value(Document{{"_id", 1}}), lower_uuid});
- assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid},
- {ts10_4, Value(Document{{"_id", "string"_sd}}), lower_uuid});
- assertLt({ts10_4, Value(Document{{"_id", BSONNULL}}), lower_uuid},
- {ts10_4, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts10_4, 0, 0, Value(), boost::none});
+ assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts10_5, 0, 0, Value(), boost::none});
+ assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts11_3, 0, 0, Value(), boost::none});
+ assertLt({ts10_4, 0, 0, Value(), boost::none}, {ts10_5, 0, 0, Value(), boost::none});
+ assertLt({ts10_4, 0, 0, Value(), boost::none}, {ts11_3, 0, 0, Value(), boost::none});
+ assertLt({ts10_5, 0, 0, Value(), boost::none}, {ts11_3, 0, 0, Value(), boost::none});
+
+ // Test using Timestamps and version.
+ assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts2_2, 1, 0, Value(), boost::none});
+ assertLt({ts10_4, 5, 0, Value(), boost::none}, {ts10_4, 10, 0, Value(), boost::none});
+
+ // Test that the Timestamp is more important than the version, applyOpsIndex, UUID and
+ // documentKey.
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), higher_uuid},
+ {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_5, 0, 0, Value(Document{{"_id", 0}}), higher_uuid});
+
+ // Test that when the Timestamp is the same, the version breaks the tie.
+ assertLt({ts10_4, 1, 50, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_4, 5, 1, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts2_2, 1, 0, Value(Document{{"_id", 0}}), higher_uuid},
+ {ts2_2, 2, 0, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts10_4, 1, 0, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_4, 2, 0, Value(Document{{"_id", 0}}), lower_uuid});
+
+ // Test that when the Timestamp and version are the same, the applyOpsIndex breaks the tie.
+ assertLt({ts10_4, 1, 6, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_4, 1, 50, Value(Document{{"_id", 0}}), lower_uuid});
+ assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), higher_uuid},
+ {ts2_2, 0, 4, Value(Document{{"_id", 0}}), lower_uuid});
+
+ // Test that when the Timestamp, version, and applyOpsIndex are the same, the UUID breaks the
+ // tie.
+ assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts2_2, 0, 0, Value(Document{{"_id", 0}}), higher_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_4, 0, 0, Value(Document{{"_id", 0}}), higher_uuid});
+ assertLt({ts10_4, 1, 2, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_4, 1, 2, Value(Document{{"_id", 0}}), higher_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_4, 0, 0, Value(Document{{"_id", 0}}), higher_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_4, 0, 0, Value(Document{{"_id", 2}}), higher_uuid});
+
+ // Test that when the Timestamp, version, applyOpsIndex, and UUID are the same, the documentKey
+ // breaks the tie.
+ assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts2_2, 0, 0, Value(Document{{"_id", 1}}), lower_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid},
+ {ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid},
+ {ts10_4, 0, 0, Value(Document{{"_id", "string"_sd}}), lower_uuid});
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", BSONNULL}}), lower_uuid},
+ {ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid});
}
} // namespace