diff options
author | Ian Boros <ian.boros@10gen.com> | 2018-04-16 14:46:23 -0400 |
---|---|---|
committer | Ian Boros <ian.boros@10gen.com> | 2018-04-16 19:10:56 -0400 |
commit | 9a27a0fd9668231601d4d6cdb324eece306b91d1 (patch) | |
tree | 3877de3ff468e268812d616f80c6e29b892de5a8 | |
parent | a58a24439b972bb8af00caf0cf6c0a8696a5899c (diff) | |
download | mongo-9a27a0fd9668231601d4d6cdb324eece306b91d1.tar.gz |
SERVER-34314 Ensure change stream can resume between entries in applyOps entry
10 files changed, 493 insertions, 85 deletions
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js index 85ea4b5ed8c..45f4e78fafa 100644 --- a/jstests/change_streams/change_stream_apply_ops.js +++ b/jstests/change_streams/change_stream_apply_ops.js @@ -80,6 +80,8 @@ let change = cst.getOneChange(changeStream); assert.eq(change.fullDocument._id, 1); assert.eq(change.operationType, "insert", tojson(change)); + const firstChangeClusterTime = change.clusterTime; + assert(firstChangeClusterTime instanceof Timestamp, tojson(change)); const firstChangeTxnNumber = change.txnNumber; const firstChangeLsid = change.lsid; assert.eq(typeof firstChangeLsid, "object"); @@ -90,6 +92,7 @@ change = cst.getOneChange(changeStream); assert.eq(change.fullDocument._id, 2); assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(firstChangeClusterTime, change.clusterTime); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); assert.eq(change.ns.coll, coll.getName()); @@ -100,6 +103,7 @@ change = cst.getOneChange(changeStream); assert.eq(change.fullDocument._id, 111); assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(firstChangeClusterTime, change.clusterTime); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); assert.eq(change.ns.coll, otherCollName); @@ -111,6 +115,7 @@ change = cst.getOneChange(changeStream); assert.eq(change.fullDocument._id, 222); assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(firstChangeClusterTime, change.clusterTime); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); assert.eq(change.ns.coll, otherDbCollName); @@ -121,6 +126,7 @@ change = cst.getOneChange(changeStream); assert.eq(change.operationType, "update", tojson(change)); assert.eq(tojson(change.updateDescription.updatedFields), 
tojson({"a": 1})); + assert.eq(firstChangeClusterTime, change.clusterTime); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); assert.eq(change.ns.coll, coll.getName()); @@ -130,6 +136,7 @@ change = cst.getOneChange(changeStream); assert.eq(change.documentKey._id, kDeletedDocumentId); assert.eq(change.operationType, "delete", tojson(change)); + assert.eq(firstChangeClusterTime, change.clusterTime); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); assert.eq(change.ns.coll, coll.getName()); diff --git a/jstests/change_streams/change_stream_apply_ops_resumability.js b/jstests/change_streams/change_stream_apply_ops_resumability.js new file mode 100644 index 00000000000..8a9c8d55a62 --- /dev/null +++ b/jstests/change_streams/change_stream_apply_ops_resumability.js @@ -0,0 +1,198 @@ +// Tests that a change stream will correctly unwind applyOps entries generated by a transaction. +// @tags: [uses_transactions] + +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + var WatchMode = { + kCollection: 1, + kDb: 2, + kCluster: 3, + }; + + function getChangeStream({cst, watchMode, coll, resumeToken}) { + const changeStreamDoc = {}; + if (resumeToken) { + changeStreamDoc.resumeAfter = resumeToken; + } + + if (watchMode == WatchMode.kCluster) { + changeStreamDoc.allChangesForCluster = true; + } + const collArg = (watchMode == WatchMode.kCollection ? coll : 1); + + return cst.startWatchingChanges({ + pipeline: [{$changeStream: changeStreamDoc}], + collection: collArg, + // Use a batch size of 0 to prevent any notifications from being returned in the first + // batch. These would be ignored by ChangeStreamTest.getOneChange(). 
+ aggregateOptions: {cursor: {batchSize: 0}}, + }); + } + + function testChangeStreamsWithTransactions(watchMode) { + let dbToStartTestOn = db; + if (watchMode == WatchMode.kCluster) { + dbToStartTestOn = db.getSiblingDB("admin"); + } + + const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); + const otherCollName = "change_stream_apply_ops_2"; + assertDropAndRecreateCollection(db, otherCollName); + + const otherDbName = "change_stream_apply_ops_db"; + const otherDbCollName = "someColl"; + assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName); + + const cst = new ChangeStreamTest(dbToStartTestOn); + + let changeStream = getChangeStream({cst: cst, watchMode: watchMode, coll: coll}); + + // Do an insert outside of a transaction. + assert.commandWorked(coll.insert({_id: 0, a: 123})); + const nonTxnChange = cst.getOneChange(changeStream); + assert.eq(nonTxnChange.operationType, "insert"); + assert.eq(nonTxnChange.documentKey, {_id: 0}); + + const sessionOptions = {causalConsistency: false}; + const session = db.getMongo().startSession(sessionOptions); + const sessionDb = session.getDatabase(db.getName()); + const sessionColl = sessionDb[coll.getName()]; + + session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}}); + assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); + assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); + + // One insert on a collection that we're not watching. This should be skipped by the + // single-collection change stream. + assert.commandWorked( + sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); + + // This should be skipped by the single-collection and single-db changestreams. 
+ assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert( + {_id: 222, a: "Doc on other DB"})); + + assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); + + session.commitTransaction(); + + // Now insert another document, not part of a transaction. + assert.commandWorked(coll.insert({_id: 3, a: 123})); + + // Check for the first insert. + const firstTxnChange = cst.getOneChange(changeStream); + assert.eq(firstTxnChange.fullDocument._id, 1); + assert.eq(firstTxnChange.operationType, "insert", tojson(firstTxnChange)); + + // Check for the second insert. + const secondTxnChange = cst.getOneChange(changeStream); + assert.eq(secondTxnChange.fullDocument._id, 2); + assert.eq(secondTxnChange.operationType, "insert", tojson(secondTxnChange)); + + // Resume after the first non-transaction change. Be sure we see the documents from the + // transaction again. + changeStream = getChangeStream( + {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id}); + assert.docEq(cst.getOneChange(changeStream), firstTxnChange); + assert.docEq(cst.getOneChange(changeStream), secondTxnChange); + + // Resume after the first transaction change. Be sure we see the second change again. + changeStream = getChangeStream( + {cst: cst, watchMode: watchMode, coll: coll, resumeToken: firstTxnChange._id}); + assert.docEq(cst.getOneChange(changeStream), secondTxnChange); + + let change = secondTxnChange; + if (watchMode >= WatchMode.kDb) { + // We should see the insert on the other collection. + change = cst.getOneChange(changeStream); + assert.eq(change.fullDocument._id, 111); + assert.eq(change.operationType, "insert", tojson(change)); + + // Resume from the beginning again, be sure we see everything up until now. 
+ changeStream = getChangeStream( + {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id}); + assert.docEq(cst.getOneChange(changeStream), firstTxnChange); + assert.docEq(cst.getOneChange(changeStream), secondTxnChange); + assert.docEq(cst.getOneChange(changeStream), change); + } + + if (watchMode >= WatchMode.kCluster) { + // We should see the insert on the other db. + change = cst.getOneChange(changeStream); + assert.eq(change.fullDocument._id, 222); + assert.eq(change.operationType, "insert", tojson(change)); + + // Resume from the beginning again, be sure we see everything up until now. + changeStream = getChangeStream( + {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id}); + assert.docEq(cst.getOneChange(changeStream), firstTxnChange); + assert.docEq(cst.getOneChange(changeStream), secondTxnChange); + // We should see the document which was inserted on the other _collection_. + const changeFromOtherCollection = cst.getOneChange(changeStream); + assert.eq(changeFromOtherCollection.fullDocument._id, 111); + + // Resume from the document in the other collection. + changeStream = getChangeStream({ + cst: cst, + watchMode: watchMode, + coll: coll, + resumeToken: changeFromOtherCollection._id + }); + + // We should again see the most recent document. + assert.docEq(cst.getOneChange(changeStream), change); + } + + // Try starting another change stream from the latest change, the _last_ change caused by + // the transaction. + let otherCursor = + getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id}); + + // Check for the update. + change = cst.getOneChange(changeStream); + assert.eq(change.operationType, "update", tojson(change)); + assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1})); + + // Check for the update on the other stream. 
+ assert.docEq(change, cst.getOneChange(otherCursor)); + + // Now test that we can resume from the _last_ change caused by a transaction. We will + // check that both the initial change stream and the new one find the document that's + // inserted outside of the transaction. + otherCursor = + getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id}); + + // Now check that the document inserted after the transaction is found. + change = cst.getOneChange(changeStream); + assert.eq(change.fullDocument._id, 3); + assert.eq(change.operationType, "insert", tojson(change)); + assert.docEq(change, cst.getOneChange(otherCursor)); + + // Drop the collection. This will trigger an "invalidate" event. + assert.commandWorked(db.runCommand({drop: coll.getName()})); + + // The drop should have invalidated the change stream. + cst.assertNextChangesEqual({ + cursor: changeStream, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + cst.assertNextChangesEqual({ + cursor: otherCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + cst.cleanUp(); + } + + // TODO: SERVER-34302 should allow us to simplify this test, so we're not required to + // explicitly run both against a single collection and against the whole DB. 
+ testChangeStreamsWithTransactions(WatchMode.kCollection); + testChangeStreamsWithTransactions(WatchMode.kDb); + testChangeStreamsWithTransactions(WatchMode.kCluster); +}()); diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 922b01d00c2..99729a4d0de 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -122,6 +122,7 @@ void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Doc checkValueType(input["o"], "o", BSONType::Object); Value applyOps = input.getNestedField("o.applyOps"); + checkValueType(applyOps, "applyOps", BSONType::Array); invariant(applyOps.getArrayLength() > 0); @@ -131,7 +132,36 @@ void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Doc Value txnNumber = input["txnNumber"]; checkValueType(txnNumber, "txnNumber", BSONType::NumberLong); - _txnContext.emplace(applyOps, lsid.getDocument(), txnNumber.getLong()); + Value ts = input[repl::OplogEntry::kTimestampFieldName]; + Timestamp clusterTime = ts.getTimestamp(); + + _txnContext.emplace(applyOps, clusterTime, lsid.getDocument(), txnNumber.getLong()); +} + +ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts, + Value uuid, + Value documentKey) { + ResumeTokenData resumeTokenData; + if (_txnContext) { + // We're in the middle of unwinding an 'applyOps'. + + // Use the clusterTime from the higher level applyOps + resumeTokenData.clusterTime = _txnContext->clusterTime; + + // 'pos' points to the _next_ applyOps index, so we must subtract one to get the index of + // the entry being examined right now. 
+ invariant(_txnContext->pos >= 1); + resumeTokenData.applyOpsIndex = _txnContext->pos - 1; + } else { + resumeTokenData.clusterTime = ts.getTimestamp(); + resumeTokenData.applyOpsIndex = 0; + } + + resumeTokenData.documentKey = documentKey; + if (!uuid.missing()) + resumeTokenData.uuid = uuid.getUuid(); + + return resumeTokenData; } Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) { @@ -277,21 +307,11 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document documentKey = Value(); } - // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will - // not appear in the output. - ResumeTokenData resumeTokenData; - if (_txnContext) { - // We're in the middle of unwinding an 'applyOps'. - - // TODO: SERVER-34314 - // For now we return an empty resumeToken. - } else { - resumeTokenData.clusterTime = ts.getTimestamp(); - resumeTokenData.documentKey = documentKey; - if (!uuid.missing()) - resumeTokenData.uuid = uuid.getUuid(); - } + // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear + // in the output. + ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey); + // Add some additional fields only relevant to transactions. if (_txnContext) { doc.addField(DocumentSourceChangeStream::kTxnNumberField, Value(static_cast<long long>(_txnContext->txnNumber))); @@ -306,7 +326,6 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document // If we're in a sharded environment, we'll need to merge the results by their sort key, so add // that as metadata. if (pExpCtx->needsMerge) { - // TODO SERVER-34314: Sort key may have to be _id.data in FCV 4.0. 
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey)); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 5ae0ea3fc6f..200c489cf07 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -88,14 +88,29 @@ private: // Our current place in the 'opArray'. size_t pos; + // The clusterTime of the applyOps. + Timestamp clusterTime; + // Fields that were taken from the 'applyOps' oplog entry. Document lsid; TxnNumber txnNumber; - TransactionContext(const Value& applyOpsVal, const Document& lsidDoc, TxnNumber n) - : opArray(applyOpsVal), arr(opArray.getArray()), pos(0), lsid(lsidDoc), txnNumber(n) {} + TransactionContext(const Value& applyOpsVal, + Timestamp ts, + const Document& lsidDoc, + TxnNumber n) + : opArray(applyOpsVal), + arr(opArray.getArray()), + pos(0), + clusterTime(ts), + lsid(lsidDoc), + txnNumber(n) {} }; + /** + * Helper used for determining what resume token to return. + */ + ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey); void initializeTransactionContext(const Document& input); /** diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 6b5e386051b..45a4e735f1e 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -41,17 +41,17 @@ using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus; // the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token, // and ResumeToken::kCannotResume if it is more recent than the client's resume token (indicating // that we will never see the token). 
If the resume token's documentKey contains only the _id field -// while the pipeline documentKey contains additional fields, then the collection has become sharded -// since the resume token was generated. In that case, we relax the requirements such that only the -// timestamp, UUID and documentKey._id need match. This remains correct, since the only -// circumstances under which the resume token omits the shard key is if it was generated either (1) -// before the collection was sharded, (2) after the collection was sharded but before the primary -// shard became aware of that fact, implying that it was before the first chunk moved off the shard, -// or (3) by a malicious client who has constructed their own resume token. In the first two cases, -// we can be guaranteed that the _id is unique and the stream can therefore be resumed seamlessly; -// in the third case, the worst that can happen is that some entries are missed or duplicated. Note -// that the simple collation is used to compare the resume tokens, and that we purposefully avoid -// the user's requested collation if present. +// while the pipeline documentKey contains additional fields, then the collection has become +// sharded since the resume token was generated. In that case, we relax the requirements such that +// only the timestamp, version, applyOpsIndex, UUID and documentKey._id need match. This remains +// correct, since the only circumstances under which the resume token omits the shard key is if it +// was generated either (1) before the collection was sharded, (2) after the collection was sharded +// but before the primary shard became aware of that fact, implying that it was before the first +// chunk moved off the shard, or (3) by a malicious client who has constructed their own resume +// token. 
In the first two cases, we can be guaranteed that the _id is unique and the stream can +// therefore be resumed seamlessly; in the third case, the worst that can happen is that some +// entries are missed or duplicated. Note that the simple collation is used to compare the resume +// tokens, and that we purposefully avoid the user's requested collation if present. ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx, const Document& documentFromResumedStream, const ResumeToken& tokenFromClient) { @@ -68,6 +68,16 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte if (tokenDataFromResumedStream.clusterTime != tokenDataFromClient.clusterTime) { return ResumeStatus::kCannotResume; } + + if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) { + return ResumeStatus::kCheckNextDoc; + } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) { + // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in + // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being + // watched). This indicates a corrupt resume token. + uasserted(50792, "Invalid resumeToken: applyOpsIndex was skipped"); + } + // It is acceptable for the stream UUID to differ from the client's, if this is a whole-database // or cluster-wide stream and we are comparing operations from different shards at the same // clusterTime. 
If the stream UUID sorts after the client's, however, then the stream is not diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index c216c81b52d..73f52287122 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -61,21 +61,30 @@ public: protected: /** - * Pushes a document with a resume token corresponding to the given timestamp, docKey, and - * namespace into the mock queue. + * Pushes a document with a resume token corresponding to the given timestamp, version, + * applyOpsIndex, docKey, and namespace into the mock queue. */ - void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) { + void addDocument( + Timestamp ts, int version, std::size_t applyOpsIndex, Document docKey, UUID uuid) { _mock->queue.push_back( Document{{"_id", - ResumeToken(ResumeTokenData(ts, Value(docKey), uuid)) + ResumeToken(ResumeTokenData(ts, version, applyOpsIndex, Value(docKey), uuid)) .toDocument(ResumeToken::SerializationFormat::kHexString)}}); } + + /** + * Pushes a document with a resume token corresponding to the given timestamp, version, + * applyOpsIndex, docKey, and namespace into the mock queue. + */ + void addDocument(Timestamp ts, Document docKey, UUID uuid = testUuid()) { + addDocument(ts, 0, 0, docKey, uuid); + } /** * Pushes a document with a resume token corresponding to the given timestamp, _id string, and * namespace into the mock queue. */ void addDocument(Timestamp ts, std::string id, UUID uuid = testUuid()) { - addDocument(ts, Document{{"_id", id}}, uuid); + addDocument(ts, 0, 0, Document{{"_id", id}}, uuid); } void addPause() { @@ -87,19 +96,34 @@ protected: * namespace. 
*/ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( - Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) { - ResumeToken token(ResumeTokenData(ts, docKey ? Value(*docKey) : Value(), uuid)); + Timestamp ts, + int version, + std::size_t applyOpsIndex, + boost::optional<Document> docKey, + UUID uuid) { + ResumeToken token( + ResumeTokenData(ts, version, applyOpsIndex, docKey ? Value(*docKey) : Value(), uuid)); auto checkResumeToken = DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), token); checkResumeToken->setSource(_mock.get()); return checkResumeToken; } + + /** + * Convenience method to create the class under test with a given timestamp, docKey, and + * namespace. + */ + intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( + Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) { + return createCheckResumeToken(ts, 0, 0, docKey, uuid); + } + /** * Convenience method to create the class under test with a given timestamp, _id string, and * namespace. */ intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createCheckResumeToken( Timestamp ts, StringData id, UUID uuid = testUuid()) { - return createCheckResumeToken(ts, Document{{"_id", id}}, uuid); + return createCheckResumeToken(ts, 0, 0, Document{{"_id", id}}, uuid); } /** @@ -380,6 +404,40 @@ TEST_F(CheckResumeTokenTest, ASSERT_THROWS_CODE(checkResumeToken->getNext(), AssertionException, 40585); } +TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierApplyOpsIndex) { + Timestamp resumeTimestamp(100, 1); + + // Create an ordered array of 3 UUIDs. + std::vector<UUID> uuids = {UUID::gen(), UUID::gen(), UUID::gen()}; + + std::sort(uuids.begin(), uuids.end()); + + auto checkResumeToken = + createCheckResumeToken(resumeTimestamp, 0, 2, Document{{"_id"_sd, 1}}, uuids[1]); + + // Add two documents which have the same clusterTime and version but a lower applyOps index. 
One + // of the documents has a lower uuid than the resume token, the other has a higher uuid; this + // demonstrates that the applyOps index is the discriminating factor. + addDocument(resumeTimestamp, 0, 0, {{"_id"_sd, 0}}, uuids[0]); + addDocument(resumeTimestamp, 0, 1, {{"_id"_sd, 2}}, uuids[2]); + + // Add a third document that matches the resume token. + addDocument(resumeTimestamp, 0, 2, {{"_id"_sd, 1}}, uuids[1]); + + // Add a fourth document with the same timestamp and version whose applyOps sorts after the + // resume token. + auto expectedDocKey = Document{{"_id"_sd, 3}}; + addDocument(resumeTimestamp, 0, 3, expectedDocKey, uuids[1]); + + // We should skip the first two docs, swallow the resume token, and return the fourth doc. + const auto firstDocAfterResume = checkResumeToken->getNext(); + const auto tokenFromFirstDocAfterResume = + ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData(); + + ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, resumeTimestamp); + ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey); +} + /** * We should _error_ on the no-document case, because that means the resume token was not found. 
*/ diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 15d236fd409..d5ebad0e611 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -70,7 +70,7 @@ public: tokenData.clusterTime = ts; return ResumeToken(tokenData).toDocument(ResumeToken::SerializationFormat::kHexString); } - return ResumeToken(ResumeTokenData(ts, Value(Document{{"_id", id}}), testUuid())) + return ResumeToken(ResumeTokenData(ts, 0, 0, Value(Document{{"_id", id}}), testUuid())) .toDocument(ResumeToken::SerializationFormat::kHexString); } }; diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index fd1e8cd3ce7..0fb316bcf4d 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -104,11 +104,14 @@ ResumeToken::ResumeToken(const Document& resumeDoc) { _typeBits.getBinData().type == BinDataGeneral)); } -// We encode the resume token as a KeyString with the sequence: clusterTime, uuid, documentKey. +// We encode the resume token as a KeyString with the sequence: +// clusterTime, version, applyOpsIndex, uuid, documentKey // Only the clusterTime is required. ResumeToken::ResumeToken(const ResumeTokenData& data) { BSONObjBuilder builder; builder.append("", data.clusterTime); + builder.append("", data.version); + builder.appendNumber("", data.applyOpsIndex); uassert(50788, "Unexpected resume token with a documentKey but no UUID", data.uuid || data.documentKey.missing()); @@ -189,6 +192,30 @@ ResumeTokenData ResumeToken::getData() const { break; } case BSONType::String: { + // Next comes the resume token version. 
+ auto versionElt = i.next(); + uassert(50790, + "Resume Token does not contain applyOpsIndex", + versionElt.type() == BSONType::NumberInt); + result.version = versionElt.numberInt(); + uassert(50791, "Invalid Resume Token: only supports version 0", result.version == 0); + + // The new format has applyOpsIndex next. + auto applyOpsElt = i.next(); + uassert(50793, + "Resume Token does not contain applyOpsIndex", + applyOpsElt.type() == BSONType::NumberInt); + const int applyOpsInd = applyOpsElt.numberInt(); + uassert(50794, + "Invalid Resume Token: applyOpsIndex should be non-negative", + applyOpsInd >= 0); + result.applyOpsIndex = applyOpsInd; + + // The the UUID and documentKey are not required. + if (!i.more()) { + return result; + } + // In the new format, the UUID comes first, then the documentKey. result.uuid = uassertStatusOK(UUID::parse(i.next())); if (i.more()) { diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 7d1bb3c432f..25b30788fd3 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -42,9 +42,15 @@ namespace mongo { struct ResumeTokenData { ResumeTokenData(){}; ResumeTokenData(Timestamp clusterTimeIn, + int versionIn, + size_t applyOpsIndexIn, Value documentKeyIn, const boost::optional<UUID>& uuidIn) - : clusterTime(clusterTimeIn), documentKey(std::move(documentKeyIn)), uuid(uuidIn){}; + : clusterTime(clusterTimeIn), + version(versionIn), + applyOpsIndex(applyOpsIndexIn), + documentKey(std::move(documentKeyIn)), + uuid(uuidIn){}; bool operator==(const ResumeTokenData& other) const; bool operator!=(const ResumeTokenData& other) const { @@ -52,6 +58,8 @@ struct ResumeTokenData { }; Timestamp clusterTime; + int version = 0; + size_t applyOpsIndex = 0; Value documentKey; boost::optional<UUID> uuid; }; @@ -69,7 +77,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData); * 2. 
Using a hex-encoded string in a similar format: * { * _data: String, A hex encoding of the binary generated by keystring encoding the clusterTime, - * UUID, then documentKey in that order. + * version, applyOpsIndex, UUID, then documentKey in that order. * _typeBits: BinData - The keystring type bits used for deserialization. * } * The _data field data is encoded such that string comparisons provide the correct ordering of diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp index 2fe8c0ac33c..6f7eaf4bbb4 100644 --- a/src/mongo/db/pipeline/resume_token_test.cpp +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -46,7 +46,7 @@ TEST(ResumeToken, EncodesFullTokenFromData) { UUID testUuid = UUID::gen(); Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; - ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); + ResumeTokenData resumeTokenDataIn(ts, 0, 0, Value(documentKey), testUuid); ResumeToken token(resumeTokenDataIn); ResumeTokenData tokenData = token.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); @@ -67,7 +67,7 @@ TEST(ResumeToken, ShouldRoundTripThroughHexStringEncoding) { UUID testUuid = UUID::gen(); Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; - ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); + ResumeTokenData resumeTokenDataIn(ts, 0, 0, Value(documentKey), testUuid); // Test serialization/parsing through Document. auto rtToken = @@ -87,7 +87,7 @@ TEST(ResumeToken, ShouldRoundTripThroughBinDataEncoding) { UUID testUuid = UUID::gen(); Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; - ResumeTokenData resumeTokenDataIn(ts, Value(documentKey), testUuid); + ResumeTokenData resumeTokenDataIn(ts, 0, 0, Value(documentKey), testUuid); // Test serialization/parsing through Document. 
auto rtToken = @@ -144,8 +144,9 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) { Timestamp ts(1000, 1); UUID testUuid = UUID::gen(); - ResumeTokenData hasTypeBitsData(ts, Value(Document{{"_id", 1.0}}), testUuid); - ResumeTokenData noTypeBitsData(ResumeTokenData(ts, Value(Document{{"_id", 1}}), testUuid)); + ResumeTokenData hasTypeBitsData(ts, 0, 0, Value(Document{{"_id", 1.0}}), testUuid); + ResumeTokenData noTypeBitsData( + ResumeTokenData(ts, 0, 0, Value(Document{{"_id", 1}}), testUuid)); ResumeToken hasTypeBitsToken(hasTypeBitsData); ResumeToken noTypeBitsToken(noTypeBitsData); ASSERT_EQ(noTypeBitsToken, hasTypeBitsToken); @@ -224,9 +225,51 @@ TEST(ResumeToken, CorruptTokens) { ASSERT_THROWS(badTypeBitsToken.getData(), AssertionException); } +TEST(ResumeToken, WrongVersionToken) { + Timestamp ts(1001, 3); + + ResumeTokenData resumeTokenDataIn; + resumeTokenDataIn.clusterTime = ts; + resumeTokenDataIn.version = 0; + + // This one with version 0 should succeed. + auto rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); + + // With version 1 it should fail. + resumeTokenDataIn.version = 1; + rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson()); + + ASSERT_THROWS(rtToken.getData(), AssertionException); +} + +TEST(ResumeToken, InvalidApplyOpsIndex) { + Timestamp ts(1001, 3); + + ResumeTokenData resumeTokenDataIn; + resumeTokenDataIn.clusterTime = ts; + resumeTokenDataIn.applyOpsIndex = 1234; + + // Should round trip with a non-negative applyOpsIndex. + auto rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); + + // Should fail with a negative applyOpsIndex. 
+ resumeTokenDataIn.applyOpsIndex = std::numeric_limits<size_t>::max(); + rtToken = + ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument(Format::kHexString).toBson()); + + ASSERT_THROWS(rtToken.getData(), AssertionException); +} + TEST(ResumeToken, StringEncodingSortsCorrectly) { // Make sure that the string encoding of the resume tokens will compare in the correct order, - // namely timestamp, uuid, then documentKey. + // namely timestamp, version, applyOpsIndex, uuid, then documentKey. Timestamp ts2_2(2, 2); Timestamp ts10_4(10, 4); Timestamp ts10_5(10, 5); @@ -247,44 +290,67 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) { }; // Test using only Timestamps. - assertLt({ts2_2, Value(), boost::none}, {ts10_4, Value(), boost::none}); - assertLt({ts2_2, Value(), boost::none}, {ts10_5, Value(), boost::none}); - assertLt({ts2_2, Value(), boost::none}, {ts11_3, Value(), boost::none}); - assertLt({ts10_4, Value(), boost::none}, {ts10_5, Value(), boost::none}); - assertLt({ts10_4, Value(), boost::none}, {ts11_3, Value(), boost::none}); - assertLt({ts10_5, Value(), boost::none}, {ts11_3, Value(), boost::none}); - - // Test that the Timestamp is more important than the UUID and documentKey. - assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, - {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); - assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid}, - {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, - {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 0}}), higher_uuid}, - {ts10_5, Value(Document{{"_id", 0}}), lower_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, - {ts10_5, Value(Document{{"_id", 0}}), higher_uuid}); - - // Test that when the Timestamp is the same, the UUID breaks the tie. 
- assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid}, - {ts2_2, Value(Document{{"_id", 0}}), higher_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, - {ts10_4, Value(Document{{"_id", 0}}), higher_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, - {ts10_4, Value(Document{{"_id", 0}}), higher_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, - {ts10_4, Value(Document{{"_id", 2}}), higher_uuid}); - - // Test that when the Timestamp and the UUID are the same, the documentKey breaks the tie. - assertLt({ts2_2, Value(Document{{"_id", 0}}), lower_uuid}, - {ts2_2, Value(Document{{"_id", 1}}), lower_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 0}}), lower_uuid}, - {ts10_4, Value(Document{{"_id", 1}}), lower_uuid}); - assertLt({ts10_4, Value(Document{{"_id", 1}}), lower_uuid}, - {ts10_4, Value(Document{{"_id", "string"_sd}}), lower_uuid}); - assertLt({ts10_4, Value(Document{{"_id", BSONNULL}}), lower_uuid}, - {ts10_4, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts10_4, 0, 0, Value(), boost::none}); + assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts10_5, 0, 0, Value(), boost::none}); + assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts11_3, 0, 0, Value(), boost::none}); + assertLt({ts10_4, 0, 0, Value(), boost::none}, {ts10_5, 0, 0, Value(), boost::none}); + assertLt({ts10_4, 0, 0, Value(), boost::none}, {ts11_3, 0, 0, Value(), boost::none}); + assertLt({ts10_5, 0, 0, Value(), boost::none}, {ts11_3, 0, 0, Value(), boost::none}); + + // Test using Timestamps and version. + assertLt({ts2_2, 0, 0, Value(), boost::none}, {ts2_2, 1, 0, Value(), boost::none}); + assertLt({ts10_4, 5, 0, Value(), boost::none}, {ts10_4, 10, 0, Value(), boost::none}); + + // Test that the Timestamp is more important than the version, applyOpsIndex, UUID and + // documentKey. 
+ assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), higher_uuid}, + {ts10_5, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_5, 0, 0, Value(Document{{"_id", 0}}), higher_uuid}); + + // Test that when the Timestamp is the same, the version breaks the tie. + assertLt({ts10_4, 1, 50, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_4, 5, 1, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts2_2, 1, 0, Value(Document{{"_id", 0}}), higher_uuid}, + {ts2_2, 2, 0, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts10_4, 1, 0, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_4, 2, 0, Value(Document{{"_id", 0}}), lower_uuid}); + + // Test that when the Timestamp and version are the same, the applyOpsIndex breaks the tie. + assertLt({ts10_4, 1, 6, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_4, 1, 50, Value(Document{{"_id", 0}}), lower_uuid}); + assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), higher_uuid}, + {ts2_2, 0, 4, Value(Document{{"_id", 0}}), lower_uuid}); + + // Test that when the Timestamp, version, and applyOpsIndex are the same, the UUID breaks the + // tie. 
+ assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}, + {ts2_2, 0, 0, Value(Document{{"_id", 0}}), higher_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_4, 0, 0, Value(Document{{"_id", 0}}), higher_uuid}); + assertLt({ts10_4, 1, 2, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_4, 1, 2, Value(Document{{"_id", 0}}), higher_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_4, 0, 0, Value(Document{{"_id", 0}}), higher_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_4, 0, 0, Value(Document{{"_id", 2}}), higher_uuid}); + + // Test that when the Timestamp, version, applyOpsIndex, and UUID are the same, the documentKey + // breaks the tie. + assertLt({ts2_2, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}, + {ts2_2, 0, 0, Value(Document{{"_id", 1}}), lower_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}, + {ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", 1}}), lower_uuid}, + {ts10_4, 0, 0, Value(Document{{"_id", "string"_sd}}), lower_uuid}); + assertLt({ts10_4, 0, 0, Value(Document{{"_id", BSONNULL}}), lower_uuid}, + {ts10_4, 0, 0, Value(Document{{"_id", 0}}), lower_uuid}); } } // namespace |