// Tests that each change in the stream will include the cluster time at which it happened.
//
// This test expects each change stream result to have an operationTime based on the clusterTime in
// the oplog entry. When operations get bundled into a transaction, their operationTime is instead
// based on the commit oplog entry, which would cause this test to fail.
// @tags: [change_stream_does_not_expect_txns]
(function() {
    "use strict";

    load("jstests/libs/change_stream_util.js");        // For assertInvalidateOp.
    load("jstests/libs/collection_drop_recreate.js");  // For assertDropAndRecreateCollection.

    /**
     * Waits for the next event on 'cursor', then asserts that it has the expected operation type
     * and that its clusterTime is no later than the operationTime reported by the command that
     * produced it.
     *
     * We deliberately do not assert that the two times are equal: the cluster time returned from
     * the command is generated by a second, independent read of the logical clock than the one
     * used to generate the oplog entry. It's possible that the system did something to advance
     * the time between the two reads of the clock.
     *
     * @param {Object} cursor - the change stream cursor to read from.
     * @param {string} expectedOpType - the expected 'operationType' of the next event.
     * @param {Timestamp} commandOperationTime - the 'operationTime' from the command response.
     */
    function assertNextChangeNotAfter(cursor, expectedOpType, commandOperationTime) {
        assert.soon(() => cursor.hasNext());
        const change = cursor.next();
        assert.eq(change.operationType, expectedOpType);

        // Compare with bsonWoCompare rather than the JS '<=' operator. Relational operators on
        // Timestamp objects coerce them to primitives first (in the shell this is expected to be
        // their string form), which does not order timestamps correctly when the numeric fields
        // have a different number of digits (e.g. an increment of 9 versus 10).
        assert.lte(bsonWoCompare({ts: change.clusterTime}, {ts: commandOperationTime}),
                   0,
                   "expected clusterTime of '" + expectedOpType + "' event " +
                       tojson(change.clusterTime) + " to be <= command operationTime " +
                       tojson(commandOperationTime));
    }

    // Drop and recreate the collections to be used in this set of tests.
    const coll = assertDropAndRecreateCollection(db, "include_cluster_time");

    // Open the stream before performing any writes so that every write below is observed.
    const changeStream = coll.watch();

    // Perform one write of each kind, remembering the operationTime each command reported.
    const insertClusterTime =
        assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 0}]})).operationTime;

    const updateClusterTime =
        assert
            .commandWorked(coll.runCommand(
                "update", {updates: [{q: {_id: 0}, u: {$set: {updated: true}}}]}))
            .operationTime;

    const deleteClusterTime =
        assert.commandWorked(coll.runCommand("delete", {deletes: [{q: {_id: 0}, limit: 1}]}))
            .operationTime;

    const dropClusterTime =
        assert.commandWorked(db.runCommand({drop: coll.getName()})).operationTime;

    // Make sure each operation has a reasonable cluster time, in the order the writes were issued.
    assertNextChangeNotAfter(changeStream, "insert", insertClusterTime);
    assertNextChangeNotAfter(changeStream, "update", updateClusterTime);
    assertNextChangeNotAfter(changeStream, "delete", deleteClusterTime);
    assertNextChangeNotAfter(changeStream, "drop", dropClusterTime);

    // Checked via the helper loaded from change_stream_util.js once the drop has been observed.
    assertInvalidateOp({cursor: changeStream, opType: "drop"});

    changeStream.close();
}());