diff options
Diffstat (limited to 'jstests')
8 files changed, 767 insertions, 785 deletions
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js index 45f4e78fafa..28948ed84b5 100644 --- a/jstests/change_streams/change_stream_apply_ops.js +++ b/jstests/change_streams/change_stream_apply_ops.js @@ -4,160 +4,147 @@ (function() { "use strict"; - load("jstests/libs/change_stream_util.js"); + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - var WatchMode = { - kCollection: 1, - kDb: 2, - kCluster: 3, - }; - - function testChangeStreamsWithTransactions(watchMode) { - let dbToStartTestOn = db; - if (watchMode == WatchMode.kCluster) { - dbToStartTestOn = db.getSiblingDB("admin"); - } - - const otherCollName = "change_stream_apply_ops_2"; - const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); - assertDropAndRecreateCollection(db, otherCollName); - - const otherDbName = "change_stream_apply_ops_db"; - const otherDbCollName = "someColl"; - assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName); - - // Insert a document that gets deleted as part of the transaction. - const kDeletedDocumentId = 0; - coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"}); - - let cst = new ChangeStreamTest(dbToStartTestOn); - - let changeStream = null; - if (watchMode == WatchMode.kCluster) { - changeStream = cst.startWatchingAllChangesForCluster(); - } else { - const collArg = (watchMode == WatchMode.kCollection ? coll : 1); - changeStream = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collArg}); - } - - const sessionOptions = {causalConsistency: false}; - const session = db.getMongo().startSession(sessionOptions); - const sessionDb = session.getDatabase(db.getName()); - const sessionColl = sessionDb[coll.getName()]; - - session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}}); - assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); - assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); - - // One insert on a collection that we're not watching. This should be skipped by the - // single-collection changestream. - assert.commandWorked( - sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); - - // This should be skipped by the single-collection and single-db changestreams. - assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert( - {_id: 222, a: "Doc on other DB"})); - - assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); - - assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId})); - - session.commitTransaction(); - - // Do applyOps on the collection that we care about. This is an "external" applyOps, though - // (not run as part of a transaction) so its entries should be skipped in the change - // stream. This checks that applyOps that don't have an 'lsid' and 'txnNumber' field do not - // get unwound. - assert.commandWorked(db.runCommand({ - applyOps: [ - {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}}, - ] - })); - - // Check for the first insert. - let change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 1); - assert.eq(change.operationType, "insert", tojson(change)); - const firstChangeClusterTime = change.clusterTime; - assert(firstChangeClusterTime instanceof Timestamp, tojson(change)); - const firstChangeTxnNumber = change.txnNumber; - const firstChangeLsid = change.lsid; - assert.eq(typeof firstChangeLsid, "object"); - assert.eq(change.ns.coll, coll.getName()); - assert.eq(change.ns.db, db.getName()); - - // Check for the second insert. - change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 2); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(firstChangeClusterTime, change.clusterTime); - assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); - assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); - assert.eq(change.ns.coll, coll.getName()); - assert.eq(change.ns.db, db.getName()); - - if (watchMode >= WatchMode.kDb) { - // We should see the insert on the other collection. - change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 111); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(firstChangeClusterTime, change.clusterTime); - assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); - assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); - assert.eq(change.ns.coll, otherCollName); - assert.eq(change.ns.db, db.getName()); - } - - if (watchMode >= WatchMode.kCluster) { - // We should see the insert on the other db. - change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 222); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(firstChangeClusterTime, change.clusterTime); - assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); - assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); - assert.eq(change.ns.coll, otherDbCollName); - assert.eq(change.ns.db, otherDbName); - } - - // Check for the update. - change = cst.getOneChange(changeStream); - assert.eq(change.operationType, "update", tojson(change)); - assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1})); - assert.eq(firstChangeClusterTime, change.clusterTime); - assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); - assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); - assert.eq(change.ns.coll, coll.getName()); - assert.eq(change.ns.db, db.getName()); - - // Check for the delete. - change = cst.getOneChange(changeStream); - assert.eq(change.documentKey._id, kDeletedDocumentId); - assert.eq(change.operationType, "delete", tojson(change)); - assert.eq(firstChangeClusterTime, change.clusterTime); - assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); - assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); - assert.eq(change.ns.coll, coll.getName()); - assert.eq(change.ns.db, db.getName()); - - // Drop the collection. This will trigger an "invalidate" event. - assert.commandWorked(db.runCommand({drop: coll.getName()})); - - // The drop should have invalidated the change stream. - cst.assertNextChangesEqual({ - cursor: changeStream, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - cst.cleanUp(); - } - - // TODO: SERVER-34302 should allow us to simplify this test, so we're not required to - // explicitly run both against a single collection and against the whole DB. - testChangeStreamsWithTransactions(WatchMode.kCollection); - testChangeStreamsWithTransactions(WatchMode.kDb); - testChangeStreamsWithTransactions(WatchMode.kCluster); + const otherCollName = "change_stream_apply_ops_2"; + const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); + assertDropAndRecreateCollection(db, otherCollName); + + const otherDbName = "change_stream_apply_ops_db"; + const otherDbCollName = "someColl"; + assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName); + + // Insert a document that gets deleted as part of the transaction. + const kDeletedDocumentId = 0; + coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"}); + + let cst = new ChangeStreamTest(db); + let changeStream = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll}); + + const sessionOptions = {causalConsistency: false}; + const session = db.getMongo().startSession(sessionOptions); + const sessionDb = session.getDatabase(db.getName()); + const sessionColl = sessionDb[coll.getName()]; + + session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}}); + assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); + assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); + + // One insert on a collection that we're not watching. This should be skipped by the + // single-collection changestream. + assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); + + // One insert on a collection in a different database. This should be skipped by the single + // collection and single-db changestreams. + assert.commandWorked( + session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"})); + + assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); + + assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId})); + + session.commitTransaction(); + + // Do applyOps on the collection that we care about. This is an "external" applyOps, though + // (not run as part of a transaction) so its entries should be skipped in the change + // stream. This checks that applyOps that don't have an 'lsid' and 'txnNumber' field do not + // get unwound. + assert.commandWorked(db.runCommand({ + applyOps: [ + {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}}, + ] + })); + + // Drop the collection. This will trigger an "invalidate" event at the end of the stream. + assert.commandWorked(db.runCommand({drop: coll.getName()})); + + // Define the set of changes expected for the single-collection case per the operations above. + const expectedChanges = [ + { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 0}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }, + { + documentKey: {_id: 2}, + fullDocument: {_id: 2, a: 0}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }, + { + documentKey: {_id: 1}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {a: 1}}, + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }, + { + documentKey: {_id: kDeletedDocumentId}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "delete", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }, + {operationType: "invalidate"}, + ]; + + // Verify that the stream returns the expected sequence of changes. + const changes = cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true}); + + // Obtain the clusterTime from the first change. + const startTime = changes[0].clusterTime; + + // Add an entry for the insert on db.otherColl into expectedChanges. + expectedChanges.splice(2, 0, { + documentKey: {_id: 111}, + fullDocument: {_id: 111, a: "Doc on other collection"}, + ns: {db: db.getName(), coll: otherCollName}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }); + + // Verify that a whole-db stream returns the expected sequence of changes, including the insert + // on the other collection but NOT the changes on the other DB or the manual applyOps. + changeStream = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {startAtClusterTime: {ts: startTime}}}, {$project: {"lsid.uid": 0}}], + collection: 1 + }); + cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true}); + + // Add an entry for the insert on otherDb.otherDbColl into expectedChanges. + expectedChanges.splice(3, 0, { + documentKey: {_id: 222}, + fullDocument: {_id: 222, a: "Doc on other DB"}, + ns: {db: otherDbName, coll: otherDbCollName}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }); + + // Verify that a whole-cluster stream returns the expected sequence of changes, including the + // inserts on the other collection and the other database, but NOT the manual applyOps. + cst = new ChangeStreamTest(db.getSiblingDB("admin")); + changeStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {startAtClusterTime: {ts: startTime}, allChangesForCluster: true}}, + {$project: {"lsid.uid": 0}} + ], + collection: 1 + }); + cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true}); + + cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream_apply_ops_resumability.js b/jstests/change_streams/change_stream_apply_ops_resumability.js index 8a9c8d55a62..8d7ee90b8e6 100644 --- a/jstests/change_streams/change_stream_apply_ops_resumability.js +++ b/jstests/change_streams/change_stream_apply_ops_resumability.js @@ -4,195 +4,169 @@ (function() { "use strict"; - load("jstests/libs/change_stream_util.js"); + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - var WatchMode = { - kCollection: 1, - kDb: 2, - kCluster: 3, - }; - - function getChangeStream({cst, watchMode, coll, resumeToken}) { - const changeStreamDoc = {}; - if (resumeToken) { - changeStreamDoc.resumeAfter = resumeToken; - } - - if (watchMode == WatchMode.kCluster) { - changeStreamDoc.allChangesForCluster = true; - } - const collArg = (watchMode == WatchMode.kCollection ? coll : 1); - - return cst.startWatchingChanges({ - pipeline: [{$changeStream: changeStreamDoc}], - collection: collArg, - // Use a batch size of 0 to prevent any notifications from being returned in the first - // batch. These would be ignored by ChangeStreamTest.getOneChange(). - aggregateOptions: {cursor: {batchSize: 0}}, - }); - } - - function testChangeStreamsWithTransactions(watchMode) { - let dbToStartTestOn = db; - if (watchMode == WatchMode.kCluster) { - dbToStartTestOn = db.getSiblingDB("admin"); - } - - const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); - const otherCollName = "change_stream_apply_ops_2"; - assertDropAndRecreateCollection(db, otherCollName); - - const otherDbName = "change_stream_apply_ops_db"; - const otherDbCollName = "someColl"; - assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName); - - const cst = new ChangeStreamTest(dbToStartTestOn); - - let changeStream = getChangeStream({cst: cst, watchMode: watchMode, coll: coll}); - - // Do an insert outside of a transaction. - assert.commandWorked(coll.insert({_id: 0, a: 123})); - const nonTxnChange = cst.getOneChange(changeStream); - assert.eq(nonTxnChange.operationType, "insert"); - assert.eq(nonTxnChange.documentKey, {_id: 0}); - - const sessionOptions = {causalConsistency: false}; - const session = db.getMongo().startSession(sessionOptions); - const sessionDb = session.getDatabase(db.getName()); - const sessionColl = sessionDb[coll.getName()]; - - session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}}); - assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); - assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); - - // One insert on a collection that we're not watching. This should be skipped by the - // single-collection change stream. - assert.commandWorked( - sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); - - // This should be skipped by the single-collection and single-db changestreams. - assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert( - {_id: 222, a: "Doc on other DB"})); - - assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); - - session.commitTransaction(); - - // Now insert another document, not part of a transaction. - assert.commandWorked(coll.insert({_id: 3, a: 123})); - - // Check for the first insert. - const firstTxnChange = cst.getOneChange(changeStream); - assert.eq(firstTxnChange.fullDocument._id, 1); - assert.eq(firstTxnChange.operationType, "insert", tojson(firstTxnChange)); - - // Check for the second insert. - const secondTxnChange = cst.getOneChange(changeStream); - assert.eq(secondTxnChange.fullDocument._id, 2); - assert.eq(secondTxnChange.operationType, "insert", tojson(secondTxnChange)); - - // Resume after the first non-transaction change. Be sure we see the documents from the - // transaction again. - changeStream = getChangeStream( - {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id}); - assert.docEq(cst.getOneChange(changeStream), firstTxnChange); - assert.docEq(cst.getOneChange(changeStream), secondTxnChange); - - // Resume after the first transaction change. Be sure we see the second change again. - changeStream = getChangeStream( - {cst: cst, watchMode: watchMode, coll: coll, resumeToken: firstTxnChange._id}); - assert.docEq(cst.getOneChange(changeStream), secondTxnChange); - - let change = secondTxnChange; - if (watchMode >= WatchMode.kDb) { - // We should see the insert on the other collection. - change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 111); - assert.eq(change.operationType, "insert", tojson(change)); - - // Resume from the beginning again, be sure we see everything up until now. - changeStream = getChangeStream( - {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id}); - assert.docEq(cst.getOneChange(changeStream), firstTxnChange); - assert.docEq(cst.getOneChange(changeStream), secondTxnChange); - assert.docEq(cst.getOneChange(changeStream), change); - } - - if (watchMode >= WatchMode.kCluster) { - // We should see the insert on the other db. - change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 222); - assert.eq(change.operationType, "insert", tojson(change)); - - // Resume from the beginning again, be sure we see everything up until now. - changeStream = getChangeStream( - {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id}); - assert.docEq(cst.getOneChange(changeStream), firstTxnChange); - assert.docEq(cst.getOneChange(changeStream), secondTxnChange); - // We should see the document which was inserted on the other _collection_. - const changeFromOtherCollection = cst.getOneChange(changeStream); - assert.eq(changeFromOtherCollection.fullDocument._id, 111); - - // Resume from the document in the other collection. - changeStream = getChangeStream({ - cst: cst, - watchMode: watchMode, - coll: coll, - resumeToken: changeFromOtherCollection._id - }); - - // We should again see the most recent document. - assert.docEq(cst.getOneChange(changeStream), change); - } - - // Try starting another change stream from the latest change, the _last_ change caused by - // the transaction. - let otherCursor = - getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id}); - - // Check for the update. - change = cst.getOneChange(changeStream); - assert.eq(change.operationType, "update", tojson(change)); - assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1})); - - // Check for the update on the other stream. - assert.docEq(change, cst.getOneChange(otherCursor)); - - // Now test that we can resume from the _last_ change caused by a transaction. We will - // check that both the initial change stream and the new one find the document that's - // inserted outside of the transaction. - otherCursor = - getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id}); - - // Now check that the document inserted after the transaction is found. - change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 3); - assert.eq(change.operationType, "insert", tojson(change)); - assert.docEq(change, cst.getOneChange(otherCursor)); - - // Drop the collection. This will trigger an "invalidate" event. - assert.commandWorked(db.runCommand({drop: coll.getName()})); - - // The drop should have invalidated the change stream. - cst.assertNextChangesEqual({ - cursor: changeStream, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - cst.assertNextChangesEqual({ - cursor: otherCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - cst.cleanUp(); - } - - // TODO: SERVER-34302 should allow us to simplify this test, so we're not required to - // explicitly run both against a single collection and against the whole DB. - testChangeStreamsWithTransactions(WatchMode.kCollection); - testChangeStreamsWithTransactions(WatchMode.kDb); - testChangeStreamsWithTransactions(WatchMode.kCluster); + const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); + const otherCollName = "change_stream_apply_ops_2"; + assertDropAndRecreateCollection(db, otherCollName); + + const otherDbName = "change_stream_apply_ops_db"; + const otherDbCollName = "someColl"; + assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName); + + let cst = new ChangeStreamTest(db); + let changeStream = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll}); + + // Do an insert outside of a transaction. + assert.commandWorked(coll.insert({_id: 0, a: 123})); + + // Open a session, and perform two writes within a transaction. + const sessionOptions = {causalConsistency: false}; + const session = db.getMongo().startSession(sessionOptions); + const sessionDb = session.getDatabase(db.getName()); + const sessionColl = sessionDb[coll.getName()]; + + session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}}); + assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); + assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); + + // One insert on a collection that we're not watching. This should be skipped by the + // single-collection change stream. + assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); + + // One insert on a collection in a different database. This should be skipped by the single + // collection and single-db changestreams. + assert.commandWorked( + session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"})); + + assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); + + session.commitTransaction(); + + // Now insert another document, not part of a transaction. + assert.commandWorked(coll.insert({_id: 3, a: 123})); + + // Define the set of changes expected for the single-collection case per the operations above. + const expectedChanges = [ + { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 123}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 0}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }, + { + documentKey: {_id: 2}, + fullDocument: {_id: 2, a: 0}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }, + { + documentKey: {_id: 1}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {a: 1}}, + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }, + { + documentKey: {_id: 3}, + fullDocument: {_id: 3, a: 123}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }, + ]; + + // Verify that the stream returns the expected sequence of changes. + const changes = + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); + + // Record the first (non-transaction) change and the first in-transaction change. + const nonTxnChange = changes[0], firstTxnChange = changes[1], secondTxnChange = changes[2]; + + // Resume after the first non-transaction change. Be sure we see the documents from the + // transaction again. + changeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: nonTxnChange._id}}, {$project: {"lsid.uid": 0}}], + collection: coll + }); + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(1)}); + + // Resume after the first transaction change. Be sure we see the second change again. + changeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: firstTxnChange._id}}, {$project: {"lsid.uid": 0}}], + collection: coll + }); + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(2)}); + + // Try starting another change stream from the _last_ change caused by the transaction. Verify + // that we can see the insert performed after the transaction was committed. + let otherCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}], + collection: coll + }); + cst.assertNextChangesEqual({cursor: otherCursor, expectedChanges: expectedChanges.slice(3)}); + + // Drop the collection. This will trigger an "invalidate" event at the end of the stream. + assert.commandWorked(db.runCommand({drop: coll.getName()})); + expectedChanges.push({operationType: "invalidate"}); + + // Add an entry for the insert on db.otherColl into expectedChanges. + expectedChanges.splice(3, 0, { + documentKey: {_id: 111}, + fullDocument: {_id: 111, a: "Doc on other collection"}, + ns: {db: db.getName(), coll: otherCollName}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }); + + // Verify that a whole-db stream can be resumed from the middle of the transaction, and that it + // will see all subsequent changes including the insert on the other collection but NOT the + // changes on the other DB. + changeStream = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}], + collection: 1 + }); + cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true}); + + // Add an entry for the insert on otherDb.otherDbColl into expectedChanges. + expectedChanges.splice(4, 0, { + documentKey: {_id: 222}, + fullDocument: {_id: 222, a: "Doc on other DB"}, + ns: {db: otherDbName, coll: otherDbCollName}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: NumberLong(session._txnNumber), + }); + + // Verify that a whole-db stream can be resumed from the middle of the transaction, and that it + // will see all subsequent changes including the insert on the other collection and the changes + // on the other DB. + cst = new ChangeStreamTest(db.getSiblingDB("admin")); + changeStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {resumeAfter: secondTxnChange._id, allChangesForCluster: true}}, + {$project: {"lsid.uid": 0}} + ], + collection: 1 + }); + cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true}); + + cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js index f8ddcce5545..0150e1ba39d 100644 --- a/jstests/change_streams/change_stream_shell_helper.js +++ b/jstests/change_streams/change_stream_shell_helper.js @@ -1,8 +1,11 @@ -// Test change streams related shell helpers and options passed to them. +// Test change streams related shell helpers and options passed to them. Note that, while we only +// call the DBCollection.watch helper in this file, it will be redirected to the DB.watch or +// Mongo.watch equivalents in the whole_db and whole_cluster passthroughs. (function() { "use strict"; load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper"); @@ -40,96 +43,69 @@ } jsTestLog("Testing watch() without options"); - let singleCollCursor = coll.watch(); - let wholeDbCursor = db.watch(); - let wholeClusterCursor = db.getMongo().watch(); + let changeStreamCursor = coll.watch(); - [singleCollCursor, wholeDbCursor, wholeClusterCursor].forEach((cursor) => - assert(!cursor.hasNext())); + assert(!changeStreamCursor.hasNext()); // Write the first document into the collection. We will save the resume token from this change. assert.writeOK(coll.insert({_id: 0, x: 1})); let resumeToken; // Test that each of the change stream cursors picks up the change. - for (let cursor of[singleCollCursor, wholeDbCursor, wholeClusterCursor]) { - print(`Running test on namespace '${cursor._ns}'`); - assert.soon(() => cursor.hasNext()); - let change = cursor.next(); - assert(!cursor.hasNext()); - let expected = { - documentKey: {_id: 0}, - fullDocument: {_id: 0, x: 1}, - ns: {db: "test", coll: coll.getName()}, - operationType: "insert", - }; - assert("_id" in change, "Got unexpected change: " + tojson(change)); - // Remember the _id of the first op to resume the stream. - resumeToken = change._id; - // Remove the fields we cannot predict, then test that the change is as expected. - delete change._id; - delete change.clusterTime; - assert.docEq(change, expected); - } + assert.soon(() => changeStreamCursor.hasNext()); + let change = changeStreamCursor.next(); + assert(!changeStreamCursor.hasNext()); + let expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, x: 1}, + ns: {db: "test", coll: coll.getName()}, + operationType: "insert", + }; + assert("_id" in change, "Got unexpected change: " + tojson(change)); + // Remember the _id of the first op to resume the stream. + resumeToken = change._id; + // Remove the fields we cannot predict, then test that the change is as expected. + delete change._id; + delete change.clusterTime; + assert.docEq(change, expected); jsTestLog("Testing watch() with pipeline"); - singleCollCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]); - wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]); - wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, docId: "$documentKey._id"}}]); + changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]); // Store the cluster time of the insert as the timestamp to start from. const resumeTime = assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]})) .$clusterTime.clusterTime; - checkNextChange(singleCollCursor, {docId: 1}); - checkNextChange(wholeDbCursor, {docId: 1}); - checkNextChange(wholeClusterCursor, {docId: 1}); + checkNextChange(changeStreamCursor, {docId: 1}); jsTestLog("Testing watch() with pipeline and resumeAfter"); - singleCollCursor = + changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); - wholeDbCursor = - db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); - wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, docId: "$documentKey._id"}}], - {resumeAfter: resumeToken}); - checkNextChange(singleCollCursor, {docId: 1}); - checkNextChange(wholeDbCursor, {docId: 1}); - checkNextChange(wholeClusterCursor, {docId: 1}); + checkNextChange(changeStreamCursor, {docId: 1}); jsTestLog("Testing watch() with pipeline and startAtClusterTime"); - singleCollCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], - {startAtClusterTime: {ts: resumeTime}}); - wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], - {startAtClusterTime: {ts: resumeTime}}); - wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, docId: "$documentKey._id"}}], - {startAtClusterTime: {ts: resumeTime}}); - checkNextChange(singleCollCursor, {docId: 1}); - checkNextChange(wholeDbCursor, {docId: 1}); - checkNextChange(wholeClusterCursor, {docId: 1}); + changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], + {startAtClusterTime: {ts: resumeTime}}); + checkNextChange(changeStreamCursor, {docId: 1}); jsTestLog("Testing watch() with updateLookup"); - singleCollCursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"}); - wholeDbCursor = db.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"}); - wholeClusterCursor = - db.getMongo().watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"}); + changeStreamCursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"}); assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}})); - let expected = { + expected = { documentKey: {_id: 0}, fullDocument: {_id: 0, x: 10}, ns: {db: "test", coll: coll.getName()}, operationType: "update", updateDescription: {removedFields: [], updatedFields: {x: 10}}, }; - checkNextChange(singleCollCursor, expected); - checkNextChange(wholeDbCursor, expected); - checkNextChange(wholeClusterCursor, expected); + checkNextChange(changeStreamCursor, expected); jsTestLog("Testing watch() with batchSize"); // Only test mongod because mongos uses batch size 0 for aggregate commands internally to // establish cursors quickly. GetMore on mongos doesn't respect batch size due to SERVER-31992. - const isMongos = db.runCommand({isdbgrid: 1}).isdbgrid; + const isMongos = FixtureHelpers.isMongos(db); if (!isMongos) { // Increase a field by 5 times and verify the batch size is respected. for (let i = 0; i < 5; i++) { @@ -137,61 +113,34 @@ } // Only watch the "update" changes of the specific doc since the beginning. - singleCollCursor = coll.watch( - [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}], - {resumeAfter: resumeToken, batchSize: 2}); - wholeDbCursor = db.watch( - [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}], - {resumeAfter: resumeToken, batchSize: 2}); - wholeClusterCursor = db.getMongo().watch( + changeStreamCursor = coll.watch( [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}], {resumeAfter: resumeToken, batchSize: 2}); - for (let cursor of[singleCollCursor, wholeDbCursor, wholeClusterCursor]) { - print(`Running test on namespace '${cursor._ns}'`); - // Check the first batch. - assert.eq(cursor.objsLeftInBatch(), 2); - // Consume the first batch. - assert(cursor.hasNext()); - cursor.next(); - assert(cursor.hasNext()); - cursor.next(); - // Confirm that the batch is empty. - assert.eq(cursor.objsLeftInBatch(), 0); - - // Check the batch returned by getMore. - assert(cursor.hasNext()); - assert.eq(cursor.objsLeftInBatch(), 2); - cursor.next(); - assert(cursor.hasNext()); - cursor.next(); - assert.eq(cursor.objsLeftInBatch(), 0); - // There are more changes coming, just not in the batch. - assert(cursor.hasNext()); - } + // Check the first batch. + assert.eq(changeStreamCursor.objsLeftInBatch(), 2); + // Consume the first batch. + assert(changeStreamCursor.hasNext()); + changeStreamCursor.next(); + assert(changeStreamCursor.hasNext()); + changeStreamCursor.next(); + // Confirm that the batch is empty. + assert.eq(changeStreamCursor.objsLeftInBatch(), 0); + + // Check the batch returned by getMore. + assert(changeStreamCursor.hasNext()); + assert.eq(changeStreamCursor.objsLeftInBatch(), 2); + changeStreamCursor.next(); + assert(changeStreamCursor.hasNext()); + changeStreamCursor.next(); + assert.eq(changeStreamCursor.objsLeftInBatch(), 0); + // There are more changes coming, just not in the batch. + assert(changeStreamCursor.hasNext()); } jsTestLog("Testing watch() with maxAwaitTimeMS"); - singleCollCursor = coll.watch([], {maxAwaitTimeMS: 500}); - testCommandIsCalled(() => assert(!singleCollCursor.hasNext()), (cmdObj) => { - assert.eq("getMore", - Object.keys(cmdObj)[0], - "expected getMore command, but was: " + tojson(cmdObj)); - assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj)); - assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj)); - }); - - wholeDbCursor = db.watch([], {maxAwaitTimeMS: 500}); - testCommandIsCalled(() => assert(!wholeDbCursor.hasNext()), (cmdObj) => { - assert.eq("getMore", - Object.keys(cmdObj)[0], - "expected getMore command, but was: " + tojson(cmdObj)); - assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj)); - assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj)); - }); - - wholeClusterCursor = db.getMongo().watch([], {maxAwaitTimeMS: 500}); - testCommandIsCalled(() => assert(!wholeClusterCursor.hasNext()), (cmdObj) => { + changeStreamCursor = coll.watch([], {maxAwaitTimeMS: 500}); + testCommandIsCalled(() => assert(!changeStreamCursor.hasNext()), (cmdObj) => { assert.eq("getMore", Object.keys(cmdObj)[0], "expected getMore command, but was: " + tojson(cmdObj)); @@ -200,9 +149,7 @@ }); jsTestLog("Testing the cursor gets closed when the collection gets dropped"); - singleCollCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]); - wholeDbCursor = db.watch([{$project: {_id: 0, clusterTime: 0}}]); - wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, clusterTime: 0}}]); + changeStreamCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]); assert.writeOK(coll.insert({_id: 2, x: 1})); expected = { documentKey: {_id: 2}, @@ -210,32 +157,19 @@ ns: {db: "test", coll: coll.getName()}, operationType: "insert", }; - checkNextChange(singleCollCursor, expected); - assert(!singleCollCursor.hasNext()); - assert(!singleCollCursor.isClosed()); - assert(!singleCollCursor.isExhausted()); - - checkNextChange(wholeDbCursor, expected); - assert(!wholeDbCursor.hasNext()); - assert(!wholeDbCursor.isClosed()); - assert(!wholeDbCursor.isExhausted()); - - checkNextChange(wholeClusterCursor, expected); - assert(!wholeClusterCursor.hasNext()); - assert(!wholeClusterCursor.isClosed()); - assert(!wholeClusterCursor.isExhausted()); + checkNextChange(changeStreamCursor, expected); + assert(!changeStreamCursor.hasNext()); + assert(!changeStreamCursor.isClosed()); + assert(!changeStreamCursor.isExhausted()); // Dropping the collection should invalidate any open change streams. assertDropCollection(db, coll.getName()); - for (let cursor of[singleCollCursor, wholeDbCursor, wholeClusterCursor]) { - print(`Running test on namespace '${cursor._ns}'`); - assert.soon(() => cursor.hasNext()); - assert(cursor.isClosed()); - assert(!cursor.isExhausted()); - expected = {operationType: "invalidate"}; - checkNextChange(cursor, expected); - assert(!cursor.hasNext()); - assert(cursor.isClosed()); - assert(cursor.isExhausted()); - } + assert.soon(() => changeStreamCursor.hasNext()); + assert(changeStreamCursor.isClosed()); + assert(!changeStreamCursor.isExhausted()); + expected = {operationType: "invalidate"}; + checkNextChange(changeStreamCursor, expected); + assert(!changeStreamCursor.hasNext()); + assert(changeStreamCursor.isClosed()); + assert(changeStreamCursor.isExhausted()); }()); diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js index 455bff853fe..463359ec839 100644 --- a/jstests/change_streams/change_stream_whole_db_invalidations.js +++ b/jstests/change_streams/change_stream_whole_db_invalidations.js @@ -66,7 +66,11 @@ // Create collection on the database being watched. coll = assertDropAndRecreateCollection(testDB, "change_stream_whole_db_invalidations"); - aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + // Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be + // upconverted to a cluster-wide stream, which *would* be invalidated by dropping the collection + // in the other database. + aggCursor = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true}); // Drop the collection on the other database, this should *not* invalidate the change stream. assertDropCollection(otherDB, otherDBColl.getName()); diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js index 387c368ecc1..6d0c33785e8 100644 --- a/jstests/change_streams/include_cluster_time.js +++ b/jstests/change_streams/include_cluster_time.js @@ -2,16 +2,12 @@ (function() { "use strict"; - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. // Drop and recreate the collections to be used in this set of tests. const coll = assertDropAndRecreateCollection(db, "include_cluster_time"); - const collectionStream = coll.watch(); - const dbStream = - db.watch([{$match: {$or: [{operationType: "invalidate"}, {"ns.coll": coll.getName()}]}}]); - const clusterStream = db.getMongo().watch( - [{$match: {$or: [{operationType: "invalidate"}, {"ns.coll": coll.getName()}]}}]); + const changeStream = coll.watch(); const insertClusterTime = assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 0}]})) @@ -30,34 +26,30 @@ const dropClusterTime = assert.commandWorked(db.runCommand({drop: coll.getName()})).$clusterTime.clusterTime; - for (let changeStream of[collectionStream, dbStream, clusterStream]) { - jsTestLog(`Testing stream on ns ${changeStream._ns}`); - - // Make sure each operation has a reasonable cluster time. Note that we should not assert - // that the cluster times are equal, because the cluster time returned from the command is - // generated by a second, independent read of the logical clock than the one used to - // generate the oplog entry. It's possible that the system did something to advance the time - // between the two reads of the clock. - assert.soon(() => changeStream.hasNext()); - let next = changeStream.next(); - assert.eq(next.operationType, "insert"); - assert.lte(next.clusterTime, insertClusterTime); - - assert.soon(() => changeStream.hasNext()); - next = changeStream.next(); - assert.eq(next.operationType, "update"); - assert.lte(next.clusterTime, updateClusterTime); - - assert.soon(() => changeStream.hasNext()); - next = changeStream.next(); - assert.eq(next.operationType, "delete"); - assert.lte(next.clusterTime, deleteClusterTime); - - assert.soon(() => changeStream.hasNext()); - next = changeStream.next(); - assert.eq(next.operationType, "invalidate"); - assert.lte(next.clusterTime, dropClusterTime); - - changeStream.close(); - } + // Make sure each operation has a reasonable cluster time. Note that we should not assert + // that the cluster times are equal, because the cluster time returned from the command is + // generated by a second, independent read of the logical clock than the one used to + // generate the oplog entry. It's possible that the system did something to advance the time + // between the two reads of the clock. + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "insert"); + assert.lte(next.clusterTime, insertClusterTime); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.lte(next.clusterTime, updateClusterTime); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "delete"); + assert.lte(next.clusterTime, deleteClusterTime); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "invalidate"); + assert.lte(next.clusterTime, dropClusterTime); + + changeStream.close(); }()); diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index 9b41cbbf7f0..61e51f989d2 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -11,229 +11,208 @@ load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'. const coll = assertDropAndRecreateCollection(db, "change_post_image"); - - function testUpdateLookup(coll, collToWatch, changeStreamDB = db, changeStreamSpec = {}) { - coll.drop(); - - const cst = new ChangeStreamTest(changeStreamDB); - - jsTestLog("Testing change streams without 'fullDocument' specified"); - // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for - // an insert. - let cursor = cst.startWatchingChanges( - {pipeline: [{$changeStream: changeStreamSpec}], collection: collToWatch}); - assert.writeOK(coll.insert({_id: "fullDocument not specified"})); - let latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "insert"); - assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified"}); - - // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for a - // replacement-style update. - assert.writeOK(coll.update({_id: "fullDocument not specified"}, - {_id: "fullDocument not specified", replaced: true})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "replace"); - assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified", replaced: true}); - - // Test that not specifying 'fullDocument' does not include a 'fullDocument' in the result - // for a non-replacement update. - assert.writeOK(coll.update({_id: "fullDocument not specified"}, {$set: {updated: true}})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "update"); - assert(!latestChange.hasOwnProperty("fullDocument")); - - jsTestLog("Testing change streams with 'fullDocument' specified as 'default'"); - - // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the - // result for an insert. - const defaultFullDocSpec = Object.assign({fullDocument: "default"}, changeStreamSpec); - cursor = cst.startWatchingChanges( - {collection: collToWatch, pipeline: [{$changeStream: defaultFullDocSpec}]}); - assert.writeOK(coll.insert({_id: "fullDocument is default"})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "insert"); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is default"}); - - // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the - // result for a replacement-style update. - assert.writeOK(coll.update({_id: "fullDocument is default"}, - {_id: "fullDocument is default", replaced: true})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "replace"); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is default", replaced: true}); - - // Test that specifying 'fullDocument' as 'default' does not include a 'fullDocument' in the - // result for a non-replacement update. - assert.writeOK(coll.update({_id: "fullDocument is default"}, {$set: {updated: true}})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "update"); - assert(!latestChange.hasOwnProperty("fullDocument")); - - jsTestLog("Testing change streams with 'fullDocument' specified as 'updateLookup'"); - - // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in - // the result for an insert. - const updateLookupSpec = Object.assign({fullDocument: "updateLookup"}, changeStreamSpec); - cursor = cst.startWatchingChanges( - {collection: collToWatch, pipeline: [{$changeStream: updateLookupSpec}]}); - assert.writeOK(coll.insert({_id: "fullDocument is lookup"})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "insert"); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup"}); - - // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in - // the result for a replacement-style update. - assert.writeOK(coll.update({_id: "fullDocument is lookup"}, - {_id: "fullDocument is lookup", replaced: true})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "replace"); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup", replaced: true}); - - // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in - // the result for a non-replacement update. - assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updated: true}})); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "update"); - assert.eq(latestChange.fullDocument, - {_id: "fullDocument is lookup", replaced: true, updated: true}); - - // Test that looking up the post image of an update after deleting the document will result - // in a 'fullDocument' with a value of null. - cursor = cst.startWatchingChanges({ - collection: collToWatch, - pipeline: [{$changeStream: updateLookupSpec}, {$match: {operationType: "update"}}] - }); - assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}})); - assert.writeOK(coll.remove({_id: "fullDocument is lookup"})); - // If this test is running with secondary read preference, it's necessary for the remove - // to propagate to all secondary nodes and be available for majority reads before we can - // assume looking up the document will fail. - FixtureHelpers.awaitLastOpCommitted(db); - - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "update"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, null); - const deleteDocResumePoint = latestChange._id; - - // Test that looking up the post image of an update after the collection has been dropped - // will result in 'fullDocument' with a value of null. This must be done using getMore - // because new cursors cannot be established after a collection drop. - assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); - assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}})); - - // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet. - const resumeAfterDeleteAndUpdateLookupSpec = Object.assign( - {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}, changeStreamSpec); - cursor = cst.startWatchingChanges({ - collection: collToWatch, - pipeline: [ - {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, - {$match: {operationType: {$ne: "delete"}}} - ], - aggregateOptions: {cursor: {batchSize: 0}} - }); - - // Save another stream to test post-image lookup after the collection is recreated. - let cursorBeforeDrop = cst.startWatchingChanges({ - collection: collToWatch, - pipeline: [ - {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, - {$match: {operationType: {$ne: "delete"}}} - ], - aggregateOptions: {cursor: {batchSize: 0}} - }); - - // Retrieve the 'insert' operation from the latter stream. This is necessary on a sharded - // collection so that the documentKey is retrieved before the collection is recreated; - // otherwise, per SERVER-31691, a uassert will occur. - // TODO SERVER-31847: all remaining operations on the old UUID should be visible even if we - // have not retrieved the first oplog entry before the collection is recreated. - latestChange = cst.getOneChange(cursorBeforeDrop); - assert.eq(latestChange.operationType, "insert"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); - - // Drop the collection and wait until two-phase drop finishes. - assertDropCollection(db, coll.getName()); - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, - coll.getName()); - }); - // If this test is running with secondary read preference, it's necessary for the drop - // to propagate to all secondary nodes and be available for majority reads before we can - // assume looking up the document will fail. - FixtureHelpers.awaitLastOpCommitted(db); - - // Check the next $changeStream entry; this is the test document inserted above. - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "insert"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); - - // The next entry is the 'update' operation. Because the collection has been dropped, our - // attempt to look up the post-image results in a null document. - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "update"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, null); - - // Test that looking up the post image of an update after the collection has been dropped - // and created again will result in 'fullDocument' with a value of null. This must be done - // using getMore because new cursors cannot be established after a collection drop. - - // Insert a document with the same _id, verify the change stream won't return it due to - // different UUID. - assertCreateCollection(db, coll.getName()); - assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); - - // Confirm that the next entry's post-image is null since new collection has a different - // UUID. - latestChange = cst.getOneChange(cursorBeforeDrop); - assert.eq(latestChange.operationType, "update"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, null); - - // Test that invalidate entries don't have 'fullDocument' even if 'updateLookup' is - // specified. - const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate"); - cursor = cst.startWatchingChanges({ - collection: isNumber(collToWatch) ? collToWatch : collInvalidate.getName(), - pipeline: [{$changeStream: updateLookupSpec}], - aggregateOptions: {cursor: {batchSize: 0}} - }); - assert.writeOK(collInvalidate.insert({_id: "testing invalidate"})); - assertDropCollection(db, collInvalidate.getName()); - // Wait until two-phase drop finishes. - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( - db, collInvalidate.getName()); - }); - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "insert"); - latestChange = cst.getOneChange(cursor, true); - assert.eq(latestChange.operationType, "invalidate"); - assert(!latestChange.hasOwnProperty("fullDocument")); - - jsTestLog("Testing full document lookup with a real getMore"); - assert.writeOK(coll.insert({_id: "getMoreEnabled"})); - - cursor = cst.startWatchingChanges({ - collection: collToWatch, - pipeline: [{$changeStream: updateLookupSpec}], - }); - assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}})); - - let doc = cst.getOneChange(cursor); - assert.docEq(doc["fullDocument"], {_id: "getMoreEnabled", updated: true}); - } - - // Test update lookup with a change stream on a single collection. - testUpdateLookup(coll, coll.getName()); - - // Test update lookup with a change stream on a whole database. - testUpdateLookup(coll, 1); - - // Test update lookup with a change stream on the whole cluster. - testUpdateLookup(coll, 1, db.getSiblingDB("admin"), {allChangesForCluster: true}); + const cst = new ChangeStreamTest(db); + + jsTestLog("Testing change streams without 'fullDocument' specified"); + // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for + // an insert. + let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: coll}); + assert.writeOK(coll.insert({_id: "fullDocument not specified"})); + let latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "insert"); + assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified"}); + + // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for a + // replacement-style update. + assert.writeOK(coll.update({_id: "fullDocument not specified"}, + {_id: "fullDocument not specified", replaced: true})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "replace"); + assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified", replaced: true}); + + // Test that not specifying 'fullDocument' does not include a 'fullDocument' in the result + // for a non-replacement update. + assert.writeOK(coll.update({_id: "fullDocument not specified"}, {$set: {updated: true}})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "update"); + assert(!latestChange.hasOwnProperty("fullDocument")); + + jsTestLog("Testing change streams with 'fullDocument' specified as 'default'"); + + // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the + // result for an insert. + cursor = cst.startWatchingChanges( + {collection: coll, pipeline: [{$changeStream: {fullDocument: "default"}}]}); + assert.writeOK(coll.insert({_id: "fullDocument is default"})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "insert"); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is default"}); + + // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the + // result for a replacement-style update. + assert.writeOK(coll.update({_id: "fullDocument is default"}, + {_id: "fullDocument is default", replaced: true})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "replace"); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is default", replaced: true}); + + // Test that specifying 'fullDocument' as 'default' does not include a 'fullDocument' in the + // result for a non-replacement update. + assert.writeOK(coll.update({_id: "fullDocument is default"}, {$set: {updated: true}})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "update"); + assert(!latestChange.hasOwnProperty("fullDocument")); + + jsTestLog("Testing change streams with 'fullDocument' specified as 'updateLookup'"); + + // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in + // the result for an insert. + cursor = cst.startWatchingChanges( + {collection: coll, pipeline: [{$changeStream: {fullDocument: "updateLookup"}}]}); + assert.writeOK(coll.insert({_id: "fullDocument is lookup"})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "insert"); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup"}); + + // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in + // the result for a replacement-style update. + assert.writeOK(coll.update({_id: "fullDocument is lookup"}, + {_id: "fullDocument is lookup", replaced: true})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "replace"); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup", replaced: true}); + + // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in + // the result for a non-replacement update. + assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updated: true}})); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "update"); + assert.eq(latestChange.fullDocument, + {_id: "fullDocument is lookup", replaced: true, updated: true}); + + // Test that looking up the post image of an update after deleting the document will result + // in a 'fullDocument' with a value of null. + cursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [ + {$changeStream: {fullDocument: "updateLookup"}}, + {$match: {operationType: "update"}} + ] + }); + assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}})); + assert.writeOK(coll.remove({_id: "fullDocument is lookup"})); + // If this test is running with secondary read preference, it's necessary for the remove + // to propagate to all secondary nodes and be available for majority reads before we can + // assume looking up the document will fail. + FixtureHelpers.awaitLastOpCommitted(db); + + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "update"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, null); + const deleteDocResumePoint = latestChange._id; + + // Test that looking up the post image of an update after the collection has been dropped + // will result in 'fullDocument' with a value of null. This must be done using getMore + // because new cursors cannot be established after a collection drop. + assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); + assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}})); + + // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet. + cursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [ + {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$match: {operationType: {$ne: "delete"}}} + ], + aggregateOptions: {cursor: {batchSize: 0}} + }); + + // Save another stream to test post-image lookup after the collection is recreated. + const cursorBeforeDrop = cst.startWatchingChanges({ + collection: coll, + pipeline: [ + {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$match: {operationType: {$ne: "delete"}}} + ], + aggregateOptions: {cursor: {batchSize: 0}} + }); + + // Retrieve the 'insert' operation from the latter stream. This is necessary on a sharded + // collection so that the documentKey is retrieved before the collection is recreated; + // otherwise, per SERVER-31691, a uassert will occur. + latestChange = cst.getOneChange(cursorBeforeDrop); + assert.eq(latestChange.operationType, "insert"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); + + // Drop the collection and wait until two-phase drop finishes. + assertDropCollection(db, coll.getName()); + assert.soon(function() { + return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName()); + }); + // If this test is running with secondary read preference, it's necessary for the drop + // to propagate to all secondary nodes and be available for majority reads before we can + // assume looking up the document will fail. + FixtureHelpers.awaitLastOpCommitted(db); + + // Check the next $changeStream entry; this is the test document inserted above. + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "insert"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); + + // The next entry is the 'update' operation. Because the collection has been dropped, our + // attempt to look up the post-image results in a null document. + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "update"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, null); + + // Test that looking up the post image of an update after the collection has been dropped + // and created again will result in 'fullDocument' with a value of null. This must be done + // using getMore because new cursors cannot be established after a collection drop. + + // Insert a document with the same _id, verify the change stream won't return it due to + // different UUID. + assertCreateCollection(db, coll.getName()); + assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); + + // Confirm that the next entry's post-image is null since new collection has a different + // UUID. + latestChange = cst.getOneChange(cursorBeforeDrop); + assert.eq(latestChange.operationType, "update"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, null); + + jsTestLog("Testing full document lookup with a real getMore"); + assert.writeOK(coll.insert({_id: "getMoreEnabled"})); + + cursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], + }); + assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}})); + + const doc = cst.getOneChange(cursor); + assert.docEq(doc["fullDocument"], {_id: "getMoreEnabled", updated: true}); + + // Test that invalidate entries don't have 'fullDocument' even if 'updateLookup' is + // specified. + cursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], + aggregateOptions: {cursor: {batchSize: 0}} + }); + assert.writeOK(coll.insert({_id: "testing invalidate"})); + assertDropCollection(db, coll.getName()); + // Wait until two-phase drop finishes. + assert.soon(function() { + return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName()); + }); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "insert"); + latestChange = cst.getOneChange(cursor, true); + assert.eq(latestChange.operationType, "invalidate"); + assert(!latestChange.hasOwnProperty("fullDocument")); }()); diff --git a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js new file mode 100644 index 00000000000..3ee93a6fb24 --- /dev/null +++ b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js @@ -0,0 +1,54 @@ +/** + * Loading this file overrides DB.prototype._runCommandImpl with a function that converts any + * attempt to run $changeStream on a single collection or single database into a cluster-wide + * $changeStream filtered by that database or namespace. Single-collection/db rules regarding + * internal collections and views are respected. Non-$changeStream commands and commands which + * explicitly request to be exempted from modification by setting the 'noPassthrough' flag, are + * passed through as-is. + */ + +// For the whole_cluster passthrough, we simply override the necessary methods in the whole_db +// passthrough's ChangeStreamPassthroughHelpers. +load("jstests/libs/override_methods/implicit_whole_db_changestreams.js"); + +// Any valid single-collection or single-database request is upconvertable to cluster-wide. +ChangeStreamPassthroughHelpers.isUpconvertableChangeStreamRequest = + ChangeStreamPassthroughHelpers.isValidChangeStreamRequest; + +ChangeStreamPassthroughHelpers.nsMatchFilter = function(db, collName) { + // The $match filter we inject into the pipeline will depend on whether this is a + // single-collection or whole-db stream. + const isSingleCollectionStream = (typeof collName === 'string'); + + return { + $match: { + $or: [ + { + "ns.db": db.getName(), + "ns.coll": (isSingleCollectionStream ? collName : {$exists: true}) + }, + {operationType: "invalidate"} + ] + } + }; +}; + +ChangeStreamPassthroughHelpers.execDBName = function(db) { + return "admin"; +}; + +ChangeStreamPassthroughHelpers.changeStreamSpec = function() { + return {allChangesForCluster: true}; +}; + +// Redirect the DB's 'watch' function to use the cluster-wide version. The Collection.watch helper +// has already been overridden to use DB.watch when we loaded 'implicit_whole_db_changestreams.js', +// so this ensures that both the Collection and DB helpers will actually call the Mongo function. +// Although calls to the shell helpers will ultimately resolve to the overridden runCommand anyway, +// we need to override the helper to ensure that the Mongo.watch function itself is exercised by the +// passthrough wherever Collection.watch or DB.watch is called. +DB.prototype.watch = function(pipeline, options) { + pipeline = Object.assign([], pipeline); + pipeline.unshift(ChangeStreamPassthroughHelpers.nsMatchFilter(this, 1)); + return this.getMongo().watch(pipeline, options); +}; diff --git a/jstests/libs/override_methods/implicit_whole_db_changestreams.js b/jstests/libs/override_methods/implicit_whole_db_changestreams.js index deaf8f7c216..ae4ed192033 100644 --- a/jstests/libs/override_methods/implicit_whole_db_changestreams.js +++ b/jstests/libs/override_methods/implicit_whole_db_changestreams.js @@ -7,75 +7,121 @@ * 'noPassthrough' flag, are passed through as-is. */ +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. + // Helper function which tests can call to explicitly request that the command not be modified by // the passthrough code. When defined, ChangeStreamTest will adopt this as its default runCommand // implementation to allow individual tests to exempt themselves from modification. const changeStreamPassthroughAwareRunCommand = (db, cmdObj, noPassthrough) => db.runCommand(cmdObj, undefined, undefined, noPassthrough); +// Defines a set of functions to validate incoming $changeStream requests and transform +// single-collection streams into equivalent whole-db streams. Separating these functions allows the +// runCommand override to generically upconvert $changeStream requests, and the +// ChangeStreamPassthroughHelpers may themselves be overridden by other passthroughs in order to +// alter the behaviour of runCommand. +const ChangeStreamPassthroughHelpers = { + isValidChangeStreamRequest: function(db, cmdObj) { + // Determine whether this command is a valid $changeStream aggregation on a single + // collection or database. + if (!(cmdObj && cmdObj.aggregate && Array.isArray(cmdObj.pipeline) && + cmdObj.pipeline.length > 0 && cmdObj.pipeline[0].$changeStream)) { + return false; + } + // Single-collection and whole-db streams cannot be opened on internal databases. + if (db.getName() == "admin" || db.getName() == "config" || db.getName() == "local") { + return false; + } + // If the client's $changeStream spec already contains everything we intend to modify, pass + // the command through as-is. + const testSpec = this.changeStreamSpec(), csParams = Object.keys(testSpec); + if (csParams.length > 0 && + csParams.every((csParam) => + testSpec[csParam] === cmdObj.pipeline[0].$changeStream[csParam])) { + return false; + } + // The remaining checks are only relevant to single-collection streams. If the 'aggregate' + // field of the command object is not a string, validate that it is equal to 1. + if (typeof cmdObj.aggregate !== 'string') { + return cmdObj.aggregate == 1; + } + // Single-collection streams cannot be opened on internal collections in any database. + if (cmdObj.aggregate.startsWith("system.")) { + return false; + } + // Single-collection streams cannot be opened on views. + if (FixtureHelpers.getViewDefinition(db, cmdObj.aggregate)) { + return false; + } + // This is a well-formed request. + return true; + }, + // All valid single-collection change stream requests are upconvertable in this passthrough. + isUpconvertableChangeStreamRequest: function(db, cmdObj) { + return this.isValidChangeStreamRequest(db, cmdObj) && + (typeof cmdObj.aggregate === 'string'); + }, + nsMatchFilter: function(db, collName) { + return { + $match: { + $or: + [{"ns.db": db.getName(), "ns.coll": collName}, {operationType: "invalidate"}] + } + }; + }, + execDBName: function(db) { + return db.getName(); + }, + changeStreamSpec: function() { + return {}; + }, + upconvertChangeStreamRequest: function(db, cmdObj) { + // Take a copy of the command object such that the original is not altered. + cmdObj = Object.assign({}, cmdObj); + + // To convert this command into a whole-db stream, we insert a $match stage just after + // the $changeStream stage that filters by database and collection name, and we update + // the command's execution 'namespace' to 1. + let pipeline = [{ + $changeStream: + Object.assign({}, cmdObj.pipeline[0].$changeStream, this.changeStreamSpec()) + }]; + pipeline.push(this.nsMatchFilter(db, cmdObj.aggregate)); + pipeline = pipeline.concat(cmdObj.pipeline.slice(1)); + cmdObj.pipeline = pipeline; + cmdObj.aggregate = 1; + + return [this.execDBName(db), cmdObj]; + }, + upconvertGetMoreRequest: function(db, cmdObj) { + return [this.execDBName(db), Object.assign({}, cmdObj, {collection: "$cmd.aggregate"})]; + } +}; + (function() { 'use strict'; - load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - const originalRunCommandImpl = DB.prototype._runCommandImpl; const originalRunCommand = DB.prototype.runCommand; const upconvertedCursors = new Set(); - function isValidChangeStreamRequest(db, cmdObj) { - // Determine whether this command is a $changeStream aggregation on a single collection. - if (cmdObj && typeof cmdObj.aggregate === 'string' && Array.isArray(cmdObj.pipeline) && - cmdObj.pipeline.length > 0 && cmdObj.pipeline[0].$changeStream) { - // Single-collection streams cannot be opened on internal databases. - if (db.getName() == "admin" || db.getName() == "config" || db.getName() == "local") { - return false; - } - // Single-collection streams cannot be opened on internal collections in any database. - if (cmdObj.aggregate.startsWith("system.")) { - return false; - } - // Single-collection streams cannot be opened on views. - if (FixtureHelpers.getViewDefinition(db, cmdObj.aggregate)) { - return false; - } - // This is a well-formed single-collection request. - return true; - } - - return false; - } + const db = null; const passthroughRunCommandImpl = function(dbName, cmdObj, options) { - // Check whether this command is a valid $changeStream request. - let upconvertCursor = isValidChangeStreamRequest(this, cmdObj); + // Check whether this command is an upconvertable $changeStream request. + const upconvertCursor = + ChangeStreamPassthroughHelpers.isUpconvertableChangeStreamRequest(this, cmdObj); if (upconvertCursor) { - // Having validated the legality of the stream, take a copy of the command object such - // that the original object is not altered. - cmdObj = Object.assign({}, cmdObj); - - // To convert this command into a whole-db stream, we insert a $match stage just after - // the $changeStream stage that filters by database and collection name, and we update - // the command's execution 'namespace' to 1. - let pipeline = [{$changeStream: Object.assign({}, cmdObj.pipeline[0].$changeStream)}]; - pipeline.push({ - $match: { - $or: [ - {"ns.db": dbName, "ns.coll": cmdObj.aggregate}, - {operationType: "invalidate"} - ] - } - }); - pipeline = pipeline.concat(cmdObj.pipeline.slice(1)); - cmdObj.pipeline = pipeline; - cmdObj.aggregate = 1; + [dbName, cmdObj] = + ChangeStreamPassthroughHelpers.upconvertChangeStreamRequest(this, cmdObj); } // If the command is a getMore, it may be a $changeStream that we upconverted to run // whole-db. Ensure that we update the 'collection' field to be the collectionless // namespace. if (cmdObj && cmdObj.getMore && upconvertedCursors.has(cmdObj.getMore.toString())) { - cmdObj = Object.assign({}, cmdObj, {collection: "$cmd.aggregate"}); + [dbName, cmdObj] = ChangeStreamPassthroughHelpers.upconvertGetMoreRequest(this, cmdObj); } // Pass the modified command to the original runCommand implementation. @@ -89,6 +135,18 @@ const changeStreamPassthroughAwareRunCommand = (db, cmdObj, noPassthrough) => return res; }; + // Redirect the Collection's 'watch' function to use the whole-DB version. Although calls to the + // shell helpers will ultimately resolve to the overridden runCommand anyway, we need to + // override the helpers to ensure that the DB.watch function itself is exercised by the + // passthrough wherever Collection.watch is called. + DBCollection.prototype.watch = function(pipeline, options) { + pipeline = Object.assign([], pipeline); + pipeline.unshift( + ChangeStreamPassthroughHelpers.nsMatchFilter(this.getDB(), this.getName())); + return this.getDB().watch(pipeline, options); + }; + + // Override DB.runCommand to use the custom or original _runCommandImpl. DB.prototype.runCommand = function(cmdObj, extra, queryOptions, noPassthrough) { this._runCommandImpl = (noPassthrough ? originalRunCommandImpl : passthroughRunCommandImpl); return originalRunCommand.apply(this, [cmdObj, extra, queryOptions]); |